Coverage for src/prepare_times_nz/stage_3/demand_projections/agriculture.py: 90%

144 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 23:05 +0000

1""" 

2This module converts Agri, forest, fish input demand projection assumptions 

3 and compiles indices for all commodities and scenarios 

4 

5A separate stage 4 module can extract these 

6 into the relevant demand scenario files 

7 

8""" 

9 

10from pathlib import Path 

11 

12import pandas as pd 

13from prepare_times_nz.stage_0.stage_0_settings import BASE_YEAR 

14from prepare_times_nz.utilities.data_in_out import _save_data 

15from prepare_times_nz.utilities.filepaths import ( 

16 ASSUMPTIONS, 

17 EXTERNAL_DATA, 

18 STAGE_2_DATA, 

19 STAGE_3_DATA, 

20) 

21 

# CONSTANTS
# Inputs: demand projection assumptions, including the agriculture mapping CSV
PROJECTIONS_ASSUMPTIONS = ASSUMPTIONS / "demand_projections"
ASSUMPTIONS_FILE = PROJECTIONS_ASSUMPTIONS / "agriculture_demand_projections.csv"
# Outputs: stage 3 demand projections, with checking files in a subdirectory
OUTPUT = STAGE_3_DATA / "demand_projections"
OUTPUT_CHECKS = OUTPUT / "checks"
# Final projection year (used as an inclusive upper bound throughout this module)
END_YEAR = 2050

30 

31 

32# HELPERS 

def save_agr_proj_data(df, name, label):
    """Save a projection dataframe into the demand projections OUTPUT directory.

    ``label`` is wrapped in a descriptive message that is handed to
    ``_save_data`` along with the dataframe and the file ``name``.
    """
    description = "Saving ag, forest, and fish demand projections (" + label + ")"
    _save_data(df, name, description, filepath=OUTPUT)

37 

38 

def save_agr_proj_check(df, name, label):
    """Save a checking dataframe into the OUTPUT_CHECKS directory.

    ``label`` is wrapped in a descriptive message that is handed to
    ``_save_data`` along with the dataframe and the file ``name``.
    """
    description = "Saving ag, forest, and fish demand checks (" + label + ")"
    _save_data(df, name, description, filepath=OUTPUT_CHECKS)

43 

44 

def normalise_optional_text(value):
    """Clean an optional text cell.

    Returns ``None`` when ``value`` is missing according to ``pd.isna`` or is
    empty once surrounding whitespace is removed; otherwise returns the
    stripped string form of ``value``.
    """
    if not pd.isna(value):
        stripped = str(value).strip()
        if stripped:
            return stripped
    return None

51 

52 

def load_agriculture_projection_assumptions(assumptions_path=ASSUMPTIONS_FILE):
    """Load agriculture projection mappings and validate expected columns.

    Reads the assumptions CSV, checks that every required column is present,
    keeps only the rows whose ``SectorGroup`` is
    "Agriculture, Forestry and Fishing", and normalises the columns:

    - ``Scenario`` and ``Method`` are cast to text, stripped and title-cased
    - ``Workbook``, ``SheetName``, ``SourceCategory1`` and ``SourceCategory2``
      become stripped text, or ``None`` when blank/missing
    - ``ConstantIndex`` is coerced to numeric (unparseable values become NaN)

    Args:
        assumptions_path: location of the assumptions CSV.

    Returns:
        The filtered and normalised mapping dataframe.

    Raises:
        ValueError: if any required column is missing, or if more than one row
            maps the same SectorGroup / Sector / Scenario combination.
    """
    df = pd.read_csv(assumptions_path)

    required_cols = {
        "SectorGroup",
        "Sector",
        "Scenario",
        "Method",
        "Workbook",
        "SheetName",
        "SourceCategory1",
        "SourceCategory2",
        "ConstantIndex",
        "Note",
    }
    # Validate the schema before touching any column, so that a malformed file
    # (including one with no "SectorGroup" column) produces the descriptive
    # error below rather than a bare KeyError from the sector filter.
    missing_cols = required_cols.difference(df.columns)
    if missing_cols:
        raise ValueError(
            "Agriculture demand projection assumptions are missing columns: "
            + ", ".join(sorted(missing_cols))
        )

    df = df[df["SectorGroup"] == "Agriculture, Forestry and Fishing"].copy()

    for column in ["Scenario", "Method"]:
        df[column] = df[column].astype(str).str.strip().str.title()
    for column in ["Workbook", "SheetName", "SourceCategory1", "SourceCategory2"]:
        df[column] = df[column].apply(normalise_optional_text)
    df["ConstantIndex"] = pd.to_numeric(df["ConstantIndex"], errors="coerce")

    # Each sector/scenario pair may only be mapped once.
    key_cols = ["SectorGroup", "Sector", "Scenario"]
    duplicates = df.duplicated(key_cols, keep=False)
    if duplicates.any():
        duplicate_rows = df.loc[duplicates, key_cols]
        raise ValueError(
            "Duplicate agriculture demand projection mappings found:\n"
            + duplicate_rows.to_string(index=False)
        )

    return df

94 

95 

def load_workbook_projection_sheet(workbook_path, sheet_name):
    """
    Read a single ERP2 projection sheet into a tidy-columned dataframe.

    The sheet is read with ``header=5``. Its first two columns are renamed to
    SourceCategory1 / SourceCategory2 (the row identifiers), and the remaining
    columns are expected to be years. Only year columns between BASE_YEAR and
    END_YEAR (inclusive) are kept.

    Raises ValueError when no year column falls inside that range.
    """
    sheet = pd.read_excel(workbook_path, sheet_name=sheet_name, header=5)

    id_cols = ["SourceCategory1", "SourceCategory2"]
    leading = list(sheet.columns[:2])
    sheet = sheet.rename(columns={leading[0]: id_cols[0], leading[1]: id_cols[1]})

    # Year headers can be read as whole-number floats; turn those into ints
    # so that the integer year filter below recognises them.
    whole_number_floats = {
        label: int(label)
        for label in sheet.columns
        if isinstance(label, float) and label.is_integer()
    }
    sheet = sheet.rename(columns=whole_number_floats)

    year_cols = []
    for label in sheet.columns:
        if isinstance(label, int) and BASE_YEAR <= label <= END_YEAR:
            year_cols.append(label)

    if not year_cols:
        raise ValueError(
            f"No year columns between {BASE_YEAR} and {END_YEAR} found in "
            f"{workbook_path} [{sheet_name}]"
        )

    return sheet[id_cols + year_cols].copy()

130 

131 

def get_cached_workbook_projection_sheet(mapping_row, sheet_cache, external_data_dir):
    """Fetch the worksheet named by ``mapping_row``, loading it at most once.

    Sheets are cached in ``sheet_cache`` under a (workbook path, sheet name)
    key. A copy is always returned, so callers cannot alter the cached frame.
    """
    workbook_path = Path(external_data_dir) / mapping_row["Workbook"]
    sheet_name = mapping_row["SheetName"]
    key = (str(workbook_path), sheet_name)

    if key in sheet_cache:
        return sheet_cache[key].copy()

    # First request for this sheet: read it and remember it for later rows
    loaded = load_workbook_projection_sheet(workbook_path, sheet_name)
    sheet_cache[key] = loaded
    return loaded.copy()

141 

142 

def select_projection_match(df, mapping_row):
    """Select the single workbook row that matches the configured categories.

    ``SourceCategory1`` and ``SourceCategory2`` in ``mapping_row`` are optional
    filters. A blank filter (``None`` or any NaN-like value) is ignored; a
    configured one must equal the stripped text of the same column in ``df``.

    Args:
        df: worksheet dataframe with SourceCategory1 / SourceCategory2 columns.
        mapping_row: mapping with the keys Sector, Scenario, Workbook,
            SheetName, SourceCategory1 and SourceCategory2.

    Returns:
        The one matching row of ``df`` as a Series.

    Raises:
        ValueError: if no row, or more than one row, matches.
    """
    mask = pd.Series(True, index=df.index)
    for column in ("SourceCategory1", "SourceCategory2"):
        value = mapping_row[column]
        # A blank category can arrive as None or as NaN/pd.NA depending on how
        # the mapping was built; both mean "do not filter on this column".
        # Testing only for None would compare the column against NaN, match
        # nothing, and report a misleading "no row matched" error.
        if pd.isna(value):
            continue
        mask &= df[column].astype(str).str.strip().eq(value)

    matches = df.loc[mask].copy()
    mapping_details = (
        f"{mapping_row['Sector']} / {mapping_row['Scenario']} "
        f"using [{mapping_row['SourceCategory1']!r}, {mapping_row['SourceCategory2']!r}] "
        f"in {mapping_row['Workbook']} [{mapping_row['SheetName']}]"
    )
    if matches.empty:
        raise ValueError(
            "No ERP2 workbook row matched agriculture projection mapping for "
            + mapping_details
        )
    if len(matches) > 1:
        raise ValueError(
            "Multiple ERP2 workbook rows matched agriculture projection mapping for "
            + mapping_details
        )
    return matches.iloc[0]

168 

169 

def build_projection_index_frame(match_row, mapping_row):
    """Turn one matched workbook row into an index relative to BASE_YEAR.

    Every integer-labelled entry of ``match_row`` is treated as a year value.
    Values are coerced to numeric and divided by the BASE_YEAR value, and the
    result is labelled with the SectorGroup, Sector and Scenario taken from
    ``mapping_row``.

    Raises ValueError when the BASE_YEAR value is missing or zero.
    """
    year_cols = [label for label in match_row.index if isinstance(label, int)]
    values = pd.to_numeric(match_row[year_cols], errors="coerce")

    base_value = values.get(BASE_YEAR)
    if pd.isna(base_value) or base_value == 0:
        raise ValueError(
            "Cannot compute agriculture projection index because the base-year value "
            f"is missing or zero for {mapping_row['Sector']} / {mapping_row['Scenario']}"
        )

    frame = pd.DataFrame({"Year": year_cols, "Index": values.values / base_value})
    frame = frame.assign(
        SectorGroup=mapping_row["SectorGroup"],
        Sector=mapping_row["Sector"],
        Scenario=mapping_row["Scenario"],
    )
    return frame[["SectorGroup", "Sector", "Scenario", "Year", "Index"]]

185 

186 

def get_workbook_projection_indices(
    mapping_row, sheet_cache, external_data_dir=EXTERNAL_DATA
):
    """Build the base-year index series for one workbook-based mapping row.

    Loads (or reuses from ``sheet_cache``) the configured worksheet, picks the
    single row matching the mapping's source categories, and converts it to an
    index dataframe.
    """
    sheet = get_cached_workbook_projection_sheet(
        mapping_row, sheet_cache, external_data_dir
    )
    matched = select_projection_match(sheet, mapping_row)
    index_frame = build_projection_index_frame(matched, mapping_row)
    return index_frame

196 

197 

def get_constant_projection_indices(mapping_row):
    """Build a flat index series from the mapping's ConstantIndex.

    One row is produced for every year from BASE_YEAR to END_YEAR inclusive,
    each carrying the same index value.

    Raises ValueError when ConstantIndex is missing.
    """
    constant_index = mapping_row["ConstantIndex"]
    if pd.isna(constant_index):
        raise ValueError(
            f"ConstantIndex is required for constant agriculture mapping: {mapping_row['Sector']}"
        )

    frame = pd.DataFrame({"Year": range(BASE_YEAR, END_YEAR + 1)})
    frame = frame.assign(
        Index=float(constant_index),
        SectorGroup=mapping_row["SectorGroup"],
        Sector=mapping_row["Sector"],
        Scenario=mapping_row["Scenario"],
    )

    return frame[["SectorGroup", "Sector", "Scenario", "Year", "Index"]]

213 

214 

def expand_projection_indices(df):
    """Give every SectorGroup/Sector/Scenario a complete annual index series.

    Each group is expanded to one row per year from BASE_YEAR to END_YEAR
    inclusive. Years absent from ``df`` are filled by linear interpolation
    within the group, followed by a forward fill and then a backward fill.
    """
    group_vars = ["SectorGroup", "Sector", "Scenario"]
    all_years = pd.DataFrame({"Year": range(BASE_YEAR, END_YEAR + 1)})

    def _fill_gaps(series):
        return series.interpolate(method="linear").ffill().bfill()

    # Cross join builds the full group x year grid; the left merge then pulls
    # in whatever index values are already known.
    full = (
        df[group_vars]
        .drop_duplicates()
        .merge(all_years, how="cross")
        .merge(df, on=group_vars + ["Year"], how="left")
        .sort_values(group_vars + ["Year"])
    )
    full["Index"] = full.groupby(group_vars, group_keys=False)["Index"].apply(
        _fill_gaps
    )

    return full[group_vars + ["Year", "Index"]]

229 

230 

231# FUNCTIONS 

def get_agriculture_growth_indices(
    assumptions_path=ASSUMPTIONS_FILE, external_data_dir=EXTERNAL_DATA
):
    """
    Compile agriculture demand indices for every mapped sector and scenario.

    The assumptions CSV holds one mapping row per sector/scenario. A row's
    Method decides how its index series is produced:
    - "Workbook": read from a row of the ERP2 detailed results workbook
    - "Constant": a flat series at the configured ConstantIndex

    The per-row series are combined, checked for duplicated
    SectorGroup/Sector/Scenario/Year rows, and expanded to a full annual series.

    Raises ValueError for an unsupported Method or for duplicated rows.
    """
    mappings = load_agriculture_projection_assumptions(assumptions_path)
    sheet_cache = {}  # shared across rows so each worksheet is read only once
    projection_parts = []

    for _, mapping_row in mappings.iterrows():
        method = mapping_row["Method"]
        if method == "Constant":
            part = get_constant_projection_indices(mapping_row)
        elif method == "Workbook":
            part = get_workbook_projection_indices(
                mapping_row,
                sheet_cache=sheet_cache,
                external_data_dir=external_data_dir,
            )
        else:
            raise ValueError(
                f"Unsupported agriculture demand projection method: {method}"
            )
        projection_parts.append(part)

    combined = pd.concat(projection_parts, ignore_index=True)

    key_cols = ["SectorGroup", "Sector", "Scenario", "Year"]
    is_duplicate = combined.duplicated(key_cols, keep=False)
    if is_duplicate.any():
        duplicate_rows = combined.loc[is_duplicate, key_cols]
        raise ValueError(
            "Duplicate agriculture demand projection rows found after compilation:\n"
            + duplicate_rows.to_string(index=False)
        )

    return expand_projection_indices(combined)

277 

278 

def get_agriculture_baseyear_demand(var):
    """
    Load base year ag, forest and fish demand for one variable.

    Reads the stage 2 base year demand file, keeps the rows for ``var``, and
    sums Value by Sector, CommodityOut, Island, Technology, EndUse, Variable
    and Unit.

    ``var`` must be either "InputEnergy" or "OutputEnergy"; anything else
    raises ValueError.
    """
    if var not in ("InputEnergy", "OutputEnergy"):
        raise ValueError(
            f"Invalid variable '{var}'. Must be 'InputEnergy' or 'OutputEnergy'."
        )

    group_keys = [
        "Sector",
        "CommodityOut",
        "Island",
        "Technology",
        "EndUse",
        "Variable",
        "Unit",
    ]
    demand = pd.read_csv(
        STAGE_2_DATA / "ag_forest_fish/baseyear_ag_forest_fish_demand.csv"
    )
    selected = demand[demand["Variable"] == var]
    totals = selected.groupby(group_keys)["Value"].sum()
    return totals.reset_index()

310 

311 

def get_energy_demand_projections(energy_type):
    """
    Project base year demand forward using the agriculture growth indices.

    Base year demand for ``energy_type`` ("InputEnergy" or "OutputEnergy") is
    left-joined to the growth indices on Sector, and Value is scaled by Index.

    OutputEnergy is what the model requires; InputEnergy is sometimes useful
    for communication purposes.
    """
    growth_index = get_agriculture_growth_indices()
    demand = get_agriculture_baseyear_demand(energy_type)

    projected = demand.merge(growth_index, on="Sector", how="left")
    projected["Value"] = projected["Value"].mul(projected["Index"])

    return projected

326 

327 

def main():
    """Script entrypoint: write the checking outputs, then the demand index."""
    # Detailed energy projections are extra detail for reporting only
    check_specs = [
        ("InputEnergy", "agriculture_input.csv", "Input energy"),
        ("OutputEnergy", "agriculture_output.csv", "Output energy"),
    ]
    # Build both projections before anything is written
    check_frames = [
        (get_energy_demand_projections(energy_type), name, label)
        for energy_type, name, label in check_specs
    ]
    for frame, name, label in check_frames:
        save_agr_proj_check(frame, name, label)

    # The model itself just needs the indices
    df_index = get_agriculture_growth_indices()
    save_agr_proj_data(
        df_index, "agriculture_demand_index.csv", "Agriculture demand index"
    )

341 

342 

343if __name__ == "__main__": 

344 main()