Skip to content

Commit

Permalink
Replace some uses of read_sql and read_postgis
Browse files Browse the repository at this point in the history
I tried to replace all instances I found where these functions were
used with `session.bind`s outside of the `session`'s context manager. Using
objects outside their context manager is not a good pattern. These
instances worked, because `session.bind` effectively uses the underlying
engine, so it should be the same as `db.engine()`, but you never know.
Also, these uses were unnecessary because the `DataFrame`s could simply
be obtained by using the actual query results. The `GeoDataFrame`s were
a little bit harder because they expect Shapely geometries and
Geoalchemy2 defaults to a different datatype, but thankfully it also
supplies a conversion function.
  • Loading branch information
gnn committed Nov 9, 2022
1 parent 5fbddc8 commit 67111cb
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 149 deletions.
5 changes: 3 additions & 2 deletions src/egon/data/datasets/electricity_demand/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ def get_annual_household_el_demand_cells():
== HouseholdElectricityProfilesInCensusCells.cell_id
)
.order_by(HouseholdElectricityProfilesOfBuildings.id)
.all()
)

df_buildings_and_profiles = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col="id"
df_buildings_and_profiles = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query], index="id"
)

# Read demand profiles from egon-data-bundle
Expand Down
150 changes: 86 additions & 64 deletions src/egon/data/datasets/electricity_demand_timeseries/cts_buildings.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,17 @@ def amenities_without_buildings():
EgonDemandRegioZensusElectricity.sector == "service",
EgonDemandRegioZensusElectricity.scenario == "eGon2035",
)
.all()
)

df_amenities_without_buildings = gpd.read_postgis(
cells_query.statement,
cells_query.session.bind,
geom_col="geom_amenity",
df_amenities_without_buildings = gpd.GeoDataFrame(
pd.DataFrame.from_records(
[
db.asdict(row, conversions={"geom_amenity": to_shape})
for row in cells_query
]
),
geometry="geom_amenity",
)
return df_amenities_without_buildings

Expand Down Expand Up @@ -451,9 +456,9 @@ def buildings_with_amenities():
EgonDemandRegioZensusElectricity.sector == "service",
EgonDemandRegioZensusElectricity.scenario == "eGon2035",
)
)
df_amenities_in_buildings = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col=None
).all()
df_amenities_in_buildings = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)

df_amenities_in_buildings["geom_building"] = df_amenities_in_buildings[
Expand Down Expand Up @@ -530,11 +535,16 @@ def buildings_with_amenities():
df_lost_cells["zensus_population_id"]
)
)

df_lost_cells = gpd.read_postgis(
cells_query.statement,
cells_query.session.bind,
geom_col="geom",
cells_query = cells_query.all()

df_lost_cells = gpd.GeoDataFrame(
pd.DataFrame.from_records(
[
db.asdict(row, conversions={"geom": to_shape})
for row in cells_query
]
),
geometry="geom",
)

# place random amenity in cell
Expand Down Expand Up @@ -678,13 +688,18 @@ def buildings_without_amenities():
q_cts_without_amenities
)
)
cells_query = cells_query.all()

# df_buildings_without_amenities = pd.read_sql(
# cells_query.statement, cells_query.session.bind, index_col=None)
df_buildings_without_amenities = gpd.read_postgis(
cells_query.statement,
cells_query.session.bind,
geom_col="geom_building",
df_buildings_without_amenities = gpd.GeoDataFrame(
pd.DataFrame.from_records(
[
db.asdict(row, conversions={"geom_building": to_shape})
for row in cells_query
]
),
geometry="geom_building",
)

df_buildings_without_amenities = df_buildings_without_amenities.rename(
Expand Down Expand Up @@ -772,13 +787,17 @@ def cells_with_cts_demand_only(df_buildings_without_amenities):
EgonDemandRegioZensusElectricity.zensus_population_id
== DestatisZensusPopulationPerHa.id
)
.all()
)

df_cts_cell_without_amenities = gpd.read_postgis(
cells_query.statement,
cells_query.session.bind,
geom_col="geom",
index_col=None,
df_cts_cell_without_amenities = gpd.GeoDataFrame(
pd.DataFrame.from_records(
[
db.asdict(row, conversions={"geom": to_shape})
for row in cells_query
]
),
geometry="geom",
)

# TODO remove after #722
Expand Down Expand Up @@ -829,6 +848,7 @@ def calc_census_cell_share(scenario, sector):
EgonDemandRegioZensusElectricity.zensus_population_id
== MapZensusGridDistricts.zensus_population_id
)
.all()
)

elif sector == "heat":
Expand All @@ -841,12 +861,12 @@ def calc_census_cell_share(scenario, sector):
EgonPetaHeat.zensus_population_id
== MapZensusGridDistricts.zensus_population_id
)
.all()
)

df_demand = pd.read_sql(
cells_query.statement,
cells_query.session.bind,
index_col="zensus_population_id",
df_demand = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query],
index="zensus_population_id",
)

# get demand share of cell per bus
Expand Down Expand Up @@ -992,23 +1012,24 @@ def calc_cts_building_profiles(
egon_building_ids
)
)
.all()
)

df_demand_share = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col=None
df_demand_share = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)

# Get substation cts electricity load profiles of selected bus_ids
with db.session_scope() as session:
cells_query = (
session.query(EgonEtragoElectricityCts).filter(
EgonEtragoElectricityCts.scn_name == scenario
)
).filter(EgonEtragoElectricityCts.bus_id.in_(bus_ids))
session.query(EgonEtragoElectricityCts)
.filter(EgonEtragoElectricityCts.scn_name == scenario)
.filter(EgonEtragoElectricityCts.bus_id.in_(bus_ids))
.all()
)

df_cts_profiles = pd.read_sql(
cells_query.statement,
cells_query.session.bind,
df_cts_profiles = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)
df_cts_profiles = pd.DataFrame.from_dict(
df_cts_profiles.set_index("bus_id")["p_set"].to_dict(),
Expand All @@ -1029,23 +1050,24 @@ def calc_cts_building_profiles(
egon_building_ids
)
)
.all()
)

df_demand_share = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col=None
df_demand_share = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)

# Get substation cts heat load profiles of selected bus_ids
with db.session_scope() as session:
cells_query = (
session.query(EgonEtragoHeatCts).filter(
EgonEtragoHeatCts.scn_name == scenario
)
).filter(EgonEtragoHeatCts.bus_id.in_(bus_ids))
session.query(EgonEtragoHeatCts)
.filter(EgonEtragoHeatCts.scn_name == scenario)
.filter(EgonEtragoHeatCts.bus_id.in_(bus_ids))
.all()
)

df_cts_profiles = pd.read_sql(
cells_query.statement,
cells_query.session.bind,
df_cts_profiles = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)
df_cts_profiles = pd.DataFrame.from_dict(
df_cts_profiles.set_index("bus_id")["p_set"].to_dict(),
Expand Down Expand Up @@ -1097,12 +1119,10 @@ def remove_double_bus_id(df_cts_buildings):
cells_query = session.query(
MapZensusGridDistricts.zensus_population_id,
MapZensusGridDistricts.bus_id,
)
).all()

df_egon_map_zensus_buildings_buses = pd.read_sql(
cells_query.statement,
cells_query.session.bind,
index_col=None,
df_egon_map_zensus_buildings_buses = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)
df_cts_buildings = pd.merge(
left=df_cts_buildings,
Expand Down Expand Up @@ -1330,10 +1350,10 @@ def cts_electricity():
"""
log.info("Start logging!")
with db.session_scope() as session:
cells_query = session.query(CtsBuildings)
cells_query = session.query(CtsBuildings).all()

df_cts_buildings = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col=None
df_cts_buildings = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)
log.info("CTS buildings from DB imported!")
df_demand_share_2035 = calc_building_demand_profile_share(
Expand Down Expand Up @@ -1369,10 +1389,10 @@ def cts_heat():
"""
log.info("Start logging!")
with db.session_scope() as session:
cells_query = session.query(CtsBuildings)
cells_query = session.query(CtsBuildings).all()

df_cts_buildings = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col=None
df_cts_buildings = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)
log.info("CTS buildings from DB imported!")

Expand Down Expand Up @@ -1425,19 +1445,20 @@ def get_cts_electricity_peak_load():
).filter(
EgonCtsElectricityDemandBuildingShare.scenario == scenario
)
cells_query = cells_query.all()

df_demand_share = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col=None
df_demand_share = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)

with db.session_scope() as session:
cells_query = session.query(EgonEtragoElectricityCts).filter(
EgonEtragoElectricityCts.scn_name == scenario
)
cells_query = cells_query.all()

df_cts_profiles = pd.read_sql(
cells_query.statement,
cells_query.session.bind,
df_cts_profiles = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)
df_cts_profiles = pd.DataFrame.from_dict(
df_cts_profiles.set_index("bus_id")["p_set"].to_dict(),
Expand Down Expand Up @@ -1498,20 +1519,21 @@ def get_cts_heat_peak_load():
).filter(
EgonCtsElectricityDemandBuildingShare.scenario == scenario
)
cells_query = cells_query.all()

df_demand_share = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col=None
df_demand_share = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)
log.info(f"Retrieved demand share for scenario: {scenario}")

with db.session_scope() as session:
cells_query = session.query(EgonEtragoHeatCts).filter(
EgonEtragoHeatCts.scn_name == scenario
)
cells_query = cells_query.all()

df_cts_profiles = pd.read_sql(
cells_query.statement,
cells_query.session.bind,
df_cts_profiles = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)
log.info(f"Retrieved substation profiles for scenario: {scenario}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,11 @@ def match_osm_and_zensus_data(
cells_query = session.query(
egon_destatis_building_count.c.zensus_population_id,
egon_destatis_building_count.c.building_count,
)
).all()

egon_destatis_building_count = pd.read_sql(
cells_query.statement,
cells_query.session.bind,
index_col="zensus_population_id",
egon_destatis_building_count = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query],
index="zensus_population_id",
)
egon_destatis_building_count = egon_destatis_building_count.dropna()

Expand Down Expand Up @@ -649,10 +648,11 @@ def get_building_peak_loads():
== HouseholdElectricityProfilesInCensusCells.cell_id
)
.order_by(HouseholdElectricityProfilesOfBuildings.id)
.all()
)

df_buildings_and_profiles = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col="id"
df_buildings_and_profiles = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query], index="id"
)

# Read demand profiles from egon-data-bundle
Expand Down Expand Up @@ -756,15 +756,17 @@ def map_houseprofiles_to_buildings():

with db.session_scope() as session:
cells_query = session.query(egon_map_zensus_buildings_residential)
egon_map_zensus_buildings_residential = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col=None
cells_query = cells_query.all()
egon_map_zensus_buildings_residential = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query]
)

with db.session_scope() as session:
cells_query = session.query(HouseholdElectricityProfilesInCensusCells)
egon_hh_profile_in_zensus_cell = pd.read_sql(
cells_query.statement, cells_query.session.bind, index_col=None
) # index_col="cell_id")
cells_query = cells_query.all()
egon_hh_profile_in_zensus_cell = pd.DataFrame.from_records(
[db.asdict(row) for row in cells_query] # , index="cell_id")
)

# Match OSM and zensus data to define missing buildings
missing_buildings = match_osm_and_zensus_data(
Expand Down
Loading

0 comments on commit 67111cb

Please sign in to comment.