diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index fceea453d..160ddd430 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -4,8 +4,7 @@ name: "Tests, code style & coverage" on: push: branches: - - dev - - main + - '**' pull_request: branches: - dev diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7525cc5ac..56c5332f8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -137,6 +137,8 @@ Added * Extend zensus by a combined table with all cells where there's either building, apartment or population data `#359 `_ +* Include allocation of pumped hydro units + `#332 `_ * Add example metadata for OSM, VG250 and Zensus VG250. Add metadata templates for licences, context and some helper functions. Extend docs on how to create metadata for tables. @@ -147,8 +149,14 @@ Added `#330 `_ * Distribute wind offshore capacities `#329 `_ +* Add CH4 storages + `#405 `_ * Include allocation of conventional (non CHP) power plants `#392 `_ +* Fill egon-etrago-generators table + `#485 `_ +* Include time-dependent coefficient of performance for heat pumps + `#532 `_ .. 
_PR #159: https://github.com/openego/eGon-data/pull/159 @@ -232,6 +240,16 @@ Changed `#463 `_ * Update deposit id for zenodo download `#397 `_ +* Add to etrago.setug.py the busmap table + `#484 `_ +* Migrate dlr script to datasets + `#508 `_ +* Migrate plot.py to dataset of district heating areas + `#527 `_ +* Migrate substation scripts to datasets + `#304 `_ +* Update deposit_id for zenodo download + `#540 `_ Bug fixes @@ -295,6 +313,13 @@ Bug fixes `#414 `_ * Exchange bus 0 and bus 1 in Power-to-H2 links `#458 `_ +* Fix missing cts demands for eGon2035 + `#511 `_ * Add `data_bundle` to `industrial_sites` task dependencies `#468 `_ - +* Lift `geopandas` minimum requirement to `0.10.0` + `#504 `_ +* Use inbuilt `datetime` package instead of `pandas.datetime` + `#516 `_ +* Delete only AC loads for eTraGo in electricity_demand_etrago + `#535 `_ diff --git a/README.rst b/README.rst index b3929e0e1..e3ffd5d1c 100644 --- a/README.rst +++ b/README.rst @@ -179,7 +179,7 @@ to it. Run the workflow ================ -The :py:mod:`egon.data` package installs a command line application +The :code:`egon.data` package installs a command line application called :code:`egon-data` with which you can control the workflow so once the installation is successful, you can explore the command line interface starting with :code:`egon-data --help`. @@ -200,7 +200,7 @@ solution. .. warning:: A complete run of the workflow might require much computing power and - can't be run on laptop. Use the :ref:`test mode ` for + can't be run on laptop. Use the `test mode <#test-mode>`_ for experimenting. .. 
warning:: diff --git a/setup.py b/setup.py index d6aae308a..864241388 100755 --- a/setup.py +++ b/setup.py @@ -86,7 +86,8 @@ def read(*names, **kwargs): "atlite==0.2.5", "cdsapi", "click", - "geopandas>=0.9.0", + "geopandas>=0.10.0", + "geopy", "importlib-resources", "loguru", "matplotlib", diff --git a/src/egon/data/airflow/dags/pipeline.py b/src/egon/data/airflow/dags/pipeline.py index ea472ff7a..0fd86d8b8 100755 --- a/src/egon/data/airflow/dags/pipeline.py +++ b/src/egon/data/airflow/dags/pipeline.py @@ -1,19 +1,15 @@ import os -import airflow -import egon.data.datasets.gas_grid as gas_grid -import egon.data.importing.zensus as import_zs -import egon.data.processing.calculate_dlr as dlr -import egon.data.processing.gas_areas as gas_areas -import egon.data.processing.loadarea as loadarea -import egon.data.processing.power_to_h2 as power_to_h2 -import egon.data.processing.substation as substation -import importlib_resources as resources from airflow.operators.postgres_operator import PostgresOperator from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago +import airflow +import importlib_resources as resources + from egon.data import db from egon.data.datasets import database +from egon.data.datasets.calculate_dlr import Calculate_dlr +from egon.data.datasets.ch4_storages import CH4Storages from egon.data.datasets.chp import Chp from egon.data.datasets.chp_etrago import ChpEtrago from egon.data.datasets.data_bundle import DataBundle @@ -28,10 +24,14 @@ from egon.data.datasets.electricity_demand_etrago import ElectricalLoadEtrago from egon.data.datasets.era5 import WeatherData from egon.data.datasets.etrago_setup import EtragoSetup +from egon.data.datasets.fill_etrago_gen import Egon_etrago_gen +from egon.data.datasets.gas_grid import GasNodesandPipes from egon.data.datasets.gas_prod import CH4Production from egon.data.datasets.heat_demand import HeatDemandImport from egon.data.datasets.heat_demand_europe import 
HeatDemandEurope +from egon.data.datasets.heat_demand_timeseries.HTS import HeatTimeSeries from egon.data.datasets.heat_etrago import HeatEtrago +from egon.data.datasets.heat_etrago.hts_etrago import HtsEtragoTable from egon.data.datasets.heat_supply import HeatSupply from egon.data.datasets.hh_demand_profiles import ( hh_demand_setup, @@ -52,12 +52,22 @@ from egon.data.datasets.scenario_capacities import ScenarioCapacities from egon.data.datasets.scenario_parameters import ScenarioParameters from egon.data.datasets.society_prognosis import SocietyPrognosis +from egon.data.datasets.storages import PumpedHydro +from egon.data.datasets.substation import SubstationExtraction +from egon.data.datasets.substation_voronoi import SubstationVoronoi from egon.data.datasets.tyndp import Tyndp from egon.data.datasets.vg250 import Vg250 from egon.data.datasets.vg250_mv_grid_districts import Vg250MvGridDistricts from egon.data.datasets.zensus_mv_grid_districts import ZensusMvGridDistricts from egon.data.datasets.zensus_vg250 import ZensusVg250 -from egon.data.datasets.heat_demand_timeseries.HTS import HeatTimeSeries +from egon.data.processing.gas_areas import GasAreas +from egon.data.processing.power_to_h2 import PowertoH2 +import egon.data.datasets.gas_grid as gas_grid +import egon.data.importing.zensus as import_zs +import egon.data.processing.gas_areas as gas_areas +import egon.data.processing.loadarea as loadarea +import egon.data.processing.power_to_h2 as power_to_h2 + with airflow.DAG( "egon-data-processing-pipeline", @@ -184,43 +194,15 @@ mastr_data.insert_into(pipeline) retrieve_mastr_data = tasks["mastr.download-mastr-data"] - # Substation extraction - substation_tables = PythonOperator( - task_id="create_substation_tables", - python_callable=substation.create_tables, - ) - - substation_functions = PythonOperator( - task_id="substation_functions", - python_callable=substation.create_sql_functions, - ) - - hvmv_substation_extraction = PostgresOperator( - 
task_id="hvmv_substation_extraction", - sql=resources.read_text(substation, "hvmv_substation.sql"), - postgres_conn_id="egon_data", - autocommit=True, + substation_extraction = SubstationExtraction( + dependencies=[osm_add_metadata, vg250_clean_and_prepare] ) - ehv_substation_extraction = PostgresOperator( - task_id="ehv_substation_extraction", - sql=resources.read_text(substation, "ehv_substation.sql"), - postgres_conn_id="egon_data", - autocommit=True, - ) - - osm_add_metadata >> substation_tables >> substation_functions - substation_functions >> hvmv_substation_extraction - substation_functions >> ehv_substation_extraction - vg250_clean_and_prepare >> hvmv_substation_extraction - vg250_clean_and_prepare >> ehv_substation_extraction - # osmTGmod ehv/hv grid model generation osmtgmod = Osmtgmod( dependencies=[ osm_download, - ehv_substation_extraction, - hvmv_substation_extraction, + substation_extraction, setup_etrago, ] ) @@ -228,16 +210,17 @@ osmtgmod_pypsa = tasks["osmtgmod.to-pypsa"] osmtgmod_substation = tasks["osmtgmod_substation"] - # create Voronoi for MV grid districts - create_voronoi_substation = PythonOperator( - task_id="create-voronoi-substations", - python_callable=substation.create_voronoi, + # create Voronoi polygons + substation_voronoi = SubstationVoronoi( + dependencies=[ + osmtgmod_substation, + vg250, + ] ) - osmtgmod_substation >> create_voronoi_substation # MV grid districts mv_grid_districts = mv_grid_districts_setup( - dependencies=[create_voronoi_substation] + dependencies=[substation_voronoi] ) mv_grid_districts.insert_into(pipeline) define_mv_grid_districts = tasks[ @@ -260,35 +243,28 @@ heat_demands_abroad_download = tasks["heat_demand_europe.download"] # Gas grid import - gas_grid_insert_data = PythonOperator( - task_id="insert-gas-grid", python_callable=gas_grid.insert_gas_data + gas_grid_insert_data = GasNodesandPipes( + dependencies=[etrago_input_data, download_data_bundle, osmtgmod_pypsa] ) - etrago_input_data >> 
gas_grid_insert_data - download_data_bundle >> gas_grid_insert_data - osmtgmod_pypsa >> gas_grid_insert_data - # Power-to-gas installations creation - insert_power_to_h2_installations = PythonOperator( - task_id="insert-power-to-h2-installations", - python_callable=power_to_h2.insert_power_to_h2, + insert_power_to_h2_installations = PowertoH2( + dependencies=[gas_grid_insert_data] ) - gas_grid_insert_data >> insert_power_to_h2_installations - # Create gas voronoi - create_gas_polygons = PythonOperator( - task_id="create-gas-voronoi", python_callable=gas_areas.create_voronoi + create_gas_polygons = GasAreas( + dependencies=[gas_grid_insert_data, vg250_clean_and_prepare] ) - gas_grid_insert_data >> create_gas_polygons - vg250_clean_and_prepare >> create_gas_polygons - # Gas prod import gas_production_insert_data = CH4Production( dependencies=[create_gas_polygons] ) + # CH4 storages import + insert_data_ch4_storages = CH4Storages(dependencies=[create_gas_polygons]) + # Insert industrial gas demand industrial_gas_demand = IndustrialGasDemand( dependencies=[create_gas_polygons] @@ -332,12 +308,13 @@ zensus_misc_import >> import_district_heating_areas # Calculate dynamic line rating for HV trans lines - calculate_dlr = PythonOperator( - task_id="calculate_dlr", python_callable=dlr.Calculate_DLR + dlr = Calculate_dlr( + dependencies=[ + osmtgmod_pypsa, + download_data_bundle, + download_weather_data, + ] ) - osmtgmod_pypsa >> calculate_dlr - download_data_bundle >> calculate_dlr - download_weather_data >> calculate_dlr # Map zensus grid districts zensus_mv_grid_districts = ZensusMvGridDistricts( @@ -439,13 +416,19 @@ ) # CHP locations - chp = Chp(dependencies=[mv_grid_districts, mastr_data, industrial_sites]) + chp = Chp( + dependencies=[ + mv_grid_districts, + mastr_data, + industrial_sites, + create_gas_polygons, + ] + ) chp_locations_nep = tasks["chp.insert-chp-egon2035"] chp_heat_bus = tasks["chp.assign-heat-bus"] nep_insert_data >> chp_locations_nep - 
create_gas_polygons >> chp_locations_nep import_district_heating_areas >> chp_locations_nep # Power plants @@ -453,6 +436,7 @@ dependencies=[ setup, renewable_feedin, + substation_extraction, mv_grid_districts, mastr_data, re_potential_areas, @@ -470,14 +454,16 @@ "power_plants.pv_rooftop.pv-rooftop-per-mv-grid" ] - hvmv_substation_extraction >> generate_wind_farms - hvmv_substation_extraction >> generate_pv_ground_mounted feedin_pv >> solar_rooftop_etrago elec_cts_demands_zensus >> solar_rooftop_etrago elec_household_demands_zensus >> solar_rooftop_etrago etrago_input_data >> solar_rooftop_etrago map_zensus_grid_districts >> solar_rooftop_etrago + # Fill eTraGo Generators tables + fill_etrago_generators = Egon_etrago_gen( + dependencies=[power_plants, weather_data]) + # Heat supply heat_supply = HeatSupply( dependencies=[ @@ -491,7 +477,7 @@ # Heat to eTraGo heat_etrago = HeatEtrago( - dependencies=[heat_supply, mv_grid_districts, setup_etrago] + dependencies=[heat_supply, mv_grid_districts, setup_etrago, renewable_feedin] ) heat_etrago_buses = tasks["heat_etrago.buses"] @@ -509,6 +495,20 @@ ] ) + # Pumped hydro units + + pumped_hydro = PumpedHydro( + dependencies=[ + setup, + mv_grid_districts, + mastr_data, + scenario_parameters, + scenario_capacities, + Vg250MvGridDistricts, + power_plants, + ] + ) + # Heat time Series heat_time_series = HeatTimeSeries( dependencies=[ @@ -521,3 +521,13 @@ map_zensus_grid_districts, ] ) + + # HTS to etrago table + hts_etrago_table = HtsEtragoTable( + dependencies=[ + heat_time_series, + mv_grid_districts, + district_heating_areas, + heat_etrago, + ] + ) diff --git a/src/egon/data/config.py b/src/egon/data/config.py index 04112536d..cd9203454 100644 --- a/src/egon/data/config.py +++ b/src/egon/data/config.py @@ -53,13 +53,28 @@ def settings() -> dict[str, dict[str, str]]: """ files = paths(pid="*") + paths() if not files[0].exists(): - # TODO: Fatal errors should be raised as exceptions, so one can figure - # out where they are 
coming from without having to debug. - logger.error( - f"Unable to determine settings.\nConfiguration file:" - f"\n\n{files[0]}\n\nnot found.\nExiting." + logger.warning( + f"Configuration file:" + f"\n\n{files[0]}\n\nnot found.\nUsing defaults." ) - sys.exit(-1) + return { + "egon-data": { + "--airflow-database-name": "airflow", + "--airflow-port": 8080, + "--compose-project-name": "egon-data", + "--database-host": "127.0.0.1", + "--database-name": "egon-data", + "--database-password": "data", + "--database-port": "59734", + "--database-user": "egon", + "--dataset-boundary": "Everything", + "--docker-container-name": + "egon-data-local-database-container", + "--jobs": 1, + "--random-seed": 42, + "--processes-per-task": 1, + } + } with open(files[0]) as f: return yaml.safe_load(f) diff --git a/src/egon/data/datasets.yml b/src/egon/data/datasets.yml index 673a23a66..35ff2bd19 100644 --- a/src/egon/data/datasets.yml +++ b/src/egon/data/datasets.yml @@ -172,43 +172,46 @@ electrical_demands_cts: schema: 'demand' table: 'egon_demandregio_zensus_electricity' -hvmv_substation: - original_data: - source: - url: - target: - path: - processed: - schema: "grid" - table: "egon_hvmv_substation" -ehv_substation: - original_data: - source: - url: - target: - path: - processed: - schema: "grid" - table: "egon_ehv_substation" -hvmv_substation_voronoi: - original_data: - source: - url: - target: - path: - processed: - schema: "grid" - table: "egon_hvmv_substation_voronoi" -ehv_substation_voronoi: - original_data: - source: - url: - target: - path: - processed: - schema: "grid" - table: "egon_ehv_substation_voronoi" +substation_extraction: + sources: + osm_ways: + schema: 'openstreetmap' + table: 'osm_ways' + osm_nodes: + schema: 'openstreetmap' + table: 'osm_nodes' + osm_points: + schema: 'openstreetmap' + table: 'osm_point' + osm_lines: + schema: 'openstreetmap' + table: 'osm_line' + targets: + hvmv_substation: + schema: 'grid' + table: 'egon_hvmv_substation' + ehv_substation: + 
schema: 'grid' + table: 'egon_ehv_substation' +substation_voronoi: + sources: + hvmv_substation: + schema: 'grid' + table: 'egon_hvmv_substation' + ehv_substation: + schema: 'grid' + table: 'egon_ehv_substation' + boundaries: + schema: 'boundaries' + table: 'vg250_sta_union' + targets: + hvmv_substation_voronoi: + schema: 'grid' + table: 'egon_hvmv_substation_voronoi' + ehv_substation_voronoi: + schema: 'grid' + table: 'egon_ehv_substation_voronoi' society_prognosis: soucres: @@ -319,6 +322,7 @@ power_plants: mastr_hydro: "bnetza_mastr_hydro_cleaned.csv" mastr_location: "location_elec_generation_raw.csv" mastr_combustion_without_chp: "supply.egon_mastr_conventional_without_chp" + mastr_storage: "bnetza_mastr_storage_cleaned.csv" capacities: "supply.egon_scenario_capacities" geom_germany: "boundaries.vg250_sta_union" geom_federal_states: "boundaries.vg250_lan" @@ -332,6 +336,19 @@ power_plants: table: 'egon_power_plants' schema: 'supply' +storages: + sources: + mastr_storage: "bnetza_mastr_storage_cleaned.csv" + capacities: "supply.egon_scenario_capacities" + geom_germany: "boundaries.vg250_sta_union" + geom_federal_states: "boundaries.vg250_lan" + egon_mv_grid_district: "grid.egon_mv_grid_district" + ehv_voronoi: "grid.egon_ehv_substation_voronoi" + nep_conv: "supply.egon_nep_2021_conventional_powerplants" + target: + table: 'egon_storages' + schema: 'supply' + landuse: sources: osm_polygons: @@ -470,7 +487,7 @@ solar_rooftop: data-bundle: sources: zenodo: - deposit_id: 5645351 + deposit_id: 5743452 targets: file: 'data_bundle_egon_data.zip' @@ -538,7 +555,7 @@ etrago_heat: weather_cells: schema: 'supply' table: 'egon_era5_weather_cells' - solar_thermal_feedin: + feedin_timeseries: schema: 'supply' table: 'egon_era5_renewable_feedin' egon_mv_grid_district: @@ -563,6 +580,9 @@ etrago_heat: heat_links: schema: 'grid' table: 'egon_etrago_link' + heat_link_timeseries: + schema: 'grid' + table: 'egon_etrago_link_timeseries' industrial_sites: sources: @@ -720,7 +740,7 @@ 
chp_etrago: link: schema: 'grid' table: 'egon_etrago_link' - + DSM_CTS_industry: sources: cts_loadcurves: @@ -747,16 +767,71 @@ DSM_CTS_industry: table: 'egon_etrago_bus' link: schema: 'grid' - table: 'egon_etrago_link' + table: 'egon_etrago_link' link_timeseries: schema: 'grid' - table: 'egon_etrago_link_timeseries' + table: 'egon_etrago_link_timeseries' store: schema: 'grid' - table: 'egon_etrago_store' + table: 'egon_etrago_store' store_timeseries: schema: 'grid' - table: 'egon_etrago_store_timeseries' + table: 'egon_etrago_store_timeseries' + +generators_etrago: + sources: + power_plants: + schema: 'supply' + table: 'egon_power_plants' + renewable_feedin: + schema: 'supply' + table: 'egon_era5_renewable_feedin' + weather_cells: + schema: 'supply' + table: 'egon_era5_weather_cells' + egon_mv_grid_district: 'grid.egon_mv_grid_district' + ehv_voronoi: 'grid.egon_ehv_substation_voronoi' + targets: + etrago_generators: + schema: 'grid' + table: 'egon_etrago_generator' + etrago_gen_time: + schema: 'grid' + table: 'egon_etrago_generator_timeseries' + +weather_BusID: + sources: + power_plants: + schema: 'supply' + table: 'egon_power_plants' + renewable_feedin: + schema: 'supply' + table: 'egon_era5_renewable_feedin' + weather_cells: + schema: 'supply' + table: 'egon_era5_weather_cells' + boundaries: + schema: 'boundaries' + table: 'vg250_sta' + egon_mv_grid_district: 'grid.egon_mv_grid_district' + ehv_voronoi: 'grid.egon_ehv_substation_voronoi' + targets: + power_plants: + schema: 'supply' + table: 'egon_power_plants' + +dlr: + sources: + trans_lines: + schema: "grid" + table: "egon_etrago_line" + line_timeseries: + schema: "grid" + table: "egon_etrago_line_timeseries" + targets: + line_timeseries: + schema: "grid" + table: "egon_etrago_line_timeseries" electrical_neighbours: sources: diff --git a/src/egon/data/processing/calculate_dlr.py b/src/egon/data/datasets/calculate_dlr.py similarity index 93% rename from src/egon/data/processing/calculate_dlr.py rename to 
src/egon/data/datasets/calculate_dlr.py index ba57cd4f2..363e4a840 100644 --- a/src/egon/data/processing/calculate_dlr.py +++ b/src/egon/data/datasets/calculate_dlr.py @@ -7,18 +7,30 @@ """ from pathlib import Path -import numpy as np -import psycopg2 - +from shapely.geometry import Point import geopandas as gpd +import numpy as np import pandas as pd +import psycopg2 import rioxarray import xarray as xr + from egon.data import db -from shapely.geometry import Point +from egon.data.datasets import Dataset +import egon.data.config + + +class Calculate_dlr(Dataset): + def __init__(self, dependencies): + super().__init__( + name="dlr", + version="0.0.0", + dependencies=dependencies, + tasks=(dlr,), + ) -def Calculate_DLR(): +def dlr(): """Calculate DLR and assign values to each line in the db Parameters @@ -26,7 +38,7 @@ def Calculate_DLR(): *No parameters required """ - + cfg = egon.data.config.datasets()["dlr"] weather_info_path = Path(".") / "cutouts" / "germany-2011-era5.nc" regions_shape_path = ( @@ -47,7 +59,11 @@ def Calculate_DLR(): # Connect to the data base con = db.engine() - sql = "SELECT scn_name, line_id, geom, s_nom FROM grid.egon_etrago_line" + sql = f""" + SELECT scn_name, line_id, geom, s_nom FROM + {cfg['sources']['trans_lines']['schema']}. + {cfg['sources']['trans_lines']['table']} + """ df = gpd.GeoDataFrame.from_postgis(sql, con, crs="EPSG:4326") trans_lines_R = {} @@ -109,15 +125,16 @@ def Calculate_DLR(): # Delete existing data db.execute_sql( - """ - DELETE FROM grid.egon_etrago_line_timeseries; + f""" + DELETE FROM {cfg['sources']['line_timeseries']['schema']}. 
+ {cfg['sources']['line_timeseries']['table']}; """ ) # Insert into database trans_lines.to_sql( - "egon_etrago_line_timeseries", - schema="grid", + f"{cfg['targets']['line_timeseries']['table']}", + schema=f"{cfg['targets']['line_timeseries']['schema']}", con=db.engine(), if_exists="append", index=False, @@ -308,4 +325,4 @@ def DLR_Regions(weather_info_path, regions_shape_path): for i in range(len(regions)): dlr_hourly["Reg_" + str(i + 1)] = dlr.iloc[:, 3 * i + 2] - return DLR_hourly_df_dic, dlr_hourly + return DLR_hourly_df_dic, dlr_hourly \ No newline at end of file diff --git a/src/egon/data/datasets/ch4_storages.py b/src/egon/data/datasets/ch4_storages.py new file mode 100755 index 000000000..72016940c --- /dev/null +++ b/src/egon/data/datasets/ch4_storages.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +""" +The central module containing all code dealing with importing gas storages data +""" +import ast +import pandas as pd +import numpy as np +import geopandas + +from egon.data.datasets.gas_prod import assign_ch4_bus_id +from egon.data.datasets.gas_grid import ch4_nodes_number_G, define_gas_nodes_list +from egon.data import db +from egon.data.config import settings +from egon.data.datasets import Dataset +from pathlib import Path + +class CH4Storages(Dataset): + def __init__(self, dependencies): + super().__init__( + name="CH4Storages", + version="0.0.0", + dependencies=dependencies, + tasks=(import_ch4_storages), + ) + +def import_installed_ch4_storages(): + """Define dataframe containing the ch4 storage units in Germany from the SciGRID_gas data + + Returns + ------- + Gas_storages_list : + Dataframe containing the gas storages units in Germany + + """ + target_file = ( + Path(".") / + "datasets" / + "gas_data" / + "data" / + "IGGIELGN_Storages.csv") + + Gas_storages_list = pd.read_csv(target_file, + delimiter=';', decimal='.', + usecols = ['lat', 'long', 'country_code','param']) + + Gas_storages_list = Gas_storages_list[ 
Gas_storages_list['country_code'].str.match('DE')] + + # Define new columns + e_nom = [] + NUTS1 = [] + end_year = [] + for index, row in Gas_storages_list.iterrows(): + param = ast.literal_eval(row['param']) + NUTS1.append(param['nuts_id_1']) + end_year.append(param['end_year']) + e_nom.append(param['max_power_MW']) + + Gas_storages_list = Gas_storages_list.assign(e_nom = e_nom) + Gas_storages_list = Gas_storages_list.assign(NUTS1 = NUTS1) + + end_year = [float('inf') if x == None else x for x in end_year] + Gas_storages_list = Gas_storages_list.assign(end_year = end_year) + + # Cut data to federal state if in testmode + boundary = settings()['egon-data']['--dataset-boundary'] + if boundary != 'Everything': + map_states = {'Baden-Württemberg':'DE1', 'Nordrhein-Westfalen': 'DEA', + 'Hessen': 'DE7', 'Brandenburg': 'DE4', 'Bremen':'DE5', + 'Rheinland-Pfalz': 'DEB', 'Sachsen-Anhalt': 'DEE', + 'Schleswig-Holstein':'DEF', 'Mecklenburg-Vorpommern': 'DE8', + 'Thüringen': 'DEG', 'Niedersachsen': 'DE9', + 'Sachsen': 'DED', 'Hamburg': 'DE6', 'Saarland': 'DEC', + 'Berlin': 'DE3', 'Bayern': 'DE2'} + + Gas_storages_list = Gas_storages_list[Gas_storages_list['NUTS1'].isin([map_states[boundary], np.nan])] + + # Remove unused storage units + Gas_storages_list = Gas_storages_list[Gas_storages_list['end_year'] >= 2035] + + Gas_storages_list = Gas_storages_list.rename(columns={'lat': 'y','long': 'x'}) + Gas_storages_list = geopandas.GeoDataFrame(Gas_storages_list, + geometry=geopandas.points_from_xy(Gas_storages_list['x'], + Gas_storages_list['y'])) + Gas_storages_list = Gas_storages_list.rename(columns={'geometry': 'geom'}).set_geometry('geom', crs=4326) + + # Match to associated gas bus + Gas_storages_list = Gas_storages_list.reset_index(drop=True) + Gas_storages_list = assign_ch4_bus_id(Gas_storages_list) + + # Add missing columns + c = {'scn_name':'eGon2035', 'carrier':'CH4'} + Gas_storages_list = Gas_storages_list.assign(**c) + + # Remove useless columns + Gas_storages_list = 
Gas_storages_list.drop(columns=['x', 'y', + 'param', 'country_code', + 'NUTS1', 'end_year', + 'geom', 'bus_id']) + return Gas_storages_list + + +def import_ch4_grid_capacity(): + """Define dataframe containing the modelling of the CH4 grid storage + capacity. The whole storage capacity of the grid (130000 MWh, estimation of + the Bundesnetzagentur) is split uniformly between all the german CH4 nodes + of the grid. The capacities of the pipes are not considerated. + + Returns + ------- + Gas_storages_list : + Dataframe containing the gas stores in Germany modelling the gas grid storage capacity + + """ + Gas_grid_capacity = 130000 # Storage capacity of the CH4 grid - G.Volk "Die Herauforderung an die Bundesnetzagentur die Energiewende zu meistern" Berlin, Dec 2012 + N_ch4_nodes_G = ch4_nodes_number_G(define_gas_nodes_list()) # Number of nodes in Germany + Store_capacity = Gas_grid_capacity / N_ch4_nodes_G # Storage capacity associated to each CH4 node of the german grid + + sql_gas = """SELECT bus_id, scn_name, carrier, geom + FROM grid.egon_etrago_bus + WHERE carrier = 'CH4';""" + Gas_storages_list = db.select_geodataframe(sql_gas, epsg=4326) + + # Add missing column + Gas_storages_list['e_nom'] = Store_capacity + Gas_storages_list['bus'] = Gas_storages_list['bus_id'] + + # Remove useless columns + Gas_storages_list = Gas_storages_list.drop(columns=['bus_id', 'geom']) + + return Gas_storages_list + + +def import_ch4_storages(): + """Insert list of gas storages units in database + + Returns + ------- + None. 
+ """ + # Connect to local database + engine = db.engine() + + # Clean table + db.execute_sql( + """ + DELETE FROM grid.egon_etrago_store WHERE "carrier" = 'CH4'; + """ + ) + + # Select next id value + new_id = db.next_etrago_id('store') + + gas_storages_list = pd.concat([import_installed_ch4_storages(), import_ch4_grid_capacity()]) + gas_storages_list['store_id'] = range(new_id, new_id + len(gas_storages_list)) + + gas_storages_list = gas_storages_list.reset_index(drop=True) + + # Insert data to db + gas_storages_list.to_sql('egon_etrago_store', + engine, + schema ='grid', + index = False, + if_exists = 'append') \ No newline at end of file diff --git a/src/egon/data/datasets/demandregio/__init__.py b/src/egon/data/datasets/demandregio/__init__.py index 53a9e4871..aefef7909 100644 --- a/src/egon/data/datasets/demandregio/__init__.py +++ b/src/egon/data/datasets/demandregio/__init__.py @@ -33,14 +33,14 @@ class DemandRegio(Dataset): def __init__(self, dependencies): super().__init__( name="DemandRegio", - version="0.0.1", + version="0.0.2", dependencies=dependencies, tasks=( clone_and_install, create_tables, + insert_household_demand, { insert_society_data, - insert_household_demand, insert_cts_ind_demands, }, ), diff --git a/src/egon/data/datasets/district_heating_areas.py b/src/egon/data/datasets/district_heating_areas/__init__.py similarity index 99% rename from src/egon/data/datasets/district_heating_areas.py rename to src/egon/data/datasets/district_heating_areas/__init__.py index a493f3236..339152765 100644 --- a/src/egon/data/datasets/district_heating_areas.py +++ b/src/egon/data/datasets/district_heating_areas/__init__.py @@ -23,7 +23,7 @@ from shapely.geometry.multipolygon import MultiPolygon from shapely.geometry.polygon import Polygon from matplotlib import pyplot as plt -from egon.data.processing.district_heating_areas.plot import ( +from egon.data.datasets.district_heating_areas.plot import ( plot_heat_density_sorted, ) @@ -45,7 +45,7 @@ def 
__init__(self, dependencies): super().__init__( name="district-heating-areas", # version=self.target_files + "_0.0", - version="0.0.0", # maybe rethink the naming + version="0.0.1", # maybe rethink the naming dependencies=dependencies, tasks=(create_tables, demarcation), ) diff --git a/src/egon/data/processing/district_heating_areas/plot.py b/src/egon/data/datasets/district_heating_areas/plot.py similarity index 100% rename from src/egon/data/processing/district_heating_areas/plot.py rename to src/egon/data/datasets/district_heating_areas/plot.py diff --git a/src/egon/data/datasets/electricity_demand_etrago.py b/src/egon/data/datasets/electricity_demand_etrago.py index f100091c4..3a03c42fa 100644 --- a/src/egon/data/datasets/electricity_demand_etrago.py +++ b/src/egon/data/datasets/electricity_demand_etrago.py @@ -103,6 +103,7 @@ def export_to_db(): DELETE FROM {targets['etrago_load']['schema']}.{targets['etrago_load']['table']} WHERE scn_name = '{scenario}' + AND carrier = 'AC' """ ) @@ -111,6 +112,11 @@ def export_to_db(): DELETE FROM {targets['etrago_load_curves']['schema']}.{targets['etrago_load_curves']['table']} WHERE scn_name = '{scenario}' + AND load_id NOT IN ( + SELECT load_id FROM + {targets['etrago_load']['schema']}. 
+ {targets['etrago_load']['table']} + WHERE scn_name = '{scenario}') """ ) @@ -181,7 +187,7 @@ class ElectricalLoadEtrago(Dataset): def __init__(self, dependencies): super().__init__( name="Electrical_load_etrago", - version="0.0.2", + version="0.0.3", dependencies=dependencies, tasks=(export_to_db,), ) diff --git a/src/egon/data/datasets/etrago_setup.py b/src/egon/data/datasets/etrago_setup.py index 0069f79d4..c1900a56a 100644 --- a/src/egon/data/datasets/etrago_setup.py +++ b/src/egon/data/datasets/etrago_setup.py @@ -1,4 +1,6 @@ # coding: utf-8 +from geoalchemy2.types import Geometry +from shapely.geometry import LineString from sqlalchemy import ( ARRAY, BigInteger, @@ -12,14 +14,12 @@ Text, text, ) -from geoalchemy2.types import Geometry from sqlalchemy.ext.declarative import declarative_base +import geopandas as gpd + from egon.data import db from egon.data.datasets import Dataset -import geopandas as gpd -from shapely.geometry import LineString - Base = declarative_base() metadata = Base.metadata @@ -362,6 +362,17 @@ class EgonPfHvTransformerTimeseries(Base): trafo_id = Column(BigInteger, primary_key=True, nullable=False) temp_id = Column(Integer, primary_key=True, nullable=False) s_max_pu = Column(ARRAY(Float(precision=53))) + + +class EgonPfHvBusmap(Base): + __tablename__ = "egon_etrago_hv_busmap" + __table_args__ = {"schema": "grid"} + + scn_name = Column(Text, primary_key=True, nullable=False) + bus0 = Column(Text, primary_key=True, nullable=False) + bus1 = Column(Text, primary_key=True, nullable=False) + path_length = Column(Numeric) + version = Column(Text, primary_key=True, nullable=False) def create_tables(): @@ -467,6 +478,7 @@ def create_tables(): EgonPfHvTempResolution.__table__.drop(bind=engine, checkfirst=True) EgonPfHvTransformer.__table__.drop(bind=engine, checkfirst=True) EgonPfHvTransformerTimeseries.__table__.drop(bind=engine, checkfirst=True) + EgonPfHvBusmap.__table__.drop(bind=engine, checkfirst=True) # Create new tables 
EgonPfHvBus.__table__.create(bind=engine, checkfirst=True) EgonPfHvBusTimeseries.__table__.create(bind=engine, checkfirst=True) @@ -488,6 +500,7 @@ def create_tables(): EgonPfHvTransformerTimeseries.__table__.create( bind=engine, checkfirst=True ) + EgonPfHvBusmap.__table__.create(bind=engine, checkfirst=True) def temp_resolution(): diff --git a/src/egon/data/datasets/fill_etrago_gen.py b/src/egon/data/datasets/fill_etrago_gen.py new file mode 100644 index 000000000..21f458b2f --- /dev/null +++ b/src/egon/data/datasets/fill_etrago_gen.py @@ -0,0 +1,281 @@ +import geopandas as gpd +import numpy as np +import pandas as pd + +from egon.data import db +from egon.data.datasets import Dataset +import egon.data.config + +class Egon_etrago_gen(Dataset): + def __init__(self, dependencies): + super().__init__( + name="etrago_generators", + version="0.0.2", + dependencies=dependencies, + tasks=(fill_etrago_generators,), + ) + + +def fill_etrago_generators(): + # Connect to the data base + con = db.engine() + cfg = egon.data.config.datasets()["generators_etrago"] + + # Delete power plants from previous iterations of this script + delete_previuos_gen(cfg) + + # Load required tables + ( + power_plants, + renew_feedin, + weather_cells, + etrago_gen_orig, + pp_time, + ) = load_tables(con, cfg) + + power_plants = adjust_power_plants_table( + power_plants=power_plants, renew_feedin=renew_feedin, cfg=cfg + ) + + etrago_pp = group_power_plants( + power_plants=power_plants, + renew_feedin=renew_feedin, + etrago_gen_orig=etrago_gen_orig, + cfg=cfg, + ) + + etrago_gen_table = fill_etrago_gen_table( + etrago_pp2=etrago_pp, etrago_gen_orig=etrago_gen_orig, cfg=cfg, con=con + ) + + etrago_gen_time_table = fill_etrago_gen_time_table( + etrago_pp=etrago_pp, + power_plants=power_plants, + renew_feedin=renew_feedin, + pp_time=pp_time, + cfg=cfg, + con=con, + ) + + return "eTrago Generators tables filled successfully" + + +def group_power_plants(power_plants, renew_feedin, etrago_gen_orig, cfg): 
+ + # group power plants by bus and carrier + + agg_func = { + "sources": numpy_nan, + "source_id": numpy_nan, + "carrier": consistency, + "el_capacity": np.sum, + "bus_id": consistency, + "voltage_level": numpy_nan, + "weather_cell_id": power_timeser, + "scenario": consistency, + "geom": numpy_nan, + } + + etrago_pp = power_plants.groupby(by=["bus_id", "carrier"]).agg( + func=agg_func + ) + etrago_pp = etrago_pp.reset_index(drop=True) + + if np.isnan(etrago_gen_orig["generator_id"].max()): + max_id = 0 + else: + max_id = etrago_gen_orig["generator_id"].max() + 1 + etrago_pp["generator_id"] = list(range(max_id, max_id + len(etrago_pp))) + etrago_pp.set_index("generator_id", inplace=True) + + return etrago_pp + + +def fill_etrago_gen_table(etrago_pp2, etrago_gen_orig, cfg, con): + + etrago_pp = etrago_pp2[["carrier", "el_capacity", "bus_id", "scenario"]] + etrago_pp = etrago_pp.rename( + columns={ + "el_capacity": "p_nom", + "bus_id": "bus", + "scenario": "scn_name", + } + ) + + etrago_pp = etrago_pp.reindex(columns=etrago_gen_orig.columns) + etrago_pp = etrago_pp.drop(columns="generator_id") + etrago_pp.to_sql( + name=f"{cfg['targets']['etrago_generators']['table']}", + schema=f"{cfg['targets']['etrago_generators']['schema']}", + con=con, + if_exists="append", + ) + return etrago_pp + + +def fill_etrago_gen_time_table( + etrago_pp, power_plants, renew_feedin, pp_time, cfg, con +): + etrago_pp_time = etrago_pp.copy() + etrago_pp_time = etrago_pp_time[ + ["carrier", "el_capacity", "bus_id", "weather_cell_id", "scenario"] + ] + + etrago_pp_time = etrago_pp_time[ + (etrago_pp_time["carrier"] == "pv") + | (etrago_pp_time["carrier"] == "wind_onshore") + | (etrago_pp_time["carrier"] == "wind_offshore") + ] + + cal_timeseries = set_timeseries( + power_plants=power_plants, renew_feedin=renew_feedin + ) + + etrago_pp_time["p_max_pu"] = 0 + etrago_pp_time["p_max_pu"] = etrago_pp_time.apply(cal_timeseries, axis=1) + + etrago_pp_time.rename(columns={"scenario": "scn_name"}, 
inplace=True) + etrago_pp_time = etrago_pp_time[["scn_name", "p_max_pu"]] + etrago_pp_time = etrago_pp_time.reindex(columns=pp_time.columns) + etrago_pp_time = etrago_pp_time.drop(columns="generator_id") + etrago_pp_time["p_max_pu"] = etrago_pp_time["p_max_pu"].apply(list) + etrago_pp_time["temp_id"] = 1 + + db.execute_sql( + f"""DELETE FROM + {cfg['targets']['etrago_gen_time']['schema']}. + {cfg['targets']['etrago_gen_time']['table']} + """ + ) + + etrago_pp_time.to_sql( + name=f"{cfg['targets']['etrago_gen_time']['table']}", + schema=f"{cfg['targets']['etrago_gen_time']['schema']}", + con=con, + if_exists="append", + ) + return etrago_pp + + +def load_tables(con, cfg): + sql = f""" + SELECT * FROM + {cfg['sources']['power_plants']['schema']}. + {cfg['sources']['power_plants']['table']} + """ + power_plants = gpd.GeoDataFrame.from_postgis( + sql, con, crs="EPSG:4326", index_col="id" + ) + + sql = f""" + SELECT * FROM + {cfg['sources']['renewable_feedin']['schema']}. + {cfg['sources']['renewable_feedin']['table']} + """ + renew_feedin = pd.read_sql(sql, con) + + sql = f""" + SELECT * FROM + {cfg['sources']['weather_cells']['schema']}. + {cfg['sources']['weather_cells']['table']} + """ + weather_cells = gpd.GeoDataFrame.from_postgis(sql, con, crs="EPSG:4326") + + sql = f""" + SELECT * FROM + {cfg['targets']['etrago_generators']['schema']}. + {cfg['targets']['etrago_generators']['table']} + """ + etrago_gen_orig = pd.read_sql(sql, con) + + sql = f""" + SELECT * FROM + {cfg['targets']['etrago_gen_time']['schema']}. 
+ {cfg['targets']['etrago_gen_time']['table']} + """ + pp_time = pd.read_sql(sql, con) + + return power_plants, renew_feedin, weather_cells, etrago_gen_orig, pp_time + + +def consistency(data): + assert ( + len(set(data)) <= 1 + ), f"The elements in column {data.name} do not match" + return data.iloc[0] + + +def numpy_nan(data): + return np.nan + + +def power_timeser(weather_data): + if len(set(weather_data)) <= 1: + return weather_data.iloc[0] + else: + return -1 + + +def adjust_power_plants_table(power_plants, renew_feedin, cfg): + # Define carrier 'solar' as 'pv' + carrier_pv_mask = power_plants["carrier"] == "solar" + power_plants.loc[carrier_pv_mask, "carrier"] = "pv" + + # convert renewable feedin lists to arrays + renew_feedin["feedin"] = renew_feedin["feedin"].apply(np.array) + + return power_plants + + +def delete_previuos_gen(cfg): + db.execute_sql( + f"""DELETE FROM + {cfg['targets']['etrago_generators']['schema']}. + {cfg['targets']['etrago_generators']['table']} + WHERE carrier <> 'CH4' AND carrier <> 'solar_rooftop' + AND carrier <> 'solar_thermal_collector' + AND carrier <> 'geo_thermal' + """ + ) + + +def set_timeseries(power_plants, renew_feedin): + def timeseries(pp): + try: + if pp.weather_cell_id != -1: + feedin_time = renew_feedin[ + (renew_feedin["w_id"] == pp.weather_cell_id) + & (renew_feedin["carrier"] == pp.carrier) + ].feedin.iloc[0] + return feedin_time + else: + df = power_plants[ + (power_plants["bus_id"] == pp.bus_id) + & (power_plants["carrier"] == pp.carrier) + ] + total_int_cap = df.el_capacity.sum() + df["feedin"] = 0 + df["feedin"] = df.apply( + lambda x: renew_feedin[ + (renew_feedin["w_id"] == x.weather_cell_id) + & (renew_feedin["carrier"] == x.carrier) + ].feedin.iloc[0], + axis=1, + ) + df["feedin"] = df.apply( + lambda x: x.el_capacity / total_int_cap * x.feedin, axis=1 + ) + return df.feedin.sum() + ####################################################################### + ####################### DELETE THIS EXCEPTION 
 ######################### + except: + df = power_plants[ + (power_plants["bus_id"] == pp.bus_id) + & (power_plants["carrier"] == pp.carrier) + ] + return list(df.weather_cell_id) + + ####################### DELETE THIS EXCEPTION ############################# + ########################################################################### + return timeseries + diff --git a/src/egon/data/datasets/gas_grid.py b/src/egon/data/datasets/gas_grid.py index 83327c6df..2c9fe02c5 100755 --- a/src/egon/data/datasets/gas_grid.py +++ b/src/egon/data/datasets/gas_grid.py @@ -16,10 +16,18 @@ from egon.data import db from egon.data.config import settings from egon.data.datasets import Dataset -from geoalchemy2.shape import from_shape from geoalchemy2.types import Geometry from shapely import geometry +class GasNodesandPipes(Dataset): + def __init__(self, dependencies): + super().__init__( + name="GasNodesandPipes", + version="0.0.0", + dependencies=dependencies, + tasks=(insert_gas_data), + ) + def download_SciGRID_gas_data(): """ @@ -95,6 +103,26 @@ def define_gas_nodes_list(): return gas_nodes_list +def ch4_nodes_number_G(gas_nodes_list): + """Return the number of CH4 nodes in Germany from SciGRID_gas IGGIELGN data + Parameters + ---------- + gas_nodes_list : dataframe + Dataframe containing the gas nodes (Europe) + Returns + ------- + N_ch4_nodes_G : int + Number of CH4 buses in Germany (independently from the mode used) + """ + + ch4_nodes_list = gas_nodes_list[ + gas_nodes_list["country_code"].str.match("DE") + ] # A remplacer evtmt par un test sur le NUTS0 ? + N_ch4_nodes_G = len(ch4_nodes_list) + + return N_ch4_nodes_G + + def insert_CH4_nodes_list(gas_nodes_list): """Insert list of CH4 nodes from SciGRID_gas IGGIELGN data Parameters @@ -103,7 +131,7 @@ def insert_CH4_nodes_list(gas_nodes_list): Dataframe containing the gas nodes (Europe) Returns ------- - None. 
+ None """ # Connect to local database engine = db.engine() @@ -111,6 +139,7 @@ def insert_CH4_nodes_list(gas_nodes_list): gas_nodes_list = gas_nodes_list[ gas_nodes_list["country_code"].str.match("DE") ] # A remplacer evtmt par un test sur le NUTS0 ? + # Cut data to federal state if in testmode NUTS1 = [] for index, row in gas_nodes_list.iterrows(): diff --git a/src/egon/data/datasets/heat_demand_timeseries/HTS.py b/src/egon/data/datasets/heat_demand_timeseries/HTS.py index 6b0a04185..661bd3a3e 100644 --- a/src/egon/data/datasets/heat_demand_timeseries/HTS.py +++ b/src/egon/data/datasets/heat_demand_timeseries/HTS.py @@ -1,35 +1,35 @@ -import pandas as pd -import numpy as np -import psycopg2 -import pandas.io.sql as sqlio +from datetime import datetime +import glob +import os + +from sqlalchemy import ARRAY, Column, Float, ForeignKey, Integer, String +from sqlalchemy.ext.declarative import declarative_base import geopandas as gpd import matplotlib.pyplot as plt -import os -import glob +import numpy as np +import pandas as pd +import pandas.io.sql as sqlio +import psycopg2 +import xarray as xr from egon.data import db, subprocess - -import xarray as xr -from sqlalchemy import Column, String, Float, Integer, ForeignKey, ARRAY import egon.data.datasets.era5 as era -from sqlalchemy.ext.declarative import declarative_base - try: - from disaggregator import config, data, spatial, temporal, plot + from disaggregator import config, data, plot, spatial, temporal except ImportError as e: pass from math import ceil - from egon.data.datasets import Dataset +import egon class IdpProfiles: def __init__(self, df_index, **kwargs): index = pd.date_range( - pd.datetime(2011, 1, 1, 0), periods=8760, freq="H" + datetime(2011, 1, 1, 0), periods=8760, freq="H" ) self.df = pd.DataFrame(index=df_index) @@ -38,7 +38,7 @@ def __init__(self, df_index, **kwargs): def get_temperature_interval(self, how="geometric_series"): index = pd.date_range( - pd.datetime(2011, 1, 1, 0), periods=8760, 
freq="H" + datetime(2011, 1, 1, 0), periods=8760, freq="H" ) """Appoints the corresponding temperature interval to each temperature in the temperature vector. @@ -201,7 +201,7 @@ def temp_interval(): Hourly temperature intrerval of all 15 TRY Climate station#s temperature profile """ - index = pd.date_range(pd.datetime(2011, 1, 1, 0), periods=8760, freq="H") + index = pd.date_range(datetime(2011, 1, 1, 0), periods=8760, freq="H") temperature_interval = pd.DataFrame() temp_profile = temperature_profile_extract() @@ -234,7 +234,7 @@ def idp_pool_generator(): "household_heat_demand_profiles", "household_heat_demand_profiles.hdf5", ) - index = pd.date_range(pd.datetime(2011, 1, 1, 0), periods=8760, freq="H") + index = pd.date_range(datetime(2011, 1, 1, 0), periods=8760, freq="H") sfh = pd.read_hdf(path, key="SFH") mfh = pd.read_hdf(path, key="MFH") @@ -919,7 +919,7 @@ def h_value(): Extracted from demandlib. """ - index = pd.date_range(pd.datetime(2011, 1, 1, 0), periods=8760, freq="H") + index = pd.date_range(datetime(2011, 1, 1, 0), periods=8760, freq="H") a = 3.0469695 @@ -978,7 +978,7 @@ def profile_generator(aggregation_level): 0 heat_profile_idp : pandas.DataFrame if aggreation_level = 'district' - heat profiles for every mv grid subst_id + heat profiles for every mv grid bus_id else heat profiles for every zensus_poppulation_id """ @@ -1027,13 +1027,14 @@ def profile_generator(aggregation_level): heat_profile_idp = pd.merge( heat_profile_idp, - mv_grid_ind["subst_id"], + mv_grid_ind["bus_id"], left_on=selected_profiles.index, right_on=mv_grid_ind.index, how="inner", ) - heat_profile_idp.sort_values("subst_id", inplace=True) - heat_profile_idp.set_index("subst_id", inplace=True) + + heat_profile_idp.sort_values("bus_id", inplace=True) + heat_profile_idp.set_index("bus_id", inplace=True) heat_profile_idp.drop("key_0", axis=1, inplace=True) heat_profile_dist = heat_profile_dist.groupby( @@ -1082,7 +1083,7 @@ def residential_demand_scale(aggregation_level): 0 
heat_demand_profile_mv : pandas.DataFrame if aggregation ='district' - final demand profiles per mv grid subst_id + final demand profiles per mv grid bus_id else 0 heat_demand_profile_zensus : pandas.DataFrame @@ -1176,26 +1177,25 @@ def residential_demand_scale(aggregation_level): mv_grid.index.difference(district_heating.index), : ] mv_grid_ind = mv_grid_ind.reset_index() - district_grid = pd.merge( - mv_grid_ind[["subst_id", "zensus_population_id"]], + mv_grid_ind[["bus_id", "zensus_population_id"]], annual_demand[["zensus_population_id", "Station", "demand"]], on="zensus_population_id", how="inner", ) - district_grid.sort_values("subst_id", inplace=True) + district_grid.sort_values("bus_id", inplace=True) district_grid.drop("zensus_population_id", axis=1, inplace=True) - district_grid = district_grid.groupby(["subst_id", "Station"]).sum() + district_grid = district_grid.groupby(["bus_id", "Station"]).sum() district_grid.reset_index("Station", inplace=True) demand_curves_mv = pd.DataFrame() for j in range(len(heat_profile_idp.columns)): current_district = heat_profile_idp.iloc[:, j] - subst_id = heat_profile_idp.columns[j] - station = district_grid[district_grid.index == subst_id][ + bus_id = heat_profile_idp.columns[j] + station = district_grid[district_grid.index == bus_id][ "Station" - ][subst_id] + ][bus_id] if type(station) != str: station = station.reset_index() multiple_stations = pd.DataFrame() @@ -1205,9 +1205,9 @@ def residential_demand_scale(aggregation_level): h[current_station], axis=0 ) multiple_stations = multiple_stations.sum(axis=1) - demand_curves_mv[subst_id] = multiple_stations + demand_curves_mv[bus_id] = multiple_stations else: - demand_curves_mv[subst_id] = current_district.multiply( + demand_curves_mv[bus_id] = current_district.multiply( h[station], axis=0 ) demand_curves_mv = demand_curves_mv.apply(lambda x: x / x.sum()) @@ -1225,9 +1225,10 @@ def residential_demand_scale(aggregation_level): ) heat_demand_profile_mv.rename( - 
columns={"key_0": "subst_id"}, inplace=True + + columns={"key_0": "bus_id"}, inplace=True ) - heat_demand_profile_mv.set_index("subst_id", inplace=True) + heat_demand_profile_mv.set_index("bus_id", inplace=True) heat_demand_profile_mv = heat_demand_profile_mv[ heat_demand_profile_mv.columns[:-1] ].multiply(heat_demand_profile_mv.demand, axis=0) @@ -1418,17 +1419,17 @@ def cts_demand_per_aggregation_level(aggregation_level): CTS_per_grid = pd.merge( CTS_per_zensus, - mv_grid_ind[["subst_id", "zensus_population_id"]], + mv_grid_ind[["bus_id", "zensus_population_id"]], on="zensus_population_id", how="inner", ) - CTS_per_grid.set_index("subst_id", inplace=True) + CTS_per_grid.set_index("bus_id", inplace=True) CTS_per_grid.drop("zensus_population_id", axis=1, inplace=True) CTS_per_grid = CTS_per_grid.groupby(lambda x: x, axis=0).sum() CTS_per_grid = CTS_per_grid.transpose() CTS_per_grid = CTS_per_grid.apply(lambda x: x / x.sum()) - CTS_per_grid.columns.name = "subst_id" + CTS_per_grid.columns.name = "bus_id" CTS_per_grid.reset_index(drop=True, inplace=True) CTS_per_zensus = pd.DataFrame() @@ -1544,13 +1545,13 @@ def CTS_demand_scale(aggregation_level): CTS_demands_grid = pd.merge( demand, - mv_grid_ind[["subst_id", "zensus_population_id"]], + mv_grid_ind[["bus_id", "zensus_population_id"]], on="zensus_population_id", how="inner", ) CTS_demands_grid.drop("zensus_population_id", axis=1, inplace=True) - CTS_demands_grid = CTS_demands_grid.groupby("subst_id").sum() + CTS_demands_grid = CTS_demands_grid.groupby("bus_id").sum() CTS_per_grid = pd.merge( CTS_per_grid, @@ -1560,8 +1561,9 @@ def CTS_demand_scale(aggregation_level): left_on=CTS_demands_grid.index, ) - CTS_per_grid = CTS_per_grid.rename(columns={"key_0": "subst_id"}) - CTS_per_grid.set_index("subst_id", inplace=True) + + CTS_per_grid = CTS_per_grid.rename(columns={"key_0": "bus_id"}) + CTS_per_grid.set_index("bus_id", inplace=True) CTS_per_grid = CTS_per_grid[CTS_per_grid.columns[:-1]].multiply( 
CTS_per_grid.demand, axis=0 @@ -1643,10 +1645,11 @@ def demand_profile_generator(aggregation_level="district"): [residential_demand_grid, CTS_demand_grid] ) total_demands_grid.sort_index(inplace=True) + total_demands_grid = total_demands_grid.groupby( lambda x: x, axis=0 ).sum() - total_demands_grid.index.name = "subst_id" + total_demands_grid.index.name = "bus_id" final_heat_profiles_grid = pd.DataFrame(index=total_demands_grid.index) final_heat_profiles_grid[ @@ -1660,6 +1663,7 @@ def demand_profile_generator(aggregation_level="district"): index=True, dtype=ARRAY(Float()), ) + else: total_demands_zensus = pd.concat( [residential_demand_zensus, CTS_demand_zensus] @@ -1691,7 +1695,7 @@ class HeatTimeSeries(Dataset): def __init__(self, dependencies): super().__init__( name="HeatTimeSeries", - version="0.0.0", + version="0.0.1", dependencies=dependencies, tasks=(demand_profile_generator), ) diff --git a/src/egon/data/datasets/heat_etrago/__init__.py b/src/egon/data/datasets/heat_etrago/__init__.py index 0945d7d24..b886a7dca 100644 --- a/src/egon/data/datasets/heat_etrago/__init__.py +++ b/src/egon/data/datasets/heat_etrago/__init__.py @@ -12,7 +12,7 @@ def insert_buses(carrier, scenario="eGon2035"): - """ Insert heat buses to etrago table + """Insert heat buses to etrago table Heat buses are divided into central and individual heating @@ -88,7 +88,7 @@ def insert_buses(carrier, scenario="eGon2035"): def insert_central_direct_heat(scenario="eGon2035"): - """ Insert renewable heating technologies (solar and geo thermal) + """Insert renewable heating technologies (solar and geo thermal) Parameters ---------- @@ -183,8 +183,8 @@ def insert_central_direct_heat(scenario="eGon2035"): feedin = db.select_dataframe( f""" SELECT w_id, feedin - FROM {sources['solar_thermal_feedin']['schema']}. - {sources['solar_thermal_feedin']['table']} + FROM {sources['feedin_timeseries']['schema']}. 
+ {sources['feedin_timeseries']['table']} WHERE carrier = 'solar_thermal' AND weather_year = 2011 """, @@ -372,7 +372,7 @@ def insert_rural_gas_boilers(scenario="eGon2035", efficiency=0.98): def buses(): - """ Insert individual and district heat buses into eTraGo-tables + """Insert individual and district heat buses into eTraGo-tables Parameters ---------- @@ -388,7 +388,7 @@ def buses(): def supply(): - """ Insert individual and district heat supply into eTraGo-tables + """Insert individual and district heat supply into eTraGo-tables Parameters ---------- @@ -411,7 +411,7 @@ class HeatEtrago(Dataset): def __init__(self, dependencies): super().__init__( name="HeatEtrago", - version="0.0.4", + version="0.0.5", dependencies=dependencies, tasks=(buses, supply), ) diff --git a/src/egon/data/datasets/heat_etrago/hts_etrago.py b/src/egon/data/datasets/heat_etrago/hts_etrago.py new file mode 100644 index 000000000..f0f08ba67 --- /dev/null +++ b/src/egon/data/datasets/heat_etrago/hts_etrago.py @@ -0,0 +1,119 @@ +from egon.data import config, db +from egon.data.db import next_etrago_id +from egon.data.datasets import Dataset + +import pandas as pd +import numpy as np + +def hts_to_etrago(): + + sources = config.datasets()["etrago_heat"]["sources"] + targets = config.datasets()["etrago_heat"]["targets"] + scenario = 'eGon2035' + carriers = ['central_heat','rural_heat'] + + for carrier in carriers: + if carrier == 'central_heat': + + # Map heat buses to district heating id and area_id + # interlinking bus_id and area_id + bus_area = db.select_dataframe( + f""" + SELECT bus_id, area_id, id FROM + {targets['heat_buses']['schema']}. + {targets['heat_buses']['table']} + JOIN {sources['district_heating_areas']['schema']}. 
+ {sources['district_heating_areas']['table']} + ON ST_Transform(ST_Centroid(geom_polygon), 4326) = geom + WHERE carrier = '{carrier}' + AND scenario='{scenario}' + """, + index_col="id", + ) + + #district heating time series time series + disct_time_series = db.select_dataframe( + """ + SELECT * FROM + demand.egon_timeseries_district_heating + """ + ) + #bus_id connected to corresponding time series + bus_ts = pd.merge(bus_area,disct_time_series, on='area_id', how = 'inner') + + else: + #interlinking heat_bus_id and mv_grid bus_id + bus_sub = db.select_dataframe( + f""" + SELECT {targets['heat_buses']['schema']}. + {targets['heat_buses']['table']}.bus_id as heat_bus_id, + {sources['egon_mv_grid_district']['schema']}. + {sources['egon_mv_grid_district']['table']}.bus_id as + bus_id FROM + {targets['heat_buses']['schema']}. + {targets['heat_buses']['table']} + JOIN {sources['egon_mv_grid_district']['schema']}. + {sources['egon_mv_grid_district']['table']} + ON ST_Transform(ST_Centroid({sources['egon_mv_grid_district']['schema']}. + {sources['egon_mv_grid_district']['table']}.geom), + 4326) = {targets['heat_buses']['schema']}. 
+ {targets['heat_buses']['table']}.geom + WHERE carrier = '{carrier}' + """ + ) + ##**scenario name still needs to be adjusted in bus_sub** + + #individual heating time series + ind_time_series = db.select_dataframe( + """ + SELECT * FROM + demand.egon_etrago_timeseries_individual_heating + """ + ) + + # bus_id connected to corresponding time series + bus_ts = pd.merge(bus_sub,ind_time_series, on='bus_id', how = 'inner') + + next_id = next_etrago_id('load') + + bus_ts['load_id']=np.arange(len(bus_ts))+next_id + + etrago_load = pd.DataFrame(index=range(len(bus_ts))) + etrago_load['scn_name'] = scenario + etrago_load['load_id'] = bus_ts.load_id + etrago_load['bus'] =bus_ts.bus_id + etrago_load['carrier'] = carrier + etrago_load['sign']=-1 + + etrago_load.to_sql( + 'egon_etrago_load', + schema = 'grid', + con = db.engine(), + if_exists ='append', + index =False + ) + + etrago_load_timeseries = pd.DataFrame(index=range(len(bus_ts))) + etrago_load_timeseries['scn_name'] = scenario + etrago_load_timeseries['load_id'] = bus_ts.load_id + etrago_load_timeseries['temp_id'] = 1 + etrago_load_timeseries['p_set'] = bus_ts.iloc[:,2] + + etrago_load_timeseries.to_sql( + 'egon_etrago_load_timeseries', + schema = 'grid', + con = db.engine(), + if_exists ='append', + index =False + ) + + +class HtsEtragoTable(Dataset): + def __init__(self, dependencies): + super().__init__( + name="HtsEtragoTable", + version="0.0.0", + dependencies=dependencies, + tasks=(hts_to_etrago), + ) + diff --git a/src/egon/data/datasets/heat_etrago/power_to_heat.py b/src/egon/data/datasets/heat_etrago/power_to_heat.py index e14bb3cce..116c73bb2 100644 --- a/src/egon/data/datasets/heat_etrago/power_to_heat.py +++ b/src/egon/data/datasets/heat_etrago/power_to_heat.py @@ -4,7 +4,9 @@ import geopandas as gpd import pandas as pd + from egon.data import config, db +from egon.data.datasets.renewable_feedin import weather_cells_in_germany def insert_individual_power_to_heat(scenario="eGon2035"): @@ -25,6 +27,18 @@ 
def insert_individual_power_to_heat(scenario="eGon2035"): targets = config.datasets()["etrago_heat"]["targets"] # Delete existing entries + db.execute_sql( + f""" + DELETE FROM {targets['heat_link_timeseries']['schema']}. + {targets['heat_link_timeseries']['table']} + WHERE link_id IN ( + SELECT link_id FROM {targets['heat_links']['schema']}. + {targets['heat_links']['table']} + WHERE carrier = 'individual_heat_pump' + AND scn_name = '{scenario}') + AND scn_name = '{scenario}' + """ + ) db.execute_sql( f""" DELETE FROM {targets['heat_links']['schema']}. @@ -37,7 +51,7 @@ def insert_individual_power_to_heat(scenario="eGon2035"): heat_pumps = db.select_dataframe( f""" SELECT mv_grid_id as power_bus, - a.carrier, capacity, b.bus_id as heat_bus + a.carrier, capacity, b.bus_id as heat_bus, d.feedin as cop FROM {sources['individual_heating_supply']['schema']}. {sources['individual_heating_supply']['table']} a JOIN {targets['heat_buses']['schema']}. @@ -45,10 +59,18 @@ def insert_individual_power_to_heat(scenario="eGon2035"): ON ST_Intersects( ST_Buffer(ST_Transform(ST_Centroid(a.geometry), 4326), 0.00000001), geom) + JOIN {sources['weather_cells']['schema']}. + {sources['weather_cells']['table']} c + ON ST_Intersects( + b.geom, c.geom) + JOIN {sources['feedin_timeseries']['schema']}. + {sources['feedin_timeseries']['table']} d + ON c.w_id = d.w_id WHERE scenario = '{scenario}' AND scn_name = '{scenario}' AND a.carrier = 'heat_pump' AND b.carrier = 'rural_heat' + AND d.carrier = 'heat_pump_cop' """ ) @@ -82,6 +104,19 @@ def insert_central_power_to_heat(scenario="eGon2035"): targets = config.datasets()["etrago_heat"]["targets"] # Delete existing entries + db.execute_sql( + f""" + DELETE FROM {targets['heat_link_timeseries']['schema']}. + {targets['heat_link_timeseries']['table']} + WHERE link_id IN ( + SELECT link_id FROM {targets['heat_links']['schema']}. 
+ {targets['heat_links']['table']} + WHERE carrier = 'central_heat_pump' + AND scn_name = '{scenario}') + AND scn_name = '{scenario}' + """ + ) + db.execute_sql( f""" DELETE FROM {targets['heat_links']['schema']}. @@ -89,13 +124,23 @@ def insert_central_power_to_heat(scenario="eGon2035"): WHERE carrier = 'central_heat_pump' """ ) + # Select heat pumps in district heating central_heat_pumps = db.select_geodataframe( f""" - SELECT * FROM {sources['district_heating_supply']['schema']}. - {sources['district_heating_supply']['table']} + SELECT a.index, a.district_heating_id, a.carrier, a.category, a.capacity, a.geometry, a.scenario, d.feedin as cop + FROM {sources['district_heating_supply']['schema']}. + {sources['district_heating_supply']['table']} a + JOIN {sources['weather_cells']['schema']}. + {sources['weather_cells']['table']} c + ON ST_Intersects( + ST_Transform(a.geometry, 4326), c.geom) + JOIN {sources['feedin_timeseries']['schema']}. + {sources['feedin_timeseries']['table']} d + ON c.w_id = d.w_id WHERE scenario = '{scenario}' - AND carrier = 'heat_pump' + AND a.carrier = 'heat_pump' + AND d.carrier = 'heat_pump_cop' """, geom_col="geometry", epsg=4326, @@ -251,6 +296,28 @@ def insert_power_to_heat_per_level( con=db.engine(), ) + if "cop" in gdf.columns: + + # Create dataframe for time-dependent data + links_timeseries = pd.DataFrame( + index=links.index, + data={ + "link_id": links.link_id, + "efficiency": gdf.cop, + "scn_name": scenario, + "temp_id": 1, + }, + ) + + # Insert time-dependent data to database + links_timeseries.to_sql( + targets["heat_link_timeseries"]["table"], + schema=targets["heat_link_timeseries"]["schema"], + if_exists="append", + con=db.engine(), + index=False, + ) + def assign_voltage_level(heat_pumps, carrier="heat_pump"): """Assign voltage level to heat pumps @@ -393,7 +460,7 @@ def assign_electrical_bus(heat_pumps, carrier, multiple_per_mv_grid=False): heat_pumps.set_index("area_id", inplace=True) # If district heating areas are 
supplied by multiple hvmv-substations, - # create one heatpup per electrical bus. + # create one heatpump per electrical bus. # The installed capacity is assigned regarding the share of heat demand. if multiple_per_mv_grid: @@ -405,6 +472,12 @@ def assign_electrical_bus(heat_pumps, carrier, multiple_per_mv_grid=False): power_to_heat.area_id ].values + if "heat_pump" in carrier: + + power_to_heat.loc[:, "cop"] = heat_pumps.cop[ + power_to_heat.area_id + ].values + power_to_heat["share_demand"] = power_to_heat.groupby( "area_id" ).demand.apply(lambda grp: grp / grp.sum()) @@ -463,6 +536,10 @@ def assign_electrical_bus(heat_pumps, carrier, multiple_per_mv_grid=False): power_to_heat.loc[:, "voltage_level"] = heat_pumps.voltage_level + if "heat_pump" in carrier: + + power_to_heat.loc[:, "cop"] = heat_pumps.cop + power_to_heat["capacity"] = heat_pumps.capacity[ power_to_heat.index ].values diff --git a/src/egon/data/datasets/mv_grid_districts.py b/src/egon/data/datasets/mv_grid_districts.py index a7f413eca..72099ed1a 100644 --- a/src/egon/data/datasets/mv_grid_districts.py +++ b/src/egon/data/datasets/mv_grid_districts.py @@ -62,15 +62,25 @@ from functools import partial +from geoalchemy2.types import Geometry +from sqlalchemy import ( + ARRAY, + Boolean, + Column, + Float, + Integer, + Numeric, + Sequence, + String, + func, +) +from sqlalchemy.ext.declarative import declarative_base + from egon.data import db from egon.data.datasets import Dataset +from egon.data.datasets.substation import EgonHvmvSubstation +from egon.data.datasets.substation_voronoi import EgonHvmvSubstationVoronoi from egon.data.db import session_scope -from egon.data.processing.substation import (EgonHvmvSubstation, - EgonHvmvSubstationVoronoi) -from geoalchemy2.types import Geometry -from sqlalchemy import (ARRAY, Boolean, Column, Float, Integer, Numeric, - Sequence, String, func) -from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() metadata = Base.metadata @@ -202,8 
+212,10 @@ def substations_in_municipalities(): .group_by(Vg250GemClean.id) ) - muns_with_subst = HvmvSubstPerMunicipality.__table__.insert().from_select( - HvmvSubstPerMunicipality.__table__.columns, q + muns_with_subst = ( + HvmvSubstPerMunicipality.__table__.insert().from_select( + HvmvSubstPerMunicipality.__table__.columns, q + ) ) session.execute(muns_with_subst) session.commit() @@ -212,15 +224,17 @@ def substations_in_municipalities(): already_inserted_muns = session.query( HvmvSubstPerMunicipality.id ).subquery() - muns_without_subst = HvmvSubstPerMunicipality.__table__.insert().from_select( - [ - _ - for _ in HvmvSubstPerMunicipality.__table__.columns - if _.name != "subst_count" - ], - session.query(Vg250GemClean).filter( - Vg250GemClean.id.notin_(already_inserted_muns) - ), + muns_without_subst = ( + HvmvSubstPerMunicipality.__table__.insert().from_select( + [ + _ + for _ in HvmvSubstPerMunicipality.__table__.columns + if _.name != "subst_count" + ], + session.query(Vg250GemClean).filter( + Vg250GemClean.id.notin_(already_inserted_muns) + ), + ) ) session.execute(muns_without_subst) session.commit() @@ -345,13 +359,15 @@ def split_multi_substation_municipalities(): .subquery() ) - originally_1subst = VoronoiMunicipalityCutsAssigned.__table__.insert().from_select( - [ - _ - for _ in VoronoiMunicipalityCutsAssigned.__table__.columns - if _.name != "temp_id" - ], - cut_1subst, + originally_1subst = ( + VoronoiMunicipalityCutsAssigned.__table__.insert().from_select( + [ + _ + for _ in VoronoiMunicipalityCutsAssigned.__table__.columns + if _.name != "temp_id" + ], + cut_1subst, + ) ) session.execute(originally_1subst) session.commit() @@ -515,13 +531,15 @@ def assign_substation_municipality_fragments( .subquery() ) - cut_0subst_insert = VoronoiMunicipalityCutsAssigned.__table__.insert().from_select( - [ - c - for c in cut_0subst_nearest_neighbor.columns - if c.name not in ["temp_id"] - ], - cut_0subst_nearest_neighbor, + cut_0subst_insert = ( + 
VoronoiMunicipalityCutsAssigned.__table__.insert().from_select( + [ + c + for c in cut_0subst_nearest_neighbor.columns + if c.name not in ["temp_id"] + ], + cut_0subst_nearest_neighbor, + ) ) session.execute(cut_0subst_insert) session.commit() @@ -562,13 +580,15 @@ def merge_polygons_to_grid_district(): ), ).group_by(VoronoiMunicipalityCutsAssigned.bus_id) - joined_municipality_parts_insert = MvGridDistrictsDissolved.__table__.insert().from_select( - [ - c - for c in MvGridDistrictsDissolved.__table__.columns - if c.name != "id" - ], - joined_municipality_parts.subquery(), + joined_municipality_parts_insert = ( + MvGridDistrictsDissolved.__table__.insert().from_select( + [ + c + for c in MvGridDistrictsDissolved.__table__.columns + if c.name != "id" + ], + joined_municipality_parts.subquery(), + ) ) session.execute(joined_municipality_parts_insert) session.commit() @@ -591,13 +611,15 @@ def merge_polygons_to_grid_district(): ) ) - one_substation_insert = MvGridDistrictsDissolved.__table__.insert().from_select( - [ - c - for c in MvGridDistrictsDissolved.__table__.columns - if c.name != "id" - ], - one_substation.subquery(), + one_substation_insert = ( + MvGridDistrictsDissolved.__table__.insert().from_select( + [ + c + for c in MvGridDistrictsDissolved.__table__.columns + if c.name != "id" + ], + one_substation.subquery(), + ) ) session.execute(one_substation_insert) session.commit() @@ -650,9 +672,11 @@ def merge_polygons_to_grid_district(): func.sum(MvGridDistrictsDissolved.area).label("area"), ).group_by(MvGridDistrictsDissolved.bus_id) - joined_mv_grid_district_parts_insert = MvGridDistricts.__table__.insert().from_select( - MvGridDistricts.__table__.columns, - joined_mv_grid_district_parts.subquery(), + joined_mv_grid_district_parts_insert = ( + MvGridDistricts.__table__.insert().from_select( + MvGridDistricts.__table__.columns, + joined_mv_grid_district_parts.subquery(), + ) ) session.execute(joined_mv_grid_district_parts_insert) session.commit() @@ -745,13 
+769,15 @@ def nearest_polygon_with_substation( ).distinct(all_nearest_neighbors.c.id) # Insert polygons with newly assigned substation - assigned_polygons_insert = MvGridDistrictsDissolved.__table__.insert().from_select( - [ - c - for c in MvGridDistrictsDissolved.__table__.columns - if c.name not in ["id"] - ], - nearest_neighbors, + assigned_polygons_insert = ( + MvGridDistrictsDissolved.__table__.insert().from_select( + [ + c + for c in MvGridDistrictsDissolved.__table__.columns + if c.name not in ["id"] + ], + nearest_neighbors, + ) ) session.execute(assigned_polygons_insert) session.commit() diff --git a/src/egon/data/datasets/power_plants/__init__.py b/src/egon/data/datasets/power_plants/__init__.py index 8b90b0e83..d923bddb1 100755 --- a/src/egon/data/datasets/power_plants/__init__.py +++ b/src/egon/data/datasets/power_plants/__init__.py @@ -66,8 +66,8 @@ def __init__(self, dependencies): pv_ground_mounted.insert, pv_rooftop_per_mv_grid, }, - assign_weather_data.weather_id, wind_offshore.insert, + assign_weather_data.weatherId_and_busId, ), ) @@ -745,4 +745,4 @@ def allocate_conventional_non_chp_power_plants(): geom=f"SRID=4326;POINT({row.geometry.x} {row.geometry.y})", ) session.add(entry) - session.commit() + session.commit() \ No newline at end of file diff --git a/src/egon/data/datasets/power_plants/assign_weather_data.py b/src/egon/data/datasets/power_plants/assign_weather_data.py index 49d8c1cdd..3d2d8e51a 100644 --- a/src/egon/data/datasets/power_plants/assign_weather_data.py +++ b/src/egon/data/datasets/power_plants/assign_weather_data.py @@ -1,13 +1,34 @@ -import pandas as pd import geopandas as gpd -import numpy as np +import pandas as pd + from egon.data import db -from shapely.geometry import Point -from pathlib import Path +from egon.data.datasets import Dataset import egon.data.config +import egon.data.datasets.power_plants.__init__ as init_pp + + +def weatherId_and_busId(): + power_plants, cfg, con = find_weather_id() + power_plants = 
find_bus_id(power_plants, cfg) + write_power_plants_table(power_plants, cfg, con) + +def find_bus_id(power_plants, cfg): + # Define bus_id for power plants without it + power_plants_no_busId = power_plants[power_plants.bus_id.isna()] + power_plants = power_plants[~power_plants.bus_id.isna()] -def weather_id(): + power_plants_no_busId = power_plants_no_busId.drop(columns="bus_id") + + if len(power_plants_no_busId) > 0: + power_plants_no_busId = init_pp.assign_bus_id(power_plants_no_busId, cfg) + + power_plants = power_plants.append(power_plants_no_busId) + + return power_plants + + +def find_weather_id(): """ Assign weather data to the weather dependant generators (wind and solar) @@ -19,35 +40,52 @@ def weather_id(): # Connect to the data base con = db.engine() - cfg = egon.data.config.datasets()["power_plants"] + cfg = egon.data.config.datasets()["weather_BusID"] # Import table with power plants - sql = "SELECT * FROM supply.egon_power_plants" + sql = f""" + SELECT * FROM + {cfg['sources']['power_plants']['schema']}. + {cfg['sources']['power_plants']['table']} + """ power_plants = gpd.GeoDataFrame.from_postgis( sql, con, crs="EPSG:4326", geom_col="geom" ) - # select the power_plants that are weather dependant + # select the power_plants that are weather dependant (wind offshore is + # not included here, because it alredy has weather_id assigned) power_plants = power_plants[ (power_plants["carrier"] == "solar") | (power_plants["carrier"] == "wind_onshore") - | (power_plants["carrier"] == "wind_offshore") ] power_plants.set_index("id", inplace=True) # Import table with weather data for each technology - sql = "SELECT * FROM supply.egon_era5_renewable_feedin" + sql = f""" + SELECT * FROM + {cfg['sources']['renewable_feedin']['schema']}. 
+ {cfg['sources']['renewable_feedin']['table']} + """ weather_data = pd.read_sql_query(sql, con) weather_data.set_index("w_id", inplace=True) # Import weather cells with Id to match with the weather data - sql = "SELECT * FROM supply.egon_era5_weather_cells" + sql = f""" + SELECT * FROM + {cfg['sources']['weather_cells']['schema']}. + {cfg['sources']['weather_cells']['table']} + """ weather_cells = gpd.GeoDataFrame.from_postgis( sql, con, crs="EPSG:4326", geom_col="geom" ) # import Germany borders to speed up the matching process sql = "SELECT * FROM boundaries.vg250_sta" + sql = f""" + SELECT * FROM + {cfg['sources']['boundaries']['schema']}. + {cfg['sources']['boundaries']['table']} + """ boundaries = gpd.GeoDataFrame.from_postgis( sql, con, crs="EPSG:4326", geom_col="geometry" ) @@ -62,21 +100,36 @@ def weather_id(): power_plant_list = df.index.to_list() power_plants.loc[power_plant_list, "weather_cell_id"] = weather_id + return (power_plants, cfg, con) + + +def write_power_plants_table(power_plants, cfg, con): + # delete weather dependent power_plants from supply.egon_power_plants db.execute_sql( f""" - DELETE FROM {cfg['target']['schema']}.{cfg['target']['table']} - WHERE carrier IN ('wind_onshore', 'solar', 'wind_offshore') + DELETE FROM {cfg['sources']['power_plants']['schema']}. + {cfg['sources']['power_plants']['table']} + WHERE carrier IN ('wind_onshore', 'solar') """ ) - # Look for the maximum id in the table egon_power_plants - sql = ( - "SELECT MAX(id) FROM " - + cfg["target"]["schema"] - + "." 
- + cfg["target"]["table"] + # assert that the column "bus_id" is set as integer + power_plants["bus_id"] = power_plants["bus_id"].apply( + lambda x: pd.NA if pd.isna(x) else int(x) ) + + # assert that the column "weather_cell_id" is set as integer + power_plants["weather_cell_id"] = power_plants["weather_cell_id"].apply( + lambda x: pd.NA if pd.isna(x) else int(x) + ) + + # Look for the maximum id in the table egon_power_plants + sql = f""" + SELECT MAX(id) FROM + {cfg['sources']['power_plants']['schema']}. + {cfg['sources']['power_plants']['table']} + """ max_id = pd.read_sql(sql, con) max_id = max_id["max"].iat[0] if max_id == None: @@ -92,10 +145,11 @@ def weather_id(): # Insert into database power_plants.reset_index().to_postgis( - cfg["target"]["table"], - schema=cfg["target"]["schema"], - con=db.engine(), + name=f"{cfg['sources']['power_plants']['table']}", + schema=f"{cfg['sources']['power_plants']['schema']}", + con=con, if_exists="append", ) - return 0 + return "Bus_id and Weather_id were updated succesfully" + diff --git a/src/egon/data/datasets/power_plants/pv_ground_mounted.py b/src/egon/data/datasets/power_plants/pv_ground_mounted.py index 4b7736d94..80634c9cb 100644 --- a/src/egon/data/datasets/power_plants/pv_ground_mounted.py +++ b/src/egon/data/datasets/power_plants/pv_ground_mounted.py @@ -368,7 +368,7 @@ def build_pv(pv_pot, pow_per_area): # build pv farms in selected areas # calculation of centroids - pv_pot["centroid"] = pv_pot["geom"].centroid + pv_pot["centroid"] = pv_pot["geom"].representative_point() # calculation of power in kW pv_pot["installed capacity in kW"] = pd.Series() @@ -488,7 +488,7 @@ def build_additional_pv(potentials, pv, pow_per_area, con): pv_per_distr = gpd.GeoDataFrame() pv_per_distr["geom"] = distr["geom"].copy() centroids = potentials.copy() - centroids["geom"] = centroids["geom"].centroid + centroids["geom"] = centroids["geom"].representative_point() overlay = gpd.sjoin(centroids, distr) @@ -528,7 +528,7 @@ def 
build_additional_pv(potentials, pv, pow_per_area, con): ) # calculate centroid - pv_per_distr["centroid"] = pv_per_distr["geom"].centroid + pv_per_distr["centroid"] = pv_per_distr["geom"].representative_point() return pv_per_distr diff --git a/src/egon/data/datasets/power_plants/wind_offshore.py b/src/egon/data/datasets/power_plants/wind_offshore.py index d399bcd69..0f62beff0 100644 --- a/src/egon/data/datasets/power_plants/wind_offshore.py +++ b/src/egon/data/datasets/power_plants/wind_offshore.py @@ -60,6 +60,30 @@ def insert(): "Suchraum Gemeinde Papendorf": "32063539", "Suchraum Gemeinden Brünzow/Kemnitz": "460134233", } + w_id = { + "Büttel": "16331", + "Heide/West": "16516", + "Suchraum Gemeinden Ibbenbüren/Mettingen/Westerkappeln": "16326", + "Suchraum Zensenbusch": "16139", + "Rommerskirchen": "16139", + "Oberzier": "16139", + "Garrel/Ost": "16139", + "Diele": "16138", + "Dörpen/West": "15952", + "Emden/Borßum": "15762", + "Emden/Ost": "16140", + "Hagermarsch": "15951", + "Hanekenfähr": "16139", + "Inhausen": "15769", + "Unterweser": "16517", + "Wehrendorf": "16139", + "Wilhelmshaven 2": "16517", + "Rastede": "16139", + "Bentwisch": "16734", + "Lubmin": "16548", + "Suchraum Gemeinde Papendorf": "16352", + "Suchraum Gemeinden Brünzow/Kemnitz": "16548", + } # Match wind offshore table with the corresponding OSM_id offshore["osm_id"] = offshore["Netzverknuepfungspunkt"].map(id_bus) @@ -101,7 +125,10 @@ def insert(): offshore.at[index, "geom"] = busses.at[bus_ind, "point"] else: print(f'Wind offshore farm not found: {wind_park["osm_id"]}') - + + + offshore["weather_cell_id"] = offshore['Netzverknuepfungspunkt'].map(w_id) + offshore['weather_cell_id'] = offshore['weather_cell_id'].apply(int) # Drop offshore wind farms without found connexion point offshore.dropna(subset=["bus_id"], inplace=True) @@ -178,3 +205,4 @@ def insert(): ) return 0 + diff --git a/src/egon/data/datasets/renewable_feedin.py b/src/egon/data/datasets/renewable_feedin.py index 
f44f3d342..407501db2 100644 --- a/src/egon/data/datasets/renewable_feedin.py +++ b/src/egon/data/datasets/renewable_feedin.py @@ -2,24 +2,24 @@ Central module containing all code dealing with processing era5 weather data. """ -import numpy as np - -import egon.data.config import geopandas as gpd +import numpy as np import pandas as pd + from egon.data import db from egon.data.datasets import Dataset from egon.data.datasets.era5 import import_cutout from egon.data.datasets.scenario_parameters import get_sector_parameters +import egon.data.config class RenewableFeedin(Dataset): def __init__(self, dependencies): super().__init__( name="RenewableFeedin", - version="0.0.1", + version="0.0.4", dependencies=dependencies, - tasks=(wind, pv, solar_thermal), + tasks={wind, pv, solar_thermal, heat_pump_cop}, ) @@ -100,8 +100,9 @@ def federal_states_per_weather_cell(): .drop_duplicates(subset="w_id", keep="first") .set_index("w_id") ) - - weather_cells = weather_cells.dropna(axis=0, subset=["federal_state"]) + + + weather_cells = weather_cells.dropna(axis=0, subset=["federal_state"]) return weather_cells.to_crs(4326) @@ -360,12 +361,80 @@ def solar_thermal(): orientation={"slope": 45.0, "azimuth": 180.0}, per_unit=True, shapes=weather_cells.to_crs(4326).geom, + capacity_factor=False, ) # Create dataframe and insert to database insert_feedin(ts_solar_thermal, "solar_thermal", weather_year) +def heat_pump_cop(): + """ + Calculate coefficient of performance for heat pumps according to + T. Brown et al: "Synergies of sector coupling and transmission + reinforcement in a cost-optimised, highlyrenewable European energy system", + 2018, p. 8 + + Returns + ------- + None. + + """ + # Assume temperature of heating system to 55°C according to Brown et. 
al + t_sink = 55 + + carrier = "heat_pump_cop" + + # Load configuration + cfg = egon.data.config.datasets()["renewable_feedin"] + + # Get weather cells in Germany + weather_cells = weather_cells_in_germany() + + # Select weather data for Germany + cutout = import_cutout(boundary="Germany") + + # Select weather year from cutout + weather_year = cutout.name.split("-")[1] + + # Calculate feedin timeseries + temperature = cutout.temperature( + shapes=weather_cells.to_crs(4326).geom + ).transpose() + + t_source = temperature.to_pandas() + + delta_t = t_sink - t_source + + # Calculate coefficient of performance for air sourced heat pumps + # according to Brown et. al + cop = 6.81 - 0.121 * delta_t + 0.00063 * delta_t ** 2 + + df = pd.DataFrame( + index=temperature.to_pandas().index, + columns=["weather_year", "carrier", "feedin"], + data={"weather_year": weather_year, "carrier": carrier}, + ) + + df.feedin = cop.values.tolist() + + # Delete existing rows for carrier + db.execute_sql( + f""" + DELETE FROM {cfg['targets']['feedin_table']['schema']}. 
+ {cfg['targets']['feedin_table']['table']} + WHERE carrier = '{carrier}'""" + ) + + # Insert values into database + df.to_sql( + cfg["targets"]["feedin_table"]["table"], + schema=cfg["targets"]["feedin_table"]["schema"], + con=db.engine(), + if_exists="append", + ) + + def insert_feedin(data, carrier, weather_year): """Insert feedin data into database @@ -384,20 +453,24 @@ def insert_feedin(data, carrier, weather_year): """ # Transpose DataFrame - data = data.transpose() + data = data.transpose().to_pandas() # Load configuration cfg = egon.data.config.datasets()["renewable_feedin"] # Initialize DataFrame df = pd.DataFrame( - index=data.to_pandas().index, + index=data.index, columns=["weather_year", "carrier", "feedin"], data={"weather_year": weather_year, "carrier": carrier}, ) + # Convert solar thermal data from W/m^2 to MW/(1000m^2) = kW/m^2 + if carrier == "solar_thermal": + data *= 1e-3 + # Insert feedin into DataFrame - df.feedin = data.to_pandas().values.tolist() + df.feedin = data.values.tolist() # Delete existing rows for carrier db.execute_sql( diff --git a/src/egon/data/datasets/scenario_capacities.py b/src/egon/data/datasets/scenario_capacities.py index 2e602f02f..243e85846 100755 --- a/src/egon/data/datasets/scenario_capacities.py +++ b/src/egon/data/datasets/scenario_capacities.py @@ -60,7 +60,7 @@ class ScenarioCapacities(Dataset): def __init__(self, dependencies): super().__init__( name="ScenarioCapacities", - version="0.0.4", + version="0.0.5", dependencies=dependencies, tasks=(create_table, insert_data_nep), ) diff --git a/src/egon/data/datasets/storages/__init__.py b/src/egon/data/datasets/storages/__init__.py new file mode 100644 index 000000000..cf5b6ff36 --- /dev/null +++ b/src/egon/data/datasets/storages/__init__.py @@ -0,0 +1,225 @@ +"""The central module containing all code dealing with power plant data. 
+""" +from geoalchemy2 import Geometry +from sqlalchemy import BigInteger, Column, Float, Integer, Sequence, String +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +from egon.data.datasets.storages.pumped_hydro import ( + select_mastr_pumped_hydro, + select_nep_pumped_hydro, + match_storage_units, + get_location, + apply_voltage_level_thresholds, +) +from egon.data.datasets.power_plants import assign_voltage_level +import geopandas as gpd +import pandas as pd + +from egon.data import db, config +from egon.data.datasets import Dataset + + +Base = declarative_base() + + +class EgonStorages(Base): + __tablename__ = "egon_storages" + __table_args__ = {"schema": "supply"} + id = Column(BigInteger, Sequence("storage_seq"), primary_key=True) + sources = Column(JSONB) + source_id = Column(JSONB) + carrier = Column(String) + el_capacity = Column(Float) + bus_id = Column(Integer) + voltage_level = Column(Integer) + scenario = Column(String) + geom = Column(Geometry("POINT", 4326)) + + +class PumpedHydro(Dataset): + def __init__(self, dependencies): + super().__init__( + name="Storages", + version="0.0.0", + dependencies=dependencies, + tasks=(create_tables, allocate_pumped_hydro), + ) + + +def create_tables(): + """Create tables for power plant data + Returns + ------- + None. 
+ """ + + cfg = config.datasets()["storages"] + db.execute_sql(f"CREATE SCHEMA IF NOT EXISTS {cfg['target']['schema']};") + engine = db.engine() + db.execute_sql( + f"""DROP TABLE IF EXISTS + {cfg['target']['schema']}.{cfg['target']['table']}""" + ) + + db.execute_sql("""DROP SEQUENCE IF EXISTS pp_seq""") + EgonStorages.__table__.create(bind=engine, checkfirst=True) + + +def allocate_pumped_hydro(): + + carrier = "pumped_hydro" + + cfg = config.datasets()["power_plants"] + + nep = select_nep_pumped_hydro() + mastr = select_mastr_pumped_hydro() + + # Assign voltage level to MaStR + mastr["voltage_level"] = assign_voltage_level( + mastr.rename({"el_capacity": "Nettonennleistung"}, axis=1), cfg + ) + + # Initalize DataFrame for matching power plants + matched = gpd.GeoDataFrame( + columns=[ + "carrier", + "el_capacity", + "scenario", + "geometry", + "MaStRNummer", + "source", + "voltage_level", + ] + ) + + # Match pumped_hydro units from NEP list + # using PLZ and capacity + matched, mastr, nep = match_storage_units( + nep, mastr, matched, buffer_capacity=0.1, consider_carrier=False + ) + + # Match plants from NEP list using plz, + # neglecting the capacity + matched, mastr, nep = match_storage_units( + nep, + mastr, + matched, + consider_location="plz", + consider_carrier=False, + consider_capacity=False, + ) + + # Match plants from NEP list using city, + # neglecting the capacity + matched, mastr, nep = match_storage_units( + nep, + mastr, + matched, + consider_location="city", + consider_carrier=False, + consider_capacity=False, + ) + + # Match remaining plants from NEP using the federal state + matched, mastr, nep = match_storage_units( + nep, + mastr, + matched, + buffer_capacity=0.1, + consider_location="federal_state", + consider_carrier=False, + ) + + # Match remaining plants from NEP using the federal state + matched, mastr, nep = match_storage_units( + nep, + mastr, + matched, + buffer_capacity=0.7, + consider_location="federal_state", + 
consider_carrier=False, + ) + + print(f"{matched.el_capacity.sum()} MW of {carrier} matched") + print(f"{nep.c2035_capacity.sum()} MW of {carrier} not matched") + + if nep.c2035_capacity.sum() > 0: + + # Get location using geolocator and city information + located, unmatched = get_location(nep) + + # Bring both dataframes together + matched = matched.append( + located[ + [ + "carrier", + "el_capacity", + "scenario", + "geometry", + "source", + "MaStRNummer", + ] + ], + ignore_index=True, + ) + + # Set CRS + matched.crs = "EPSG:4326" + + # Assign voltage level + matched = apply_voltage_level_thresholds(matched) + + # Assign bus_id + # Load grid district polygons + mv_grid_districts = db.select_geodataframe( + f""" + SELECT * FROM {cfg['sources']['egon_mv_grid_district']} + """, + epsg=4326, + ) + + ehv_grid_districts = db.select_geodataframe( + f""" + SELECT * FROM {cfg['sources']['ehv_voronoi']} + """, + epsg=4326, + ) + + # Perform spatial joins for plants in ehv and hv level seperately + power_plants_hv = gpd.sjoin( + matched[matched.voltage_level >= 3], + mv_grid_districts[["bus_id", "geom"]], + how="left", + ).drop(columns=["index_right"]) + power_plants_ehv = gpd.sjoin( + matched[matched.voltage_level < 3], + ehv_grid_districts[["bus_id", "geom"]], + how="left", + ).drop(columns=["index_right"]) + + # Combine both dataframes + power_plants = pd.concat([power_plants_hv, power_plants_ehv]) + + # Delete existing units in the target table + db.execute_sql( + f""" DELETE FROM {cfg ['target']['schema']}.{cfg ['target']['table']} + WHERE carrier IN ('pumped_hydro') + AND scenario='eGon2035';""" + ) + + # Insert into target table + session = sessionmaker(bind=db.engine())() + for i, row in power_plants.iterrows(): + entry = EgonStorages( + sources={"el_capacity": row.source}, + source_id={"MastrNummer": row.MaStRNummer}, + carrier=row.carrier, + el_capacity=row.el_capacity, + voltage_level=row.voltage_level, + bus_id=row.bus_id, + scenario=row.scenario, + 
geom=f"SRID=4326;POINT({row.geometry.x} {row.geometry.y})", + ) + session.add(entry) + session.commit() diff --git a/src/egon/data/datasets/storages/pumped_hydro.py b/src/egon/data/datasets/storages/pumped_hydro.py new file mode 100644 index 000000000..3add20fb6 --- /dev/null +++ b/src/egon/data/datasets/storages/pumped_hydro.py @@ -0,0 +1,339 @@ +""" +The module containing code allocating pumped hydro plants based on +data from MaStR and NEP. +""" + +import pandas as pd +import geopandas as gpd +import egon.data.config +from egon.data import db, config +from egon.data.datasets.power_plants import ( + assign_voltage_level, + assign_bus_id, + assign_gas_bus_id, + filter_mastr_geometry, + select_target, +) +from egon.data.datasets.chp.match_nep import match_nep_chp +from egon.data.datasets.chp.small_chp import assign_use_case +from geopy.geocoders import Nominatim +from sqlalchemy.orm import sessionmaker + + +def select_nep_pumped_hydro(): + """ Select pumped hydro plants from NEP power plants list + + + Returns + ------- + pandas.DataFrame + Pumped hydro plants from NEP list + """ + cfg = egon.data.config.datasets()["power_plants"] + + carrier = "pumped_hydro" + + # Select plants with geolocation from list of conventional power plants + nep_ph = db.select_dataframe( + f""" + SELECT bnetza_id, name, carrier, postcode, capacity, city, + federal_state, c2035_capacity + FROM {cfg['sources']['nep_conv']} + WHERE carrier = '{carrier}' + AND c2035_capacity > 0 + AND postcode != 'None'; + """ + ) + + # Removing plants out of Germany + nep_ph["postcode"] = nep_ph["postcode"].astype(str) + nep_ph = nep_ph[~nep_ph["postcode"].str.contains("A")] + nep_ph = nep_ph[~nep_ph["postcode"].str.contains("L")] + nep_ph = nep_ph[~nep_ph["postcode"].str.contains("nan")] + + # Remove the subunits from the bnetza_id + nep_ph["bnetza_id"] = nep_ph["bnetza_id"].str[0:7] + + return nep_ph + + +def select_mastr_pumped_hydro(): + """ Select pumped hydro plants from MaStR + + + Returns + ------- 
+ pandas.DataFrame + Pumped hydro plants from MaStR + """ + sources = egon.data.config.datasets()["power_plants"]["sources"] + + # Read-in data from MaStR + mastr_ph = pd.read_csv( + sources["mastr_storage"], + delimiter=",", + usecols=[ + "Nettonennleistung", + "EinheitMastrNummer", + "Kraftwerksnummer", + "Technologie", + "Postleitzahl", + "Laengengrad", + "Breitengrad", + "EinheitBetriebsstatus", + "LokationMastrNummer", + "Ort", + "Bundesland", + ], + ) + + # Rename columns + mastr_ph = mastr_ph.rename( + columns={ + "Kraftwerksnummer": "bnetza_id", + "Technologie": "carrier", + "Postleitzahl": "plz", + "Ort": "city", + "Bundesland": "federal_state", + "Nettonennleistung": "el_capacity", + } + ) + + # Select only pumped hydro units + mastr_ph = mastr_ph[mastr_ph.carrier == "Pumpspeicher"] + + # Select only pumped hydro units which are in operation + mastr_ph = mastr_ph[mastr_ph.EinheitBetriebsstatus == "InBetrieb"] + + # Insert geometry column + mastr_ph = mastr_ph[~(mastr_ph["Laengengrad"].isnull())] + mastr_ph = gpd.GeoDataFrame( + mastr_ph, + geometry=gpd.points_from_xy( + mastr_ph["Laengengrad"], mastr_ph["Breitengrad"] + ), + ) + + # Drop rows without post code and update datatype of postcode + mastr_ph = mastr_ph[~mastr_ph["plz"].isnull()] + mastr_ph["plz"] = mastr_ph["plz"].astype(int) + + # Calculate power in MW + mastr_ph.loc[:, "el_capacity"] *= 1e-3 + + mastr_ph = mastr_ph.set_crs(4326) + + mastr_ph = mastr_ph[~(mastr_ph["federal_state"].isnull())] + + # Drop CHP outside of Germany/ outside the test mode area + mastr_ph = filter_mastr_geometry(mastr_ph, federal_state=None) + + return mastr_ph + + +def match_storage_units( + nep, + mastr, + matched, + buffer_capacity=0.1, + consider_location="plz", + consider_carrier=True, + consider_capacity=True, +): + """ Match storage_units (in this case only pumped hydro) from MaStR + to list of power plants from NEP + + Parameters + ---------- + nep : pandas.DataFrame + storage units from NEP which are not 
matched to MaStR + mastr : pandas.DataFrame + Pstorage_units from MaStR which are not matched to NEP + matched : pandas.DataFrame + Already matched storage_units + buffer_capacity : float, optional + Maximum difference in capacity in p.u. The default is 0.1. + + Returns + ------- + matched : pandas.DataFrame + Matched CHP + mastr : pandas.DataFrame + storage_units from MaStR which are not matched to NEP + nep : pandas.DataFrame + storage_units from NEP which are not matched to MaStR + + """ + + list_federal_states = pd.Series( + { + "Hamburg": "HH", + "Sachsen": "SN", + "MecklenburgVorpommern": "MV", + "Thueringen": "TH", + "SchleswigHolstein": "SH", + "Bremen": "HB", + "Saarland": "SL", + "Bayern": "BY", + "BadenWuerttemberg": "BW", + "Brandenburg": "BB", + "Hessen": "HE", + "NordrheinWestfalen": "NW", + "Berlin": "BE", + "Niedersachsen": "NI", + "SachsenAnhalt": "ST", + "RheinlandPfalz": "RP", + } + ) + + carrier = "pumped_hydro" + + for index, row in nep[ + (nep["carrier"] == carrier) & (nep["postcode"] != "None") + ].iterrows(): + + # Select plants from MaStR that match carrier, PLZ + # and have a similar capacity + # Create a copy of all power plants from MaStR + selected = mastr.copy() + + # Set capacity constraint using buffer + if consider_capacity: + selected = selected[ + ( + selected.el_capacity + <= row["capacity"] * (1 + buffer_capacity) + ) + & ( + selected.el_capacity + >= row["capacity"] * (1 - buffer_capacity) + ) + ] + + # Set geographic constraint, either choose power plants + # with the same postcode, city or federal state + if consider_location == "plz": + selected = selected[ + selected.plz.astype(int).astype(str) == row["postcode"] + ] + elif consider_location == "city": + selected = selected[selected.city == row.city.replace("\n", " ")] + elif consider_location == "federal_state": + selected.loc[:, "federal_state"] = list_federal_states[ + selected.federal_state + ].values + selected = selected[selected.federal_state == row.federal_state] + 
+ # Set capacity constraint if selected + if consider_carrier: + selected = selected[selected.carrier == carrier] + + # If a plant could be matched, add this to matched + if len(selected) > 0: + matched = matched.append( + gpd.GeoDataFrame( + data={ + "source": "MaStR scaled with NEP 2021 list", + "MaStRNummer": selected.EinheitMastrNummer.head(1), + "carrier": carrier, + "el_capacity": row.c2035_capacity, + "scenario": "eGon2035", + "geometry": selected.geometry.head(1), + "voltage_level": selected.voltage_level.head(1), + } + ) + ) + + # Drop matched storage units from nep + nep = nep.drop(index) + + # Drop matched storage units from MaStR list if the location is accurate + if consider_capacity & consider_carrier: + mastr = mastr.drop(selected.index) + + return matched, mastr, nep + + +def get_location(unmatched): + """ Gets a geolocation for units which couldn't be matched using MaStR data. + Uses geolocator and the city name from NEP data to create longitude and + latitude for a list of unmatched units. 
+ + Parameters + ---------- + unmatched : pandas.DataFrame + storage units from NEP which are not matched to MaStR but containing + a city information + + Returns + ------- + unmatched: pandas.DataFrame + Units for which no geolocation could be identified + + located : pandas.DataFrame + Units with a geolocation based on their city information + + """ + + geolocator = Nominatim(user_agent="egon_data") + + # Create array of cities + cities = unmatched.city.values + + # identify longitude and latitude for all cities in the array + for city in cities: + lon = geolocator.geocode(city).longitude + lat = geolocator.geocode(city).latitude + + # write information on lon and lat to df + unmatched.loc[unmatched.city == city, "lon"] = lon + unmatched.loc[unmatched.city == city, "lat"] = lat + + # Get a point geometry from lon and lat information + unmatched["geometry"] = gpd.points_from_xy(unmatched.lon, unmatched.lat) + unmatched.crs = "EPSG:4326" + + # Copy units with lon and lat to a new dataframe + located = unmatched[ + ["bnetza_id", "name", "carrier", "city", "c2035_capacity", "geometry"] + ].copy() + located.dropna(subset=["geometry"], inplace=True) + + # Rename columns for compatibility reasons + located = located.rename( + columns={"c2035_capacity": "el_capacity", "bnetza_id": "MaStRNummer"} + ) + located["scenario"] = "eGon2035" + located["source"] = "NEP power plants geolocated using city" + + unmatched = unmatched.drop(located.index.values) + + return located, unmatched + + +def apply_voltage_level_thresholds(power_plants): + """Assigns voltage level to power plants based on thresholds defined for + the egon project. + + Parameters + ---------- + power_plants : pandas.DataFrame + Power plants and their electrical capacity + Returns + ------- + pandas.DataFrame + Power plants including voltage_level + """ + + # Identify voltage_level for every power plant taking thresholds into + # account which were defined in the eGon project. 
Existing entries on voltage + # will be overwritten + + power_plants.loc[power_plants["el_capacity"] < 0.1, "voltage_level"] = 7 + power_plants.loc[power_plants["el_capacity"] > 0.1, "voltage_level"] = 6 + power_plants.loc[power_plants["el_capacity"] > 0.2, "voltage_level"] = 5 + power_plants.loc[power_plants["el_capacity"] > 5.5, "voltage_level"] = 4 + power_plants.loc[power_plants["el_capacity"] > 20, "voltage_level"] = 3 + power_plants.loc[power_plants["el_capacity"] > 120, "voltage_level"] = 1 + + return power_plants diff --git a/src/egon/data/processing/substation/__init__.py b/src/egon/data/datasets/substation/__init__.py similarity index 51% rename from src/egon/data/processing/substation/__init__.py rename to src/egon/data/datasets/substation/__init__.py index 6f693a6c3..e099ebbb1 100644 --- a/src/egon/data/processing/substation/__init__.py +++ b/src/egon/data/datasets/substation/__init__.py @@ -1,12 +1,15 @@ """The central module containing code to create substation tables """ - -import egon.data.config -from egon.data import db +from airflow.operators.postgres_operator import PostgresOperator +from geoalchemy2.types import Geometry from sqlalchemy import Column, Float, Integer, Sequence, Text from sqlalchemy.ext.declarative import declarative_base -from geoalchemy2.types import Geometry +import importlib_resources as resources + +from egon.data import db +from egon.data.datasets import Dataset +import egon.data.config Base = declarative_base() @@ -67,34 +70,35 @@ class EgonHvmvSubstation(Base): status = Column(Integer) -class EgonHvmvSubstationVoronoi(Base): - __tablename__ = "egon_hvmv_substation_voronoi" - __table_args__ = {"schema": "grid"} - id = Column( - Integer, - Sequence("egon_hvmv_substation_voronoi_id_seq", schema="grid"), - server_default=Sequence( - "egon_hvmv_substation_voronoi_id_seq", schema="grid" - ).next_value(), - primary_key=True, - ) - bus_id = Column(Integer) - geom = Column(Geometry("Multipolygon", 4326)) - - -class 
EgonEhvSubstationVoronoi(Base): - __tablename__ = "egon_ehv_substation_voronoi" - __table_args__ = {"schema": "grid"} - id = Column( - Integer, - Sequence("egon_ehv_substation_voronoi_id_seq", schema="grid"), - server_default=Sequence( - "egon_ehv_substation_voronoi_id_seq", schema="grid" - ).next_value(), - primary_key=True, - ) - bus_id = Column(Integer) - geom = Column(Geometry("Multipolygon", 4326)) +class SubstationExtraction(Dataset): + def __init__(self, dependencies): + super().__init__( + name="substation_extraction", + version="0.0.0", + dependencies=dependencies, + tasks=( + create_tables, + create_sql_functions, + { + PostgresOperator( + task_id="hvmv_substation", + sql=resources.read_text( + __name__, "hvmv_substation.sql" + ), + postgres_conn_id="egon_data", + autocommit=True, + ), + PostgresOperator( + task_id="ehv_substation", + sql=resources.read_text( + __name__, "ehv_substation.sql" + ), + postgres_conn_id="egon_data", + autocommit=True, + ), + }, + ), + ) def create_tables(): @@ -103,69 +107,42 @@ def create_tables(): ------- None. """ - cfg_ehv = egon.data.config.datasets()["ehv_substation"] - cfg_hvmv = egon.data.config.datasets()["hvmv_substation"] - cfg_ehv_voronoi = egon.data.config.datasets()["ehv_substation_voronoi"] - cfg_hvmv_voronoi = egon.data.config.datasets()["hvmv_substation_voronoi"] - db.execute_sql( - f"CREATE SCHEMA IF NOT EXISTS {cfg_ehv['processed']['schema']};" - ) - - # Drop tables - db.execute_sql( - f"""DROP TABLE IF EXISTS - {cfg_ehv['processed']['schema']}. - {cfg_ehv['processed']['table']} CASCADE;""" - ) + cfg_targets = egon.data.config.datasets()["substation_extraction"][ + "targets" + ] db.execute_sql( - f"""DROP TABLE IF EXISTS - {cfg_hvmv['processed']['schema']}. - {cfg_hvmv['processed']['table']} CASCADE;""" + f"CREATE SCHEMA IF NOT EXISTS {cfg_targets['hvmv_substation']['schema']};" ) + # Drop tables db.execute_sql( f"""DROP TABLE IF EXISTS - {cfg_ehv_voronoi['processed']['schema']}. 
- {cfg_ehv_voronoi['processed']['table']} CASCADE;""" + {cfg_targets['ehv_substation']['schema']}. + {cfg_targets['ehv_substation']['table']} CASCADE;""" ) db.execute_sql( f"""DROP TABLE IF EXISTS - {cfg_hvmv_voronoi['processed']['schema']}. - {cfg_hvmv_voronoi['processed']['table']} CASCADE;""" - ) - - # Drop sequences - db.execute_sql( - f"""DROP SEQUENCE IF EXISTS - {cfg_ehv_voronoi['processed']['schema']}. - {cfg_ehv_voronoi['processed']['table']}_id_seq CASCADE;""" + {cfg_targets['hvmv_substation']['schema']}. + {cfg_targets['hvmv_substation']['table']} CASCADE;""" ) db.execute_sql( f"""DROP SEQUENCE IF EXISTS - {cfg_hvmv_voronoi['processed']['schema']}. - {cfg_hvmv_voronoi['processed']['table']}_id_seq CASCADE;""" + {cfg_targets['hvmv_substation']['schema']}. + {cfg_targets['hvmv_substation']['table']}_bus_id_seq CASCADE;""" ) db.execute_sql( f"""DROP SEQUENCE IF EXISTS - {cfg_hvmv['processed']['schema']}. - {cfg_hvmv['processed']['table']}_bus_id_seq CASCADE;""" - ) - - db.execute_sql( - f"""DROP SEQUENCE IF EXISTS - {cfg_ehv['processed']['schema']}. - {cfg_ehv['processed']['table']}_bus_id_seq CASCADE;""" + {cfg_targets['ehv_substation']['schema']}. + {cfg_targets['ehv_substation']['table']}_bus_id_seq CASCADE;""" ) engine = db.engine() EgonEhvSubstation.__table__.create(bind=engine, checkfirst=True) EgonHvmvSubstation.__table__.create(bind=engine, checkfirst=True) - EgonEhvSubstationVoronoi.__table__.create(bind=engine, checkfirst=True) - EgonHvmvSubstationVoronoi.__table__.create(bind=engine, checkfirst=True) def create_sql_functions(): @@ -256,77 +233,3 @@ def create_sql_functions(): COST 100; """ ) - - -def create_voronoi(): - """ - Creates voronoi polygons for hvmv and ehv substations - - Returns - ------- - None. 
- - """ - - substation_list = ["hvmv_substation", "ehv_substation"] - - for substation in substation_list: - schema = egon.data.config.datasets()[substation]["processed"]["schema"] - substation_table = egon.data.config.datasets()[substation][ - "processed" - ]["table"] - voronoi_table = egon.data.config.datasets()[substation + "_voronoi"][ - "processed" - ]["table"] - view = "grid.egon_voronoi_no_borders" - boundary = "boundaries.vg250_sta_union" - - # Create view for Voronoi polygons without taking borders into account - db.execute_sql( - f"DROP VIEW IF EXISTS {schema}.egon_voronoi_no_borders CASCADE;" - ) - - db.execute_sql( - f""" - CREATE VIEW {view} AS - SELECT (ST_Dump(ST_VoronoiPolygons(ST_collect(a.point)))).geom - FROM {schema}.{substation_table} a; - """ - ) - - # Clip Voronoi with boundaries - db.execute_sql( - f""" - INSERT INTO {schema}.{voronoi_table} (geom) - (SELECT ST_Multi(ST_Intersection( - ST_Transform(a.geometry, 4326), b.geom)) AS geom - FROM {boundary} a - CROSS JOIN {view} b); - """ - ) - - # Assign substation id as foreign key - db.execute_sql( - f""" - UPDATE {schema}.{voronoi_table} AS t1 - SET bus_id = t2.bus_id - FROM (SELECT voi.id AS id, - sub.bus_id ::integer AS bus_id - FROM {schema}.{voronoi_table} AS voi, - {schema}.{substation_table} AS sub - WHERE voi.geom && sub.point AND - ST_CONTAINS(voi.geom,sub.point) - GROUP BY voi.id,sub.bus_id - )AS t2 - WHERE t1.id = t2.id; - """ - ) - - db.execute_sql( - f""" - CREATE INDEX {voronoi_table}_idx - ON {schema}.{voronoi_table} USING gist (geom); - """ - ) - - db.execute_sql(f"DROP VIEW IF EXISTS {view} CASCADE;") diff --git a/src/egon/data/processing/substation/ehv_substation.sql b/src/egon/data/datasets/substation/ehv_substation.sql similarity index 100% rename from src/egon/data/processing/substation/ehv_substation.sql rename to src/egon/data/datasets/substation/ehv_substation.sql diff --git a/src/egon/data/processing/substation/hvmv_substation.sql 
b/src/egon/data/datasets/substation/hvmv_substation.sql similarity index 100% rename from src/egon/data/processing/substation/hvmv_substation.sql rename to src/egon/data/datasets/substation/hvmv_substation.sql diff --git a/src/egon/data/datasets/substation_voronoi.py b/src/egon/data/datasets/substation_voronoi.py new file mode 100644 index 000000000..3676df06b --- /dev/null +++ b/src/egon/data/datasets/substation_voronoi.py @@ -0,0 +1,166 @@ +"""The central module containing code to create substation voronois + +""" + +import egon.data.config +from egon.data import db +from egon.data.datasets import Dataset +from sqlalchemy import Column, Integer, Sequence +from sqlalchemy.ext.declarative import declarative_base +from geoalchemy2.types import Geometry + +Base = declarative_base() + + +class SubstationVoronoi(Dataset): + def __init__(self, dependencies): + super().__init__( + name="substation_voronoi", + version="0.0.0", + dependencies=dependencies, + tasks=( + create_tables, + substation_voronoi, + ), + ) + + +class EgonHvmvSubstationVoronoi(Base): + __tablename__ = "egon_hvmv_substation_voronoi" + __table_args__ = {"schema": "grid"} + id = Column( + Integer, + Sequence("egon_hvmv_substation_voronoi_id_seq", schema="grid"), + server_default=Sequence( + "egon_hvmv_substation_voronoi_id_seq", schema="grid" + ).next_value(), + primary_key=True, + ) + bus_id = Column(Integer) + geom = Column(Geometry("Multipolygon", 4326)) + + +class EgonEhvSubstationVoronoi(Base): + __tablename__ = "egon_ehv_substation_voronoi" + __table_args__ = {"schema": "grid"} + id = Column( + Integer, + Sequence("egon_ehv_substation_voronoi_id_seq", schema="grid"), + server_default=Sequence( + "egon_ehv_substation_voronoi_id_seq", schema="grid" + ).next_value(), + primary_key=True, + ) + bus_id = Column(Integer) + geom = Column(Geometry("Multipolygon", 4326)) + + +def create_tables(): + """Create tables for voronoi polygons + Returns + ------- + None. 
+ """ + + cfg_voronoi = egon.data.config.datasets()["substation_voronoi"]["targets"] + + + db.execute_sql( + f"""DROP TABLE IF EXISTS + {cfg_voronoi['ehv_substation_voronoi']['schema']}. + {cfg_voronoi['ehv_substation_voronoi']['table']} CASCADE;""" + ) + + db.execute_sql( + f"""DROP TABLE IF EXISTS + {cfg_voronoi['hvmv_substation_voronoi']['schema']}. + {cfg_voronoi['hvmv_substation_voronoi']['table']} CASCADE;""" + ) + + # Drop sequences + db.execute_sql( + f"""DROP SEQUENCE IF EXISTS + {cfg_voronoi['ehv_substation_voronoi']['schema']}. + {cfg_voronoi['ehv_substation_voronoi']['table']}_id_seq CASCADE;""" + ) + + db.execute_sql( + f"""DROP SEQUENCE IF EXISTS + {cfg_voronoi['hvmv_substation_voronoi']['schema']}. + {cfg_voronoi['hvmv_substation_voronoi']['table']}_id_seq CASCADE;""" + ) + + engine = db.engine() + EgonEhvSubstationVoronoi.__table__.create(bind=engine, checkfirst=True) + EgonHvmvSubstationVoronoi.__table__.create(bind=engine, checkfirst=True) + +def substation_voronoi(): + """ + Creates voronoi polygons for hvmv and ehv substations + + Returns + ------- + None. + + """ + + substation_list = ["hvmv_substation", "ehv_substation"] + + for substation in substation_list: + cfg_boundaries = egon.data.config.datasets()["substation_voronoi"]["sources"]["boundaries"] + cfg_substation = egon.data.config.datasets()["substation_voronoi"]["sources"][substation] + cfg_voronoi = egon.data.config.datasets()["substation_voronoi"]["targets"][substation+ "_voronoi"] + + view = "grid.egon_voronoi_no_borders" + + # Create view for Voronoi polygons without taking borders into account + db.execute_sql( + f"DROP VIEW IF EXISTS {view} CASCADE;" + ) + + db.execute_sql( + f""" + CREATE VIEW {view} AS + SELECT (ST_Dump(ST_VoronoiPolygons(ST_collect(a.point)))).geom + FROM {cfg_substation['schema']}. 
+ {cfg_substation['table']} a; + """ + ) + + # Clip Voronoi with boundaries + db.execute_sql( + f""" + INSERT INTO {cfg_voronoi['schema']}.{cfg_voronoi['table']} (geom) + (SELECT ST_Multi(ST_Intersection( + ST_Transform(a.geometry, 4326), b.geom)) AS geom + FROM {cfg_boundaries['schema']}. + {cfg_boundaries['table']} a + CROSS JOIN {view} b); + """ + ) + + # Assign substation id as foreign key + db.execute_sql( + f""" + UPDATE {cfg_voronoi['schema']}.{cfg_voronoi['table']} AS t1 + SET bus_id = t2.bus_id + FROM (SELECT voi.id AS id, + sub.bus_id ::integer AS bus_id + FROM {cfg_voronoi['schema']}.{cfg_voronoi['table']} AS voi, + {cfg_substation['schema']}.{cfg_substation['table']} AS sub + WHERE voi.geom && sub.point AND + ST_CONTAINS(voi.geom,sub.point) + GROUP BY voi.id,sub.bus_id + )AS t2 + WHERE t1.id = t2.id; + """ + ) + + db.execute_sql( + f""" + CREATE INDEX {cfg_voronoi['table']}_idx + ON {cfg_voronoi['schema']}.{cfg_voronoi['table']} USING gist (geom); + """ + ) + + db.execute_sql(f"DROP VIEW IF EXISTS {view} CASCADE;") diff --git a/src/egon/data/metadata.py b/src/egon/data/metadata.py index 86600a792..8ade5c733 100644 --- a/src/egon/data/metadata.py +++ b/src/egon/data/metadata.py @@ -192,7 +192,7 @@ def generate_resource_fields_from_sqla_model(model): Examples -------- >>> from egon.data.metadata import generate_resource_fields_from_sqla_model - ... from egon.data.datasets.zensus_vg250 import Vg250Sta + >>> from egon.data.datasets.zensus_vg250 import Vg250Sta >>> resources = generate_resource_fields_from_sqla_model(Vg250Sta) Parameters @@ -230,9 +230,9 @@ def generate_resource_fields_from_db_table(schema, table, geom_columns=None): Examples -------- >>> from egon.data.metadata import generate_resource_fields_from_db_table - ... resources = generate_resource_fields_from_db_table( + >>> resources = generate_resource_fields_from_db_table( ... 'openstreetmap', 'osm_point', ['geom', 'geom_centroid'] - >>> ) + ... 
) # doctest: +SKIP Parameters ---------- diff --git a/src/egon/data/processing/gas_areas.py b/src/egon/data/processing/gas_areas.py index 1b8ae8650..2e3b5319c 100755 --- a/src/egon/data/processing/gas_areas.py +++ b/src/egon/data/processing/gas_areas.py @@ -2,12 +2,22 @@ """ from egon.data import db +from egon.data.datasets import Dataset from geoalchemy2.types import Geometry from sqlalchemy import Column, Float, Integer, Sequence, Text from sqlalchemy.ext.declarative import declarative_base -Base = declarative_base() +class GasAreas(Dataset): + def __init__(self, dependencies): + super().__init__( + name="GasAreas", + version="0.0.0", + dependencies=dependencies, + tasks=(create_voronoi), + ) + +Base = declarative_base() class EgonCH4VoronoiTmp(Base): __tablename__ = "egon_ch4_voronoi_tmp" diff --git a/src/egon/data/processing/power_to_h2.py b/src/egon/data/processing/power_to_h2.py index a1aea72a4..b14716aa4 100755 --- a/src/egon/data/processing/power_to_h2.py +++ b/src/egon/data/processing/power_to_h2.py @@ -7,10 +7,20 @@ import pandas as pd from egon.data import db +from egon.data.datasets import Dataset from geoalchemy2.types import Geometry from scipy.spatial import cKDTree from shapely import geometry +class PowertoH2(Dataset): + def __init__(self, dependencies): + super().__init__( + name="PowertoH2", + version="0.0.0", + dependencies=dependencies, + tasks=(insert_power_to_h2), + ) + def insert_power_to_h2(): """Function defining the potential power-to-H2 capacities and inserting them in the etrago_link table. 
diff --git a/tests/test_egon-data.py b/tests/test_egon-data.py index c74f26565..2b584fcc4 100644 --- a/tests/test_egon-data.py +++ b/tests/test_egon-data.py @@ -32,9 +32,9 @@ def test_airflow(): assert result.output == "" -def test_pipeline_and_tasks_importability(): +def test_pipeline_importability(): error = None - for m in ["egon.data.airflow.dags.pipeline", "egon.data.airflow.tasks"]: + for m in ["egon.data.airflow.dags.pipeline"]: try: import_module(m) except Exception as e: diff --git a/tox.ini b/tox.ini index 414513e87..87174b04f 100644 --- a/tox.ini +++ b/tox.ini @@ -2,26 +2,23 @@ # Map python versions used in build.yml to tox environments # This mapping is used by tox-gh-actions python = - 3.7: py37, clean, check, docs, report - 3.8: py38, clean, check, docs, report + 3.7: py37, clean, docs, report + 3.8: py38, clean, docs, report [gh-actions:env] PLATFORM = ubuntu-latest: linux - macos-latest: macos - windows-latest: windows [tox] envlist = clean, - check, docs, - py{37,38}-{cover,nocov}-{linux,macos,windows}, + py{37,38}-{cover,nocov}-{linux}, report [testenv] basepython = - {bootstrap,clean,check,report,docs,codecov,coveralls}: {env:TOXPYTHON:python3} + {bootstrap,clean,report,docs,codecov,coveralls}: {env:TOXPYTHON:python3} setenv = PYTHONPATH={toxinidir}/tests PYTHONUNBUFFERED=yes @@ -29,7 +26,6 @@ passenv = * deps = pytest - #pytest-travis-fold commands = {posargs:pytest -vv --ignore=src} @@ -93,7 +89,7 @@ commands = coverage erase skip_install = true deps = coverage -[testenv:py37-cover-{linux,macos,windows}] +[testenv:py37-cover-{linux}] basepython = {env:TOXPYTHON:python3.7} setenv = {[testenv]setenv} @@ -104,10 +100,10 @@ deps = {[testenv]deps} pytest-cov -[testenv:py37-nocov-{linux,macos,windows}] +[testenv:py37-nocov-{linux}] basepython = {env:TOXPYTHON:python3.7} -[testenv:py38-cover-{linux,macos,windows}] +[testenv:py38-cover-{linux}] basepython = {env:TOXPYTHON:python3.8} setenv = {[testenv]setenv} @@ -118,5 +114,5 @@ deps = {[testenv]deps} 
pytest-cov -[testenv:py38-nocov-{linux,macos,windows}] +[testenv:py38-nocov-{linux}] basepython = {env:TOXPYTHON:python3.8}