diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 15a67e89e..a569c6208 100755
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -260,7 +260,8 @@ Added
   `#205 `_
 * Update osm for status2023
   `#169 `_
-
+* Add wrapped_partial to dynamise task generation
+  `#207 `_
 
 
 .. _PR #159: https://github.com/openego/eGon-data/pull/159
diff --git a/src/egon/data/airflow/dags/pipeline_status_quo.py b/src/egon/data/airflow/dags/pipeline_status_quo.py
index 79092b220..a5bcc71bd 100755
--- a/src/egon/data/airflow/dags/pipeline_status_quo.py
+++ b/src/egon/data/airflow/dags/pipeline_status_quo.py
@@ -44,7 +44,7 @@
 from egon.data.datasets.heat_etrago import HeatEtrago
 from egon.data.datasets.heat_etrago.hts_etrago import HtsEtragoTable
 from egon.data.datasets.heat_supply import HeatSupply
-from egon.data.datasets.heat_supply.individual_heating import HeatPumps2019
+from egon.data.datasets.heat_supply.individual_heating import HeatPumpsStatusQuo
 from egon.data.datasets.industrial_sites import MergeIndustrialSites
 from egon.data.datasets.industry import IndustrialDemandCurves
 from egon.data.datasets.loadarea import LoadArea, OsmLanduse
@@ -453,7 +453,7 @@
     chp_etrago = ChpEtrago(dependencies=[chp, heat_etrago])
 
     # Heat pump disaggregation for status2019
-    heat_pumps_2019 = HeatPumps2019(
+    heat_pumps_2019 = HeatPumpsStatusQuo(
         dependencies=[
             cts_demand_buildings,
             DistrictHeatingAreas,
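The changelog entry above refers to the wrapped_partial helper that individual_heating.py imports from egon.data.datasets further down. Its implementation is not part of this diff; the following is a minimal sketch of such a helper, assuming it consumes a postfix keyword to give each partially applied callable a unique __name__ (eGon-data derives Airflow task names from the callable's __name__):

    from functools import partial, update_wrapper


    def wrapped_partial(func, *args, postfix="", **kwargs):
        """Sketch: functools.partial plus a distinct, descriptive __name__."""
        wrapped = partial(func, *args, **kwargs)
        # Copy __name__, __doc__, etc. onto the partial object.
        update_wrapper(wrapped, func)
        # Disambiguate, e.g. delete_mvgd_ts_status_quo_2019 vs ..._2023.
        wrapped.__name__ += postfix
        return wrapped

Wrapping functools.partial this way matters because bare partial objects carry no __name__ of their own, so registering the same delete function once per scenario would otherwise produce colliding task names.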
diff --git a/src/egon/data/datasets/heat_etrago/__init__.py b/src/egon/data/datasets/heat_etrago/__init__.py
index 9b062b5eb..69c21e1b8 100644
--- a/src/egon/data/datasets/heat_etrago/__init__.py
+++ b/src/egon/data/datasets/heat_etrago/__init__.py
@@ -69,14 +69,14 @@ def insert_buses(carrier, scenario):
             SELECT ST_Centroid(geom) AS geom
             FROM {sources['mv_grids']['schema']}.
             {sources['mv_grids']['table']}
-            WHERE bus_id IN 
-            (SELECT DISTINCT bus_id 
+            WHERE bus_id IN
+            (SELECT DISTINCT bus_id
             FROM boundaries.egon_map_zensus_grid_districts a
-            JOIN demand.egon_peta_heat b 
+            JOIN demand.egon_peta_heat b
             ON a.zensus_population_id = b.zensus_population_id
             WHERE b.scenario = '{scenario}'
             AND b.zensus_population_id NOT IN (
-                SELECT zensus_population_id FROM 
+                SELECT zensus_population_id FROM
                 demand.egon_map_zensus_district_heating_areas
                 WHERE scenario = '{scenario}'
                )
@@ -263,7 +263,7 @@ def insert_store(scenario, carrier):
 
 def store():
     for scenario in config.settings()["egon-data"]["--scenarios"]:
-        if scenario != "status2019":
+        if "status" not in scenario:
             insert_store(scenario, "central_heat")
             insert_store(scenario, "rural_heat")
 
@@ -290,8 +290,8 @@ def insert_central_direct_heat(scenario):
         {targets['heat_generators']['table']}
         WHERE carrier IN ('solar_thermal_collector', 'geo_thermal')
         AND scn_name = '{scenario}'
-        AND bus IN 
-        (SELECT bus_id 
+        AND bus IN
+        (SELECT bus_id
         FROM {targets['heat_buses']['schema']}.
         {targets['heat_buses']['table']}
         WHERE scn_name = '{scenario}'
@@ -368,13 +368,15 @@ def insert_central_direct_heat(scenario):
 
     # Map solar thermal collectors to weather cells
     join = gpd.sjoin(weather_cells, solar_thermal)[["index_right"]]
 
+    weather_year = get_sector_parameters("global", scenario)["weather_year"]
+
     feedin = db.select_dataframe(
         f"""
         SELECT w_id, feedin
         FROM {sources['feedin_timeseries']['schema']}.
         {sources['feedin_timeseries']['table']}
         WHERE carrier = 'solar_thermal'
-        AND weather_year = 2019
+        AND weather_year = {weather_year}
         """,
         index_col="w_id",
     )
@@ -511,14 +513,14 @@ def insert_rural_gas_boilers(scenario):
         {targets['heat_links']['table']}
         WHERE carrier = 'rural_gas_boiler'
         AND scn_name = '{scenario}'
-        AND bus0 IN 
-        (SELECT bus_id 
+        AND bus0 IN
+        (SELECT bus_id
         FROM {targets['heat_buses']['schema']}.
         {targets['heat_buses']['table']}
         WHERE scn_name = '{scenario}'
         AND country = 'DE')
-        AND bus1 IN 
-        (SELECT bus_id 
+        AND bus1 IN
+        (SELECT bus_id
         FROM {targets['heat_buses']['schema']}.
         {targets['heat_buses']['table']}
         WHERE scn_name = '{scenario}'
@@ -607,8 +609,8 @@ def supply():
     """
 
     for scenario in config.settings()["egon-data"]["--scenarios"]:
-        # There is no direct heat in status2019 scenario
-        if scenario != "status2019":
+        # There is no direct heat in status quo scenarios
+        if "status" not in scenario:
             insert_central_direct_heat(scenario)
         insert_central_power_to_heat(scenario)
         insert_individual_power_to_heat(scenario)
@@ -619,7 +621,7 @@ class HeatEtrago(Dataset):
     def __init__(self, dependencies):
         super().__init__(
             name="HeatEtrago",
-            version="0.0.11",
+            version="0.0.12",
             dependencies=dependencies,
             tasks=(buses, supply, store),
-        )
\ No newline at end of file
+        )
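A detail in this file that is easy to misread as cosmetic: the guard changed from scenario != "status2019" to "status" not in scenario, which widens the exclusion from exactly status2019 to every status quo scenario. A small self-contained illustration (the scenario names are the ones eGon-data commonly uses):

    # Why the substring check: the old comparison only excluded status2019.
    scenarios = ["status2019", "status2023", "eGon2035", "eGon100RE"]

    old = [s for s in scenarios if s != "status2019"]
    new = [s for s in scenarios if "status" not in s]

    print(old)  # ['status2023', 'eGon2035', 'eGon100RE'] - status2023 slipped through
    print(new)  # ['eGon2035', 'eGon100RE']

The same substring guard recurs in heat_supply and individual_heating below. Along the same lines, the solar thermal feed-in query no longer hard-codes weather year 2019 but takes the scenario's own weather_year from get_sector_parameters("global", scenario).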
diff --git a/src/egon/data/datasets/heat_supply/__init__.py b/src/egon/data/datasets/heat_supply/__init__.py
index b2c97a36c..83ed5e7d5 100644
--- a/src/egon/data/datasets/heat_supply/__init__.py
+++ b/src/egon/data/datasets/heat_supply/__init__.py
@@ -94,8 +94,8 @@ def district_heating():
             if_exists="append",
         )
 
-        # Do not check data for status2019 as is it not listed in the table
-        if scenario != "status2019":
+        # Do not check data for status quo as it is not listed in the table
+        if "status" not in scenario:
             # Compare target value with sum of distributed heat supply
             df_check = db.select_dataframe(
                 f"""
@@ -128,8 +128,8 @@ def district_heating():
                 if_exists="append",
             )
 
-        # Insert resistive heaters which are not available in status2019
-        if scenario != "status2019":
+        # Insert resistive heaters, which are not available in status quo
+        if "status" not in scenario:
             backup_rh = backup_resistive_heaters(scenario)
 
             if not backup_rh.empty:
@@ -182,7 +182,7 @@ class HeatSupply(Dataset):
     def __init__(self, dependencies):
         super().__init__(
             name="HeatSupply",
-            version="0.0.9",
+            version="0.0.10",
             dependencies=dependencies,
             tasks=(
                 create_tables,
diff --git a/src/egon/data/datasets/heat_supply/individual_heating.py b/src/egon/data/datasets/heat_supply/individual_heating.py
index fa437c744..ea1148f57 100644
--- a/src/egon/data/datasets/heat_supply/individual_heating.py
+++ b/src/egon/data/datasets/heat_supply/individual_heating.py
@@ -194,7 +194,7 @@
 import saio
 
 from egon.data import config, db, logger
-from egon.data.datasets import Dataset
+from egon.data.datasets import Dataset, wrapped_partial
 from egon.data.datasets.district_heating_areas import (
     MapZensusDistrictHeatingAreas,
 )
@@ -264,6 +264,7 @@ def dyn_parallel_tasks_pypsa_eur_sec():
             )
 
             tasks = set()
+
             for i in range(parallel_tasks):
                 tasks.add(
                     PythonOperator(
@@ -295,9 +296,9 @@
             )
 
 
-class HeatPumps2019(Dataset):
+class HeatPumpsStatusQuo(Dataset):
     def __init__(self, dependencies):
-        def dyn_parallel_tasks_2019():
+        def dyn_parallel_tasks_status_quo(scenario):
             """Dynamically generate tasks
 
             The goal is to speed up tasks by parallelising bulks of mvgds.
@@ -310,40 +311,67 @@ def dyn_parallel_tasks_2019():
             set of airflow.PythonOperators
                 The tasks. Each element is of
                 :func:`egon.data.datasets.heat_supply.individual_heating.
-                determine_hp_cap_peak_load_mvgd_ts_2019`
+                determine_hp_cap_peak_load_mvgd_ts_status_quo`
             """
             parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
                 "parallel_tasks", 1
             )
+
             tasks = set()
+
             for i in range(parallel_tasks):
                 tasks.add(
                     PythonOperator(
                         task_id=(
                             "individual_heating."
-                            f"determine-hp-capacity-2019-"
+                            f"determine-hp-capacity-{scenario}-"
                             f"mvgd-bulk{i}"
                         ),
                         python_callable=split_mvgds_into_bulks,
                         op_kwargs={
                             "n": i,
                             "max_n": parallel_tasks,
-                            "func": determine_hp_cap_peak_load_mvgd_ts_2019,
+                            "scenario": scenario,
+                            "func": determine_hp_cap_peak_load_mvgd_ts_status_quo,
                         },
                     )
                 )
             return tasks
 
+        tasks = ()
+
+        for scenario in config.settings()["egon-data"]["--scenarios"]:
+            if "status" in scenario:
+                postfix = f"_{scenario[-4:]}"
+
+                tasks += (
+                    wrapped_partial(
+                        delete_heat_peak_loads_status_quo,
+                        scenario=scenario,
+                        postfix=postfix,
+                    ),
+                    wrapped_partial(
+                        delete_hp_capacity_status_quo,
+                        scenario=scenario,
+                        postfix=postfix,
+                    ),
+                    wrapped_partial(
+                        delete_mvgd_ts_status_quo,
+                        scenario=scenario,
+                        postfix=postfix,
+                    ),
+                )
+
+                tasks += (
+                    {*dyn_parallel_tasks_status_quo(scenario)},
+                )
+
         super().__init__(
-            name="HeatPumps2019",
-            version="0.0.2",
+            name="HeatPumpsStatusQuo",
+            version="0.0.4",
             dependencies=dependencies,
-            tasks=(
-                delete_heat_peak_loads_2019,
-                delete_hp_capacity_2019,
-                delete_mvgd_ts_2019,
-                {*dyn_parallel_tasks_2019()},
-            ),
+            tasks=tasks,
         )
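HeatPumpsStatusQuo now assembles its task tuple once per configured status scenario, so both the wrapped delete callables (via postfix) and the dynamically generated operators (via the scenario in the task_id) need scenario-specific names; Airflow raises an error on duplicate task ids within a DAG. A small Airflow-free illustration of the ids that dyn_parallel_tasks_status_quo produces for two status scenarios and parallel_tasks=2:

    # Prints the generated task ids only; no PythonOperator is created here.
    for scenario in ["status2019", "status2023"]:
        for i in range(2):
            print(
                "individual_heating."
                f"determine-hp-capacity-{scenario}-"
                f"mvgd-bulk{i}"
            )
    # individual_heating.determine-hp-capacity-status2019-mvgd-bulk0
    # individual_heating.determine-hp-capacity-status2019-mvgd-bulk1
    # individual_heating.determine-hp-capacity-status2023-mvgd-bulk0
    # individual_heating.determine-hp-capacity-status2023-mvgd-bulk1

Note that each scenario's operators are added as a set element of the tasks tuple; in eGon-data's Dataset convention, tuple elements run sequentially while the members of a set may run in parallel.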
@@ -367,7 +395,9 @@ def dyn_parallel_tasks_2035():
             parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
                 "parallel_tasks", 1
             )
+
             tasks = set()
+
             for i in range(parallel_tasks):
                 tasks.add(
                     PythonOperator(
@@ -612,12 +642,14 @@ def cascade_heat_supply_indiv(scenario, distribution_level, plotting=True):
             columns=["estimated_flh", "priority"],
             data={"estimated_flh": [4000, 8000], "priority": [2, 1]},
         )
-    elif scenario == "status2019":
+    elif "status" in scenario:
         technologies = pd.DataFrame(
             index=["heat_pump"],
             columns=["estimated_flh", "priority"],
             data={"estimated_flh": [4000], "priority": [2]},
         )
+    else:
+        raise ValueError(f"{scenario=} is not valid.")
 
     # In the beginning, the remaining demand equals demand
     heat_per_mv["remaining_demand"] = heat_per_mv["demand"]
@@ -1882,9 +1914,9 @@
     )
 
 
-def determine_hp_cap_peak_load_mvgd_ts_2019(mvgd_ids):
+def determine_hp_cap_peak_load_mvgd_ts_status_quo(mvgd_ids, scenario):
     """
-    Main function to determine HP capacity per building in status2019 scenario.
+    Main function to determine HP capacity per building in a status quo scenario.
 
     Further, creates heat demand time series for all buildings with heat pumps
     in MV grid, as well as for all buildings with gas boilers, used in eTraGo.
@@ -1901,7 +1933,7 @@
     # =====================================================
 
     df_peak_loads_db = pd.DataFrame()
-    df_hp_cap_per_building_2019_db = pd.DataFrame()
+    df_hp_cap_per_building_status_quo_db = pd.DataFrame()
     df_heat_mvgd_ts_db = pd.DataFrame()
 
     for mvgd in mvgd_ids:
@@ -1910,20 +1942,20 @@
 
         # ############# aggregate residential and CTS demand profiles #####
         df_heat_ts = aggregate_residential_and_cts_profiles(
-            mvgd, scenario="status2019"
+            mvgd, scenario=scenario
         )
 
         # ##################### determine peak loads ###################
         logger.info(f"MVGD={mvgd} | Determine peak loads.")
 
-        peak_load_2019 = df_heat_ts.max().rename("status2019")
+        peak_load_status_quo = df_heat_ts.max().rename(scenario)
 
         # ######## determine HP capacity per building #########
         logger.info(f"MVGD={mvgd} | Determine HP capacities.")
 
         buildings_decentral_heating = (
             get_buildings_with_decentral_heat_demand_in_mv_grid(
-                mvgd, scenario="status2019"
+                mvgd, scenario=scenario
             )
         )
@@ -1931,33 +1963,30 @@
         # TODO maybe remove after succesfull DE run
         # Might be fixed in #990
         buildings_decentral_heating = catch_missing_buidings(
-            buildings_decentral_heating, peak_load_2019
+            buildings_decentral_heating, peak_load_status_quo
         )
 
-        hp_cap_per_building_2019 = determine_hp_cap_buildings_pvbased_per_mvgd(
-            "status2019",
+        hp_cap_per_building_status_quo = determine_hp_cap_buildings_pvbased_per_mvgd(
+            scenario,
             mvgd,
-            peak_load_2019,
+            peak_load_status_quo,
             buildings_decentral_heating,
         )
-        buildings_gas_2019 = pd.Index(buildings_decentral_heating).drop(
-            hp_cap_per_building_2019.index
-        )
 
         # ################ aggregated heat profiles ###################
         logger.info(f"MVGD={mvgd} | Aggregate heat profiles.")
 
-        df_mvgd_ts_2019_hp = df_heat_ts.loc[
+        df_mvgd_ts_status_quo_hp = df_heat_ts.loc[
             :,
-            hp_cap_per_building_2019.index,
+            hp_cap_per_building_status_quo.index,
         ].sum(axis=1)
 
         df_heat_mvgd_ts = pd.DataFrame(
             data={
                 "carrier": "heat_pump",
                 "bus_id": mvgd,
-                "scenario": "status2019",
-                "dist_aggregated_mw": [df_mvgd_ts_2019_hp.to_list()],
+                "scenario": scenario,
+                "dist_aggregated_mw": [df_mvgd_ts_status_quo_hp.to_list()],
             }
         )
@@ -1965,7 +1994,7 @@
         logger.info(f"MVGD={mvgd} | Collect results.")
 
         df_peak_loads_db = pd.concat(
-            [df_peak_loads_db, peak_load_2019.reset_index()],
+            [df_peak_loads_db, peak_load_status_quo.reset_index()],
             axis=0,
             ignore_index=True,
         )
@@ -1974,10 +2003,10 @@
             [df_heat_mvgd_ts_db, df_heat_mvgd_ts], axis=0, ignore_index=True
         )
 
-        df_hp_cap_per_building_2019_db = pd.concat(
+        df_hp_cap_per_building_status_quo_db = pd.concat(
             [
-                df_hp_cap_per_building_2019_db,
-                hp_cap_per_building_2019.reset_index(),
+                df_hp_cap_per_building_status_quo_db,
+                hp_cap_per_building_status_quo.reset_index(),
             ],
             axis=0,
         )
@@ -1987,11 +2016,11 @@
 
     export_to_db(df_peak_loads_db, df_heat_mvgd_ts_db, drop=False)
 
-    df_hp_cap_per_building_2019_db["scenario"] = "status2019"
+    df_hp_cap_per_building_status_quo_db["scenario"] = scenario
 
     # TODO debug duplicated building_ids
-    duplicates = df_hp_cap_per_building_2019_db.loc[
-        df_hp_cap_per_building_2019_db.duplicated("building_id", keep=False)
+    duplicates = df_hp_cap_per_building_status_quo_db.loc[
+        df_hp_cap_per_building_status_quo_db.duplicated("building_id", keep=False)
     ]
 
     if not duplicates.empty:
@@ -2000,14 +2029,14 @@
             f"{duplicates.loc[:,['building_id', 'hp_capacity']]}"
         )
 
-    df_hp_cap_per_building_2019_db.drop_duplicates("building_id", inplace=True)
+    df_hp_cap_per_building_status_quo_db.drop_duplicates("building_id", inplace=True)
 
-    df_hp_cap_per_building_2019_db.building_id = (
-        df_hp_cap_per_building_2019_db.building_id.astype(int)
+    df_hp_cap_per_building_status_quo_db.building_id = (
+        df_hp_cap_per_building_status_quo_db.building_id.astype(int)
     )
 
     write_table_to_postgres(
-        df_hp_cap_per_building_2019_db,
+        df_hp_cap_per_building_status_quo_db,
         EgonHpCapacityBuildings,
         drop=False,
     )
@@ -2118,7 +2147,7 @@ def determine_hp_cap_peak_load_mvgd_ts_pypsa_eur_sec(mvgd_ids):
     export_min_cap_to_csv(df_hp_min_cap_mv_grid_pypsa_eur_sec)
 
 
-def split_mvgds_into_bulks(n, max_n, func):
+def split_mvgds_into_bulks(n, max_n, func, scenario=None):
     """
     Generic function to split task into multiple parallel tasks, dividing
     the number of MVGDs into even bulks.
@@ -2156,7 +2185,11 @@
 
     mvgd_ids = mvgd_ids[n]
 
     logger.info(f"Bulk takes care of MVGD: {min(mvgd_ids)} : {max(mvgd_ids)}")
-    func(mvgd_ids)
+
+    if scenario is not None:
+        func(mvgd_ids, scenario=scenario)
+    else:
+        func(mvgd_ids)
 
 
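split_mvgds_into_bulks stays the single generic splitter: it now threads an optional scenario through to the bulk worker, so the year-specific callables keep their old one-argument signature while the status quo callable receives the scenario. A self-contained sketch of the bulking plus dispatch (split_into_bulks is a hypothetical stand-in; the database query that fetches the MVGD ids is omitted):

    import numpy as np


    def split_into_bulks(mvgd_ids, n, max_n, func, scenario=None):
        # Take the n-th of max_n roughly equal bulks, mirroring the
        # splitting step in split_mvgds_into_bulks.
        bulk = np.array_split(np.array(sorted(mvgd_ids)), max_n)[n]
        # Scenario-aware workers get the scenario passed through.
        if scenario is not None:
            func(bulk, scenario=scenario)
        else:
            func(bulk)


    # Example: 10 MVGDs across 3 parallel tasks, bulk 0
    split_into_bulks(
        range(10), 0, 3,
        lambda ids, scenario: print(list(ids), scenario),
        scenario="status2019",
    )
    # [0, 1, 2, 3] status2019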
@@ -2199,10 +2232,10 @@ def delete_hp_capacity(scenario):
     delete_hp_capacity(scenario="eGon100RE")
 
 
-def delete_hp_capacity_2019():
-    """Remove all hp capacities for the selected status2019"""
+def delete_hp_capacity_status_quo(scenario):
+    """Remove all HP capacities for the selected status quo scenario."""
     EgonHpCapacityBuildings.__table__.create(bind=engine, checkfirst=True)
-    delete_hp_capacity(scenario="status2019")
+    delete_hp_capacity(scenario=scenario)
 
 
 def delete_hp_capacity_2035():
@@ -2211,12 +2244,12 @@
     delete_hp_capacity(scenario="eGon2035")
 
 
-def delete_mvgd_ts_2019():
-    """Remove all mvgd ts for the selected status2019"""
+def delete_mvgd_ts_status_quo(scenario):
+    """Remove all MVGD time series for the selected status quo scenario."""
     EgonEtragoTimeseriesIndividualHeating.__table__.create(
         bind=engine, checkfirst=True
     )
-    delete_mvgd_ts(scenario="status2019")
+    delete_mvgd_ts(scenario=scenario)
 
 
 def delete_mvgd_ts_2035():
@@ -2235,13 +2268,13 @@ def delete_mvgd_ts_100RE():
     delete_mvgd_ts(scenario="eGon100RE")
 
 
-def delete_heat_peak_loads_2019():
-    """Remove all heat peak loads for status2019."""
+def delete_heat_peak_loads_status_quo(scenario):
+    """Remove all heat peak loads for the selected status quo scenario."""
     BuildingHeatPeakLoads.__table__.create(bind=engine, checkfirst=True)
     with db.session_scope() as session:
         # Buses
         session.query(BuildingHeatPeakLoads).filter(
-            BuildingHeatPeakLoads.scenario == "status2019"
+            BuildingHeatPeakLoads.scenario == scenario
         ).delete(synchronize_session=False)
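All three delete helpers follow the same pattern: ensure the target table exists, then delete every row whose scenario column matches the scenario about to be recomputed. A hedged, self-contained sketch of that pattern (the ORM model and SQLite engine below are illustrative stand-ins, not the project's actual declarations):

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()


    class BuildingHeatPeakLoads(Base):  # illustrative stand-in model
        __tablename__ = "building_heat_peak_loads"
        building_id = Column(Integer, primary_key=True)
        scenario = Column(String)


    engine = create_engine("sqlite://")  # in-memory stand-in for PostgreSQL
    Base.metadata.create_all(engine)  # the checkfirst=True step above

    with Session(engine) as session:
        # Filter on the scenario argument instead of a hard-coded "status2019".
        session.query(BuildingHeatPeakLoads).filter(
            BuildingHeatPeakLoads.scenario == "status2019"
        ).delete(synchronize_session=False)
        session.commit()

Parameterising the deletes this way is what lets the same callables be registered once per status scenario via wrapped_partial above, instead of one hand-written set of 2019-only functions.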