Coverage for src/prepare_times_nz/stage_3/demand_projections/agriculture.py: 90%
144 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 23:05 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-16 23:05 +0000
1"""
2This module converts Agri, forest, fish input demand projection assumptions
3 and compiles indices for all commodities and scenarios
5A separate stage 4 module can extract these
6 into the relevant demand scenario files
8"""
10from pathlib import Path
12import pandas as pd
13from prepare_times_nz.stage_0.stage_0_settings import BASE_YEAR
14from prepare_times_nz.utilities.data_in_out import _save_data
15from prepare_times_nz.utilities.filepaths import (
16 ASSUMPTIONS,
17 EXTERNAL_DATA,
18 STAGE_2_DATA,
19 STAGE_3_DATA,
20)
# CONSTANTS
# file paths
# Folder under ASSUMPTIONS that holds the demand projection inputs.
PROJECTIONS_ASSUMPTIONS = ASSUMPTIONS / "demand_projections"
# Stage 3 output folder for demand projections; checks go in a subfolder.
OUTPUT = STAGE_3_DATA / "demand_projections"
OUTPUT_CHECKS = OUTPUT / "checks"
# end year setting
# Last year (inclusive) of every projection series built in this module.
END_YEAR = 2050
# CSV mapping each sector/scenario to a workbook row or a constant index.
ASSUMPTIONS_FILE = PROJECTIONS_ASSUMPTIONS / "agriculture_demand_projections.csv"
32# HELPERS
def save_agr_proj_data(df, name, label):
    """Save a projection table into the stage 3 demand projections folder.

    Thin wrapper around ``_save_data``: ``label`` is embedded in a standard
    descriptive message and the file ``name`` is written under ``OUTPUT``.
    """
    message = "".join(
        ["Saving ag, forest, and fish demand projections (", label, ")"]
    )
    _save_data(df, name, message, filepath=OUTPUT)
def save_agr_proj_check(df, name, label):
    """Save a checking table into the demand projections ``checks`` folder.

    Thin wrapper around ``_save_data``: ``label`` is embedded in a standard
    descriptive message and the file ``name`` is written under
    ``OUTPUT_CHECKS``.
    """
    message = "".join(["Saving ag, forest, and fish demand checks (", label, ")"])
    _save_data(df, name, message, filepath=OUTPUT_CHECKS)
def normalise_optional_text(value):
    """Return ``value`` as stripped text, or ``None`` when it carries no text.

    Missing values (anything ``pd.isna`` reports as missing) and strings that
    are empty after stripping whitespace both collapse to ``None``.
    """
    if pd.isna(value):
        return None
    stripped = str(value).strip()
    if not stripped:
        return None
    return stripped
def load_agriculture_projection_assumptions(assumptions_path=ASSUMPTIONS_FILE):
    """Load agriculture projection mappings and validate expected columns.

    Args:
        assumptions_path: CSV file with one mapping row per sector/scenario.
            Defaults to ``ASSUMPTIONS_FILE``.

    Returns:
        DataFrame restricted to the "Agriculture, Forestry and Fishing"
        sector group, with ``Scenario``/``Method`` stripped and title-cased,
        the optional text columns normalised to stripped text or ``None``,
        and ``ConstantIndex`` coerced to numeric (unparseable values -> NaN).

    Raises:
        ValueError: if any required column is missing, or if a
            (SectorGroup, Sector, Scenario) combination appears more than once.
    """
    df = pd.read_csv(assumptions_path)

    required_cols = {
        "SectorGroup",
        "Sector",
        "Scenario",
        "Method",
        "Workbook",
        "SheetName",
        "SourceCategory1",
        "SourceCategory2",
        "ConstantIndex",
        "Note",
    }
    # Validate the schema BEFORE touching any column: filtering on
    # "SectorGroup" first would surface a missing column as a bare KeyError
    # instead of the descriptive error below.
    missing_cols = required_cols.difference(df.columns)
    if missing_cols:
        raise ValueError(
            "Agriculture demand projection assumptions are missing columns: "
            + ", ".join(sorted(missing_cols))
        )

    # Only the agriculture sector group is handled by this module.
    df = df[df["SectorGroup"] == "Agriculture, Forestry and Fishing"].copy()

    # Normalise the keys used for dispatch and matching so that stray
    # whitespace or casing in the CSV does not break later comparisons.
    df["Scenario"] = df["Scenario"].astype(str).str.strip().str.title()
    df["Method"] = df["Method"].astype(str).str.strip().str.title()
    for column in ("Workbook", "SheetName", "SourceCategory1", "SourceCategory2"):
        df[column] = df[column].apply(normalise_optional_text)
    df["ConstantIndex"] = pd.to_numeric(df["ConstantIndex"], errors="coerce")

    # Each sector/scenario must map to exactly one projection source.
    duplicates = df.duplicated(["SectorGroup", "Sector", "Scenario"], keep=False)
    if duplicates.any():
        duplicate_rows = df.loc[duplicates, ["SectorGroup", "Sector", "Scenario"]]
        raise ValueError(
            "Duplicate agriculture demand projection mappings found:\n"
            + duplicate_rows.to_string(index=False)
        )

    return df
def load_workbook_projection_sheet(workbook_path, sheet_name):
    """
    Load one ERP2 projection sheet.

    The sheet is read with the header taken from row index 5. Its first two
    columns are renamed to ``SourceCategory1`` / ``SourceCategory2`` and act
    as row identifiers; the remaining columns of interest are years.

    Returns a copy holding the two identifier columns plus every integer year
    column between ``BASE_YEAR`` and ``END_YEAR`` (inclusive).

    Raises ValueError when no year column falls inside that range.
    """
    sheet = pd.read_excel(workbook_path, sheet_name=sheet_name, header=5)

    leading = list(sheet.columns[:2])
    sheet = sheet.rename(
        columns={leading[0]: "SourceCategory1", leading[1]: "SourceCategory2"}
    )

    # Year headers may arrive as whole-number floats (e.g. 2030.0); cast those
    # labels to int so the range test below recognises them.
    whole_number_labels = {
        label: int(label)
        for label in sheet.columns
        if isinstance(label, float) and label.is_integer()
    }
    sheet = sheet.rename(columns=whole_number_labels)

    years_in_range = []
    for label in sheet.columns:
        if isinstance(label, int) and BASE_YEAR <= label <= END_YEAR:
            years_in_range.append(label)

    if not years_in_range:
        raise ValueError(
            f"No year columns between {BASE_YEAR} and {END_YEAR} found in "
            f"{workbook_path} [{sheet_name}]"
        )

    keep = ["SourceCategory1", "SourceCategory2"] + years_in_range
    return sheet[keep].copy()
def get_cached_workbook_projection_sheet(mapping_row, sheet_cache, external_data_dir):
    """Return a cached ERP2 worksheet dataframe for the configured mapping row.

    Args:
        mapping_row: mapping with at least ``Workbook``, ``SheetName`` and
            ``Sector`` entries.
        sheet_cache: dict keyed by ``(workbook path string, sheet name)``;
            populated in place on a cache miss.
        external_data_dir: directory the workbook file name is resolved against.

    Returns:
        A copy of the cached sheet, so callers cannot mutate the cached frame.

    Raises:
        ValueError: if ``Workbook`` or ``SheetName`` is blank for this row.
    """
    workbook_name = mapping_row["Workbook"]
    sheet_name = mapping_row["SheetName"]
    # Blank cells reach here as None/NaN. Without this guard a blank workbook
    # fails with an opaque TypeError on the path join, and a blank sheet name
    # makes pd.read_excel return a dict of every sheet instead of one frame.
    if pd.isna(workbook_name) or pd.isna(sheet_name):
        raise ValueError(
            "Workbook and SheetName are required for workbook agriculture "
            f"mapping: {mapping_row['Sector']}"
        )

    workbook_path = Path(external_data_dir) / workbook_name
    cache_key = (str(workbook_path), sheet_name)
    if cache_key not in sheet_cache:
        sheet_cache[cache_key] = load_workbook_projection_sheet(
            workbook_path, sheet_name
        )
    return sheet_cache[cache_key].copy()
def select_projection_match(df, mapping_row):
    """Select the single workbook row that matches the configured categories.

    Args:
        df: worksheet frame with ``SourceCategory1`` / ``SourceCategory2``
            identifier columns.
        mapping_row: mapping providing the category values to match, plus the
            ``Sector``, ``Scenario``, ``Workbook`` and ``SheetName`` entries
            used in error messages.

    Returns:
        The one matching row of ``df`` as a Series.

    Raises:
        ValueError: if no row, or more than one row, matches.
    """
    mask = pd.Series(True, index=df.index)
    for column in ("SourceCategory1", "SourceCategory2"):
        value = mapping_row[column]
        # An unset category places no constraint on the match. Treat NaN the
        # same as None: a blank cell may surface as either, and comparing
        # against NaN would silently match nothing.
        if pd.isna(value):
            continue
        # Workbook labels are compared as stripped text.
        mask &= df[column].astype(str).str.strip().eq(value)

    matches = df.loc[mask].copy()
    mapping_details = (
        f"{mapping_row['Sector']} / {mapping_row['Scenario']} "
        f"using [{mapping_row['SourceCategory1']!r}, {mapping_row['SourceCategory2']!r}] "
        f"in {mapping_row['Workbook']} [{mapping_row['SheetName']}]"
    )
    if matches.empty:
        raise ValueError(
            "No ERP2 workbook row matched agriculture projection mapping for "
            + mapping_details
        )
    if len(matches) > 1:
        raise ValueError(
            "Multiple ERP2 workbook rows matched agriculture projection mapping for "
            + mapping_details
        )
    return matches.iloc[0]
def build_projection_index_frame(match_row, mapping_row):
    """Turn a matched workbook row into a long-format base-year index table.

    Every integer-labelled entry of ``match_row`` is treated as a year value.
    Values are coerced to numeric and divided by the ``BASE_YEAR`` value.

    Returns a frame with columns SectorGroup, Sector, Scenario, Year, Index.

    Raises ValueError when the base-year value is absent, NaN or zero.
    """
    year_labels = [label for label in match_row.index if isinstance(label, int)]
    numeric = pd.to_numeric(match_row[year_labels], errors="coerce")

    base_value = numeric.get(BASE_YEAR)
    if pd.isna(base_value) or base_value == 0:
        raise ValueError(
            "Cannot compute agriculture projection index because the base-year value "
            f"is missing or zero for {mapping_row['Sector']} / {mapping_row['Scenario']}"
        )

    frame = pd.DataFrame(
        {"Year": year_labels, "Index": numeric.values / base_value}
    ).assign(
        SectorGroup=mapping_row["SectorGroup"],
        Sector=mapping_row["Sector"],
        Scenario=mapping_row["Scenario"],
    )
    return frame[["SectorGroup", "Sector", "Scenario", "Year", "Index"]]
def get_workbook_projection_indices(
    mapping_row, sheet_cache, external_data_dir=EXTERNAL_DATA
):
    """Build the base-year index series for one workbook-backed mapping row.

    Fetches the (cached) worksheet, picks the single row matching the
    configured categories, and converts it to an index frame.
    """
    sheet = get_cached_workbook_projection_sheet(
        mapping_row, sheet_cache, external_data_dir
    )
    return build_projection_index_frame(
        select_projection_match(sheet, mapping_row), mapping_row
    )
def get_constant_projection_indices(mapping_row):
    """Build a flat index series from the row's ``ConstantIndex`` value.

    Returns one row per year from ``BASE_YEAR`` to ``END_YEAR`` inclusive,
    all carrying the same index, with columns SectorGroup, Sector, Scenario,
    Year, Index.

    Raises ValueError when ``ConstantIndex`` is missing.
    """
    flat_value = mapping_row["ConstantIndex"]
    if pd.isna(flat_value):
        raise ValueError(
            f"ConstantIndex is required for constant agriculture mapping: {mapping_row['Sector']}"
        )

    frame = pd.DataFrame({"Year": range(BASE_YEAR, END_YEAR + 1)}).assign(
        Index=float(flat_value),
        SectorGroup=mapping_row["SectorGroup"],
        Sector=mapping_row["Sector"],
        Scenario=mapping_row["Scenario"],
    )
    return frame[["SectorGroup", "Sector", "Scenario", "Year", "Index"]]
def expand_projection_indices(df):
    """Expand projection rows to a full annual series per sector/scenario.

    Builds every (SectorGroup, Sector, Scenario) x year combination from
    ``BASE_YEAR`` to ``END_YEAR``, left-joins the supplied indices onto it,
    then fills gaps within each group: linear interpolation first, followed
    by forward and backward fill for any values still missing at the ends.
    """
    keys = ["SectorGroup", "Sector", "Scenario"]

    def _fill_gaps(series):
        return series.interpolate(method="linear").ffill().bfill()

    all_years = pd.DataFrame({"Year": range(BASE_YEAR, END_YEAR + 1)})
    grid = df[keys].drop_duplicates().merge(all_years, how="cross")
    grid = grid.merge(df, on=keys + ["Year"], how="left")
    # Interpolation relies on rows being in year order within each group.
    grid = grid.sort_values(keys + ["Year"])
    grid["Index"] = grid.groupby(keys, group_keys=False)["Index"].apply(_fill_gaps)

    return grid[keys + ["Year", "Index"]]
231# FUNCTIONS
def get_agriculture_growth_indices(
    assumptions_path=ASSUMPTIONS_FILE, external_data_dir=EXTERNAL_DATA
):
    """
    Compile agriculture demand indices from the mapping configuration.

    Each row of the assumptions CSV ties a sector/scenario to one of:
      - "Workbook": a row of a workbook under ``external_data_dir``, or
      - "Constant": a flat index taken from the CSV itself.

    The per-row series are concatenated, checked for duplicate
    (SectorGroup, Sector, Scenario, Year) rows, and expanded to a full annual
    series. Raises ValueError for an unknown method or duplicated rows.
    """
    mappings = load_agriculture_projection_assumptions(assumptions_path)
    # Shared across rows so each workbook sheet is read from disk only once.
    sheet_cache = {}
    parts = []

    for _, row in mappings.iterrows():
        method = row["Method"]
        if method == "Workbook":
            part = get_workbook_projection_indices(
                row,
                sheet_cache=sheet_cache,
                external_data_dir=external_data_dir,
            )
        elif method == "Constant":
            part = get_constant_projection_indices(row)
        else:
            raise ValueError(
                f"Unsupported agriculture demand projection method: {method}"
            )
        parts.append(part)

    combined = pd.concat(parts, ignore_index=True)

    key_cols = ["SectorGroup", "Sector", "Scenario", "Year"]
    is_duplicate = combined.duplicated(key_cols, keep=False)
    if is_duplicate.any():
        offending = combined.loc[is_duplicate, key_cols]
        raise ValueError(
            "Duplicate agriculture demand projection rows found after compilation:\n"
            + offending.to_string(index=False)
        )

    return expand_projection_indices(combined)
def get_agriculture_baseyear_demand(var):
    """
    Read base year agriculture, forestry and fishing demand for one variable.

    ``var`` must be "InputEnergy" or "OutputEnergy"; anything else raises
    ValueError. Rows for that variable are summed over ``Value`` within each
    Sector / CommodityOut / Island / Technology / EndUse / Variable / Unit
    combination.
    """
    allowed = ("OutputEnergy", "InputEnergy")
    if var not in allowed:
        raise ValueError(
            f"Invalid variable '{var}'. Must be 'InputEnergy' or 'OutputEnergy'."
        )

    group_cols = [
        "Sector",
        "CommodityOut",
        "Island",
        "Technology",
        "EndUse",
        "Variable",
        "Unit",
    ]
    demand = pd.read_csv(
        STAGE_2_DATA / "ag_forest_fish/baseyear_ag_forest_fish_demand.csv"
    )
    selected = demand[demand["Variable"] == var]
    totals = selected.groupby(group_cols)["Value"].sum()
    return totals.reset_index()
def get_energy_demand_projections(energy_type):
    """
    Project base year demand forward using the growth indices.

    Base year demand for ``energy_type`` (input or output energy) is
    left-joined to the growth indices on ``Sector`` and ``Value`` is scaled
    by ``Index``. Output energy is what the model requires; input energy is
    sometimes useful for communication purposes.
    """
    growth = get_agriculture_growth_indices()
    demand = get_agriculture_baseyear_demand(energy_type)

    projected = demand.merge(growth, on="Sector", how="left")
    projected["Value"] = projected["Value"].mul(projected["Index"])
    return projected
def main():
    """Script entrypoint: write the checking tables, then the demand index."""
    input_energy = get_energy_demand_projections("InputEnergy")
    output_energy = get_energy_demand_projections("OutputEnergy")

    # Detailed energy projections are extra detail for reporting only.
    check_outputs = (
        (input_energy, "agriculture_input.csv", "Input energy"),
        (output_energy, "agriculture_output.csv", "Output energy"),
    )
    for frame, filename, label in check_outputs:
        save_agr_proj_check(frame, filename, label)

    # The model itself just needs the indices.
    save_agr_proj_data(
        get_agriculture_growth_indices(),
        "agriculture_demand_index.csv",
        "Agriculture demand index",
    )


if __name__ == "__main__":
    main()