Skip to content

Commit

Permalink
Merge pull request #584 from Open-Earth-Foundation/feat_ipcc_waste_emissionfactor
Browse files Browse the repository at this point in the history

feat added pipeline for waste default values
  • Loading branch information
amanda-eames authored Jul 22, 2024
2 parents 4245e4d + 23021d7 commit 386a78e
Show file tree
Hide file tree
Showing 11 changed files with 576 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    """
    Export the incoming DataFrame to a PostgreSQL table.

    Connection settings come from 'io_config.yaml' (profile 'default').
    Docs: https://docs.mage.ai/design/data-loading#postgresql
    """
    schema_name = 'raw_data'  # Destination schema for the export
    table_name = 'ipcc_waste_emissionsfactor'  # Destination table name
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    config_loader = ConfigFileLoader(config_path, config_profile)
    with Postgres.with_config(config_loader) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Do not write the DataFrame index as a column
            if_exists='replace',  # Drop and recreate the table when it already exists
        )
61 changes: 61 additions & 0 deletions global-api/importer-mage/cc-mage/data_loaders/load_s3_ipcc_ef.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.data_preparation.shared.secrets import get_secret_value
from mage_ai.io.s3 import S3
from mage_ai.io.duckdb import DuckDB
from os import path
import boto3
import duckdb
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test


@data_loader
def load_from_s3_bucket(*args, **kwargs):
    """
    Load the IPCC EFDB Excel workbook from S3 into a DuckDB table.

    Reads AWS credentials from Mage secrets, then has DuckDB's spatial
    extension (st_read) ingest the .xlsx file straight from S3 into the
    table 'ipcc_emissionfactor' via the configured DuckDB loader.

    Returns:
        int: 1, a simple success marker for downstream blocks.
    """
    aws_access_key_id = get_secret_value('AWS_ACCESS_KEY_ID')
    aws_secret_access_key = get_secret_value('AWS_SECRET_ACCESS_KEY')
    aws_region = 'us-east-1'

    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    s3_filename = 's3://global-api-raw-data/ipcc/EFDB_output.xlsx'

    # NOTE(review): DuckDB configures S3 access through SET statements, so the
    # credentials must be interpolated into the SQL text. They stay inside the
    # local DuckDB session, but avoid logging or persisting this query string.
    query = f"""
    install spatial;
    load spatial;
    SET s3_region='{aws_region}';
    SET s3_access_key_id='{aws_access_key_id}';
    SET s3_secret_access_key='{aws_secret_access_key}';
    CREATE OR REPLACE TABLE ipcc_emissionfactor as
    SELECT * FROM st_read('{s3_filename}');
    """

    # The previous revision also created an unused boto3 Session and an unused
    # (and never-closed) duckdb.connect() handle; both were dead code and have
    # been removed — all work happens through the configured DuckDB loader.
    duckdb_loader = DuckDB.with_config(ConfigFileLoader(config_path, config_profile))
    duckdb_loader.execute(query)

    return 1



@test
def test_output(output, *args) -> None:
    """
    Sanity-check the loader block's output.

    Fails the pipeline run when the upstream block produced no output
    (i.e. the value handed in is None).
    """
    assert output is not None, 'The output is undefined'
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Select waste default emission-factor attributes from the IPCC data and
-- rank duplicate rows so that, within each parameter/subcategory combination,
-- the most recent technical_reference_year receives rnk = 1 (letting a
-- downstream block keep only the newest value per combination).
select gpc_sector,
gas_name,
gas,
parameter_code,
gpc_subsector,
technical_reference_year,
parameter_subcategory_type1,
parameter_subcategory_typename1,
parameter_subcategory_type2,
parameter_subcategory_typename2,
emissionsfactor_value,
emissionsfactor_units,
region,
data_source,
ef_id as ipcc_ef_id,
-- Recency rank within each parameter + subcategory group: 1 = newest year.
-- NOTE: rank() gives ties the same number when years are equal.
rank() over(partition by parameter_code,parameter_subcategory_type1, parameter_subcategory_typename1,parameter_subcategory_type2, parameter_subcategory_typename2
order by technical_reference_year desc) as rnk
from waste_default_values
-- Drop rows that cannot be used: no parameter code or no region to match on.
where parameter_code is not null
and region is not null
order by parameter_code,parameter_subcategory_type1, parameter_subcategory_typename1,parameter_subcategory_type2, parameter_subcategory_typename2, technical_reference_year desc;
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
blocks:
- all_upstream_blocks_executed: true
color: null
configuration: {}
downstream_blocks:
- waste_default_values
executor_config: null
executor_type: local_python
has_callback: false
language: python
name: load_s3_ipcc_ef
retry_config: null
status: updated
timeout: null
type: data_loader
upstream_blocks: []
uuid: load_s3_ipcc_ef
- all_upstream_blocks_executed: false
color: null
configuration:
data_provider: duckdb
data_provider_profile: default
dbt: {}
disable_query_preprocessing: false
export_write_policy: append
limit: 1000
use_raw_sql: true
downstream_blocks:
- waste_unpackrecord
executor_config: null
executor_type: local_python
has_callback: false
language: sql
name: waste_default_values
retry_config: null
status: executed
timeout: null
type: transformer
upstream_blocks:
- load_s3_ipcc_ef
uuid: waste_default_values
- all_upstream_blocks_executed: false
color: null
configuration:
data_provider: duckdb
data_provider_profile: default
dbt: {}
disable_query_preprocessing: false
export_write_policy: append
limit: 1000
use_raw_sql: true
downstream_blocks:
- waste_cleandata
executor_config: null
executor_type: local_python
has_callback: false
language: sql
name: waste_unpackrecord
retry_config: null
status: updated
timeout: null
type: transformer
upstream_blocks:
- waste_default_values
uuid: waste_unpackrecord
- all_upstream_blocks_executed: false
color: null
configuration:
data_provider: duckdb
data_provider_profile: default
dbt: {}
disable_query_preprocessing: false
export_write_policy: append
limit: 1000
use_raw_sql: true
downstream_blocks: []
executor_config: null
executor_type: local_python
has_callback: false
language: sql
name: waste_cleandata
retry_config: null
status: updated
timeout: null
type: transformer
upstream_blocks:
- waste_unpackrecord
uuid: waste_cleandata
- all_upstream_blocks_executed: true
color: null
configuration:
data_provider: duckdb
data_provider_profile: default
dbt: {}
disable_query_preprocessing: false
export_write_policy: append
limit: 1000
use_raw_sql: true
downstream_blocks:
- load_waste_emissionfactor
executor_config: null
executor_type: local_python
has_callback: false
language: sql
name: waste_duckdb_rawdata
retry_config: null
status: updated
timeout: null
type: data_loader
upstream_blocks: []
uuid: waste_duckdb_rawdata
- all_upstream_blocks_executed: false
color: null
configuration:
data_provider: duckdb
data_provider_profile: default
export_write_policy: append
downstream_blocks: []
executor_config: null
executor_type: local_python
has_callback: false
language: python
name: load_waste_emissionfactor
retry_config: null
status: executed
timeout: null
type: data_exporter
upstream_blocks:
- waste_duckdb_rawdata
uuid: load_waste_emissionfactor
cache_block_output_in_memory: false
callbacks: []
concurrency_config: {}
conditionals: []
created_at: '2024-07-22 11:07:36.325251+00:00'
data_integration: null
description: null
executor_config: {}
executor_count: 1
executor_type: null
extensions: {}
name: ipcc_emissions_factor
notification_config: {}
remote_variables_dir: null
retry_config: {}
run_pipeline_in_one_process: false
settings:
triggers: null
spark_config: {}
tags: []
type: python
uuid: ipcc_emissions_factor
variables:
bucket_name: global-api-raw-data
variables_dir: /home/src/mage_data/cc-mage
widgets: []
Loading

0 comments on commit 386a78e

Please sign in to comment.