-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
Feature/#339: run data-processing (dp) pre-processing locally
- Loading branch information
Showing
51 changed files
with
1,567 additions
and
825 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# PostGIS-enabled PostgreSQL 11 image for the local data-processing database.
FROM postgres:11

# Install the PostGIS 2.5 extension packages matching PostgreSQL 11.
RUN apt-get update
RUN apt-get install -y postgresql-11-postgis-2.5
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
version: '3.5'
services:
  # Local PostGIS instance used by the data-processing scripts.
  data_processing_db:
    image: postgres:11-postgis
    container_name: data_processing
    restart: unless-stopped
    # Built from the local Dockerfile.postgis (postgres:11 + PostGIS 2.5).
    build:
      context: .
      dockerfile: Dockerfile.postgis
    # Larger shared memory for PostGIS operations.
    shm_size: '1gb'
    # Bound to localhost only; host port 54321 -> container 5432
    # (matches db_info in the preprocessing script).
    ports:
      - "127.0.0.1:54321:5432"
    environment:
      POSTGRES_DB: dp
      POSTGRES_USER: oeuser
      POSTGRES_PASSWORD: egon
    volumes:
      # Persist database files on the host across container restarts.
      - $HOME/docker/volumes/postgres/data_processing:/var/lib/postgresql/data
      # SQL/shell init scripts executed on first container start.
      - ./utilities/entrypoints:/docker-entrypoint-initdb.d/
Large diffs are not rendered by default.
Oops, something went wrong.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import os | ||
import zipfile | ||
import geopandas as gpd | ||
import sqlalchemy | ||
from geoalchemy2 import Geometry | ||
import time | ||
from dataprocessing.tools import metadata | ||
import json | ||
|
||
|
||
CRS = 4326 | ||
|
||
|
||
# Function to generate WKB hex | ||
# Helper used when writing geometries to the database: pandas' to_sql cannot
# serialise shapely geometry objects directly, so they are stored as WKB hex.
def wkb_hexer(line):
    """Return *line*'s geometry encoded as a WKB hex string."""
    encoded = line.wkb_hex
    return encoded
|
||
|
||
def import_cosmoclmgrid(file, path, db, **kwds):
    """Import the COSMO CLM grid shapefile into a PostGIS table.

    Extracts the zipped shapefile, reads it with geopandas, writes it to
    ``{schema}.{table}`` (replacing any existing table), adds a primary-key
    constraint on ``gid`` and attaches OEP-1.4.0 metadata as a table comment.

    Parameters
    ----------
    file : str
        Name of the zip archive (e.g. ``"grid.zip"``) located in *path*;
        the shapefile inside is expected to share the archive's base name.
    path : str
        Directory containing *file*; contents are extracted into a
        sub-directory named after the archive.
    db : sqlalchemy engine/connection
        Target database; must expose ``dialect.has_schema`` and execute raw SQL.
    **kwds
        Must contain ``schema`` and ``table`` (target names).
    """
    # Create the target schema on first run.
    if not db.dialect.has_schema(db, kwds["schema"]):
        db.execute(sqlalchemy.schema.CreateSchema(kwds["schema"]))

    shp_file = file.replace(".zip", ".shp")
    subdir = file.replace(".zip", "")
    os.makedirs(os.path.join(path, subdir), exist_ok=True)

    # Extract the archive; context manager closes the handle deterministically
    # (the original ZipFile was never closed — resource leak).
    with zipfile.ZipFile(os.path.join(path, file)) as zf:
        zf.extractall(path=os.path.join(path, subdir))

    # Read the shapefile into a GeoDataFrame.
    cosmoclmgrid = gpd.read_file(os.path.join(path, subdir, shp_file))

    # to_sql cannot serialise shapely geometries; store WKB hex instead.
    cosmoclmgrid['geom'] = cosmoclmgrid['geometry'].apply(wkb_hexer)
    cosmoclmgrid.drop("geometry", axis=1, inplace=True)

    # Create (or replace) the table from the dataframe.
    cosmoclmgrid.to_sql(kwds["table"],
                        db,
                        kwds["schema"],
                        index=False,
                        if_exists="replace",
                        chunksize=10000,
                        method="multi",
                        dtype={'geom': Geometry('POLYGON')}
                        )

    # NOTE(review): schema/table names are interpolated into raw SQL; values
    # come from the local import configuration, not untrusted input.
    create_pkey_constraint = "ALTER TABLE {schema}.{table} ADD CONSTRAINT " \
        "{table}_pkey PRIMARY KEY (gid);".format(**kwds)
    db.execution_options(autocommit=True).execute(create_pkey_constraint)

    # OEP-1.4.0 metadata, attached to the table as a comment below.
    metadata_dict = {"name": "Spatial grid coastDat-2 re-analysis data set",
                     "title": "COSMO CLM grid",
                     "description": "This spatial grid provides reference polygons for coastDat-2 "
                                    "re-analysis data.",
                     "language": ["EN"],
                     "spatial": {
                         "location": "0.22 ° x 0.22 °",
                         "extent": "Europe",
                         "resolution": "0.22 ° x 0.22 °"
                     },
                     "temporal": {
                         "referenceDate": "2014",
                         "timeseries": {
                             "start": "",
                             "end": "",
                             "resolution": "",
                             "alignment": "",
                             "aggregationType": ""
                         }
                     },
                     "sources": [
                         {
                             # Typo fixed: "forceasting" -> "forecasting".
                             "title": "coastDat-2 re-analysis data set",
                             "description": "coastDat-2 re-analysis data is a long-term backcasted "
                                            "climate data set based on the regional weather forecasting "
                                            "model COSMO CLM",
                             "path": "https://www.coastdat.de/about_us/index.php.en",
                             "licenses": [
                                 {
                                     "name": "",
                                     "title": "",
                                     "path": "",
                                     "instruction": "",
                                     "attribution": ""
                                 }
                             ]
                         }
                     ],
                     "licenses": [
                         {
                             "name": "Open Data Commons Open Database License 1.0",
                             "title": "",
                             "path": "https://opendatacommons.org/licenses/odbl/1.0/",
                             "instruction": "You are free: To Share, To Create, To Adapt; As long as you: Attribute, Share-Alike, Keep open!",
                             "attribution": "© Reiner Lemoine Institut"
                         }
                     ],
                     "contributors": [
                         {
                             "title": "Guido Pleßmann",
                             "email": "http://github.com/gplssm",
                             "date": time.strftime("%Y-%m-%d"),
                             "object": "",
                             "comment": "Only uploaded the data"
                         }
                     ],
                     "metaMetadata": {
                         "metadataVersion": "OEP-1.4.0",
                         "metadataLicense": {
                             "name": "CC0-1.0",
                             "title": "Creative Commons Zero v1.0 Universal",
                             "path": "https://creativecommons.org/publicdomain/zero/1.0/"
                         }
                     },
                     }

    # Wrapped in single quotes for direct use in a SQL COMMENT statement.
    json_str = "'" + json.dumps(metadata_dict) + "'"

    metadata.submit_comment(db, json_str, kwds["schema"], kwds["table"])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
""" | ||
eGo PreProcessing (eGoPP) | ||
This script opens an oedb database connection and executes different parts of eGo. | ||
Reads python and SQL scripts and gives logging infos during the execution. | ||
Also see corresponding BPML diagram. | ||
This file is part of project "open_eGo DataProcessing" (https://github.com/openego/data_processing/). | ||
It's copyrighted by the contributors recorded in the version control history: | ||
openego/data_processing/preprocessing/eGo_PreProcessing.py | ||
SPDX-License-Identifier: AGPL-3.0-or-later | ||
""" | ||
|
||
__copyright__ = "Reiner Lemoine Institut" | ||
__license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)" | ||
__url__ = "https://www.gnu.org/licenses/agpl-3.0.en.html" | ||
__author__ = "gplssm, Ludee" | ||
|
||
import logging | ||
import time | ||
import os | ||
import subprocess | ||
from dataprocessing.tools import io | ||
from egoio.tools import db | ||
from sqlalchemy import create_engine | ||
import yaml | ||
from urllib.request import urlretrieve | ||
import importlib | ||
|
||
# Configure logging: INFO-level messages to stderr with timestamps.
# NOTE(review): 'EEEE' looks like a placeholder logger name — confirm intent
# (convention would be logging.getLogger(__name__)).
logger = logging.getLogger('EEEE')
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(message)s',
                              datefmt='%Y-%m-%d %I:%M:%S')
ch.setFormatter(formatter)
logger.addHandler(ch)

# When False, SQL lines starting with "SELECT scenario_log" /
# "SELECT ego_scenario_log" are stripped before execution (see preprocessing()).
SCENARIOLOG = True
# Local cache directory for downloaded input data.
DOWNLOADDIR = os.path.join(os.path.expanduser("~"), ".egon-pre-processing-cached/")
# Sink used to discard subprocess stdout.
# NOTE(review): opened at import time and never closed; subprocess.DEVNULL
# would avoid the dangling handle.
FNULL = open(os.devnull, 'w')

# Connection parameters for the local docker-compose PostGIS instance
# (matches the compose file: localhost:54321, db "dp", user "oeuser").
db_info = {
    "host": "localhost",
    "port": 54321,
    "user": "oeuser",
    "password": "egon",
    "database": "dp"
}
|
||
|
||
def create_data_dir(dir=DOWNLOADDIR):
    """Ensure the download-cache directory exists.

    Parameters
    ----------
    dir: str
        Define alternative dir for chached data
    """
    # exist_ok makes this idempotent across repeated runs.
    target = dir
    os.makedirs(target, exist_ok=True)
    logger.info('Cached data directory {} created (or already existed).'.format(target))
|
||
|
||
def download_data(url, filename):
    """Download *url* into the cache directory unless already present."""
    target = os.path.join(DOWNLOADDIR, filename)

    # A cached copy skips the (potentially large) download.
    if os.path.isfile(target):
        return

    urlretrieve(url, target)
|
||
|
||
def preprocessing():
    """Run the eGo pre-processing pipeline.

    Reads the dataset definitions from ``import.yml``, downloads required
    input files into the local cache, then executes each dataset's scripts
    (SQL, python, or bash) in order against the local database defined by
    ``db_info``, logging timing information throughout.
    """

    # get current time and inform about start
    total_time = time.time()
    logger.info('ego preprocessing started...')

    # create directory for cached downloaded data
    create_data_dir()

    # list of sql- and python-snippets that process the data in correct order
    script_dir = os.path.abspath(
        os.path.join(os.path.dirname(__file__)))

    # NOTE(review): `snippets` is never referenced below — presumably
    # superseded by the import.yml-driven loop; confirm and remove.
    snippets = [
        'ego_pre_voltage_level.sql',
        'ego_pre_slp_parameters.sql'
    ]
    # NOTE(review): file handle from open("import.yml") is never closed;
    # path is relative to the current working directory, not script_dir.
    datasets = yaml.load(open("import.yml"), Loader=yaml.SafeLoader)

    # get database connection
    # NOTE(review): conn_oedb is opened but never used or closed below.
    conn_oedb = db.connection(readonly=True).connect()
    # engine_local = create_engine('postgresql+psycopg2://oeuser:egon@localhost:54321/dp')
    engine_local = create_engine('postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}'.format(**db_info))
    conn = engine_local.connect()

    # iterate over data sets
    for key, dataset in datasets.items():
        # Fetch all input files declared for this dataset (cached locally).
        for download in dataset.get("required_data", []):
            logger.info("Downloading '{}' ...".format(download["filename"]))
            snippet_time = time.time()
            download_data(download["url"], download["filename"])
            logger.info('...successfully done in {:.2f} seconds.'.format(
                time.time() - snippet_time))

        for script in dataset.get("scripts", []):

            # timing and logging
            snippet_time = time.time()
            logger.info("Execute '{}' ...".format(script["script"]))
            if script["language"] == 'SQL':
                # NOTE(review): the open() handles below are never closed.
                if SCENARIOLOG is True:
                    snippet_str = open(os.path.join(script_dir, key, script["script"])).read()
                elif SCENARIOLOG is False:
                    # Strip scenario-log statements when logging is disabled.
                    snippet_str = "".join(
                        [l for l in open(os.path.join(script_dir, key, script["script"])).readlines()
                         if not l.startswith("SELECT scenario_log") and not l.startswith("SELECT ego_scenario_log")])

                # execute desired sql snippet
                conn.execution_options(autocommit=True).execute(snippet_str)
            elif script["language"] == 'python':
                # "module::function" entries are imported and called;
                # plain filenames are exec'd as scripts.
                if len(script["script"].split("::")) > 1:
                    module, func = script["script"].split("::")

                    args = script.get("args", {})

                    if script["db"] == "info":
                        db_details = db_info
                    elif script["db"] == "conn":
                        db_details = engine_local
                    else:
                        # NOTE(review): bare "" is a no-op — db_details stays
                        # unbound here and the call below would raise
                        # NameError for any other script["db"] value.
                        ""

                    mod = importlib.import_module(module)
                    func_to_call = getattr(mod, func)
                    func_to_call(script["filename"], DOWNLOADDIR, db_details, **args)
                else:
                    filename = os.path.join(script_dir, key, script["script"])
                    # NOTE(review): handle never closed.
                    script_str = open(filename, "rb").read()

                    # execute desired sql snippet
                    # NOTE(review): exec() of a repo-local script — acceptable
                    # only because scripts ship with the repository.
                    exec(compile(script_str, filename, 'exec'))
            elif script["language"] == 'bash':
                filename = os.path.join(script_dir, key, script["script"])
                script_str = script["script"]

                # Optional filename argument is appended to the command line.
                if "filename" in script:
                    script_str += script["filename"]

                # execute desired bash script
                # rc (script_str)
                # NOTE(review): shell=True with a hard-coded password, and
                # env={...} REPLACES the whole environment (no PATH etc.) —
                # confirm this is intended.
                subprocess.run(script_str,
                               shell=True,
                               env={"PGPASSWORD": "egon"},
                               stdout=FNULL,
                               cwd= os.path.join(script_dir, key))
            else:
                raise NameError('{} is neither a python nor a sql script (at least it '
                                'has not the right extension). Please add an extension '
                                'to the script name (.py or .sql)'.format(script["script"]))

            # inform the user
            logger.info('...successfully done in {:.2f} seconds.'.format(
                time.time() - snippet_time))

    # close database connection
    conn.close()

    logger.info('eGo PreProcessing script successfully executed in {:.2f} seconds'.format(
        time.time() - total_time))
|
||
|
||
# Script entry point: run the full pre-processing pipeline when invoked directly.
if __name__ == '__main__':
    preprocessing()
Oops, something went wrong.