feat added pipeline for waste default values #584

Merged · 1 commit · Jul 22, 2024
@@ -0,0 +1,31 @@
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a PostgreSQL database.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#postgresql
    """
    schema_name = 'raw_data'  # Specify the name of the schema to export data to
    table_name = 'ipcc_waste_emissionsfactor'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )
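
As a sanity check after this exporter runs, the same io_config.yaml profile can be used to read the row count back. A minimal sketch, assuming only the schema, table, and 'default' profile from the block above; the read-back query itself is illustrative:

from os import path

from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from mage_ai.settings.repo import get_repo_path

config_path = path.join(get_repo_path(), 'io_config.yaml')

with Postgres.with_config(ConfigFileLoader(config_path, 'default')) as loader:
    # Read back the row count of the table the exporter just wrote.
    df = loader.load('SELECT COUNT(*) AS n FROM raw_data.ipcc_waste_emissionsfactor')
    print(df['n'].iloc[0])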
61 changes: 61 additions & 0 deletions global-api/importer-mage/cc-mage/data_loaders/load_s3_ipcc_ef.py
@@ -0,0 +1,61 @@
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.data_preparation.shared.secrets import get_secret_value
from mage_ai.io.s3 import S3
from mage_ai.io.duckdb import DuckDB
from os import path
import boto3
import duckdb
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_from_s3_bucket(*args, **kwargs):
    """
    Load the IPCC emissions-factor Excel workbook from S3 into DuckDB.
    """
    aws_access_key_id = get_secret_value('AWS_ACCESS_KEY_ID')
    aws_secret_access_key = get_secret_value('AWS_SECRET_ACCESS_KEY')
    aws_region = 'us-east-1'

    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=aws_region,
    )

    s3_filename = 's3://global-api-raw-data/ipcc/EFDB_output.xlsx'

    conn = duckdb.connect()

    # The spatial extension provides st_read, which can parse the Excel
    # workbook; credentials are handed to DuckDB via SET so it can read
    # the s3:// URL directly.
    query = f"""
    install spatial;
    load spatial;

    SET s3_region='{aws_region}';
    SET s3_access_key_id='{aws_access_key_id}';
    SET s3_secret_access_key='{aws_secret_access_key}';

    CREATE OR REPLACE TABLE ipcc_emissionfactor as
    SELECT * FROM st_read('{s3_filename}');
    """

    duckdb_loader = DuckDB.with_config(ConfigFileLoader(config_path, config_profile))
    results = duckdb_loader.execute(query)

    # The table now lives in DuckDB; return a truthy marker for the test below.
    return 1



@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
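
The loader leans on DuckDB's spatial extension, whose st_read function reads Excel workbooks via GDAL, and on S3 credentials passed through SET statements. Reading s3:// URLs also needs the httpfs extension, which recent DuckDB releases autoload; older versions must install it explicitly. A minimal standalone sketch of the same read outside Mage, with placeholder credentials:

import duckdb

con = duckdb.connect()
con.execute('INSTALL spatial; LOAD spatial;')
# Older DuckDB versions may also need: INSTALL httpfs; LOAD httpfs;
con.execute("SET s3_region='us-east-1';")
con.execute("SET s3_access_key_id='...';")      # placeholder
con.execute("SET s3_secret_access_key='...';")  # placeholder
df = con.execute(
    "SELECT * FROM st_read('s3://global-api-raw-data/ipcc/EFDB_output.xlsx') LIMIT 5"
).df()
print(df)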
@@ -0,0 +1,21 @@
select gpc_sector,
gas_name,
gas,
parameter_code,
gpc_subsector,
technical_reference_year,
parameter_subcategory_type1,
parameter_subcategory_typename1,
parameter_subcategory_type2,
parameter_subcategory_typename2,
emissionsfactor_value,
emissionsfactor_units,
region,
data_source,
ef_id as ipcc_ef_id,
rank() over (
    partition by parameter_code, parameter_subcategory_type1, parameter_subcategory_typename1,
                 parameter_subcategory_type2, parameter_subcategory_typename2
    order by technical_reference_year desc
) as rnk -- rnk = 1 marks the most recent technical_reference_year within each parameter grouping
from waste_default_values
where parameter_code is not null
  and region is not null
order by parameter_code, parameter_subcategory_type1, parameter_subcategory_typename1,
         parameter_subcategory_type2, parameter_subcategory_typename2, technical_reference_year desc;
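
Downstream blocks can keep only the newest factor per grouping by filtering on rnk. A minimal DuckDB sketch of that pattern, wrapping the ranking logic above in a CTE ('ranked' is an illustrative name, and waste_default_values is assumed to already exist in the session):

import duckdb

con = duckdb.connect()
latest = con.execute('''
    WITH ranked AS (
        SELECT *, rank() OVER (
            PARTITION BY parameter_code, parameter_subcategory_type1, parameter_subcategory_typename1,
                         parameter_subcategory_type2, parameter_subcategory_typename2
            ORDER BY technical_reference_year DESC
        ) AS rnk
        FROM waste_default_values
    )
    SELECT * FROM ranked WHERE rnk = 1
''').df()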
@@ -0,0 +1,156 @@
blocks:
- all_upstream_blocks_executed: true
  color: null
  configuration: {}
  downstream_blocks:
  - waste_default_values
  executor_config: null
  executor_type: local_python
  has_callback: false
  language: python
  name: load_s3_ipcc_ef
  retry_config: null
  status: updated
  timeout: null
  type: data_loader
  upstream_blocks: []
  uuid: load_s3_ipcc_ef
- all_upstream_blocks_executed: false
  color: null
  configuration:
    data_provider: duckdb
    data_provider_profile: default
    dbt: {}
    disable_query_preprocessing: false
    export_write_policy: append
    limit: 1000
    use_raw_sql: true
  downstream_blocks:
  - waste_unpackrecord
  executor_config: null
  executor_type: local_python
  has_callback: false
  language: sql
  name: waste_default_values
  retry_config: null
  status: executed
  timeout: null
  type: transformer
  upstream_blocks:
  - load_s3_ipcc_ef
  uuid: waste_default_values
- all_upstream_blocks_executed: false
  color: null
  configuration:
    data_provider: duckdb
    data_provider_profile: default
    dbt: {}
    disable_query_preprocessing: false
    export_write_policy: append
    limit: 1000
    use_raw_sql: true
  downstream_blocks:
  - waste_cleandata
  executor_config: null
  executor_type: local_python
  has_callback: false
  language: sql
  name: waste_unpackrecord
  retry_config: null
  status: updated
  timeout: null
  type: transformer
  upstream_blocks:
  - waste_default_values
  uuid: waste_unpackrecord
- all_upstream_blocks_executed: false
  color: null
  configuration:
    data_provider: duckdb
    data_provider_profile: default
    dbt: {}
    disable_query_preprocessing: false
    export_write_policy: append
    limit: 1000
    use_raw_sql: true
  downstream_blocks: []
  executor_config: null
  executor_type: local_python
  has_callback: false
  language: sql
  name: waste_cleandata
  retry_config: null
  status: updated
  timeout: null
  type: transformer
  upstream_blocks:
  - waste_unpackrecord
  uuid: waste_cleandata
- all_upstream_blocks_executed: true
  color: null
  configuration:
    data_provider: duckdb
    data_provider_profile: default
    dbt: {}
    disable_query_preprocessing: false
    export_write_policy: append
    limit: 1000
    use_raw_sql: true
  downstream_blocks:
  - load_waste_emissionfactor
  executor_config: null
  executor_type: local_python
  has_callback: false
  language: sql
  name: waste_duckdb_rawdata
  retry_config: null
  status: updated
  timeout: null
  type: data_loader
  upstream_blocks: []
  uuid: waste_duckdb_rawdata
- all_upstream_blocks_executed: false
  color: null
  configuration:
    data_provider: duckdb
    data_provider_profile: default
    export_write_policy: append
  downstream_blocks: []
  executor_config: null
  executor_type: local_python
  has_callback: false
  language: python
  name: load_waste_emissionfactor
  retry_config: null
  status: executed
  timeout: null
  type: data_exporter
  upstream_blocks:
  - waste_duckdb_rawdata
  uuid: load_waste_emissionfactor
cache_block_output_in_memory: false
callbacks: []
concurrency_config: {}
conditionals: []
created_at: '2024-07-22 11:07:36.325251+00:00'
data_integration: null
description: null
executor_config: {}
executor_count: 1
executor_type: null
extensions: {}
name: ipcc_emissions_factor
notification_config: {}
remote_variables_dir: null
retry_config: {}
run_pipeline_in_one_process: false
settings:
  triggers: null
spark_config: {}
tags: []
type: python
uuid: ipcc_emissions_factor
variables:
  bucket_name: global-api-raw-data
variables_dir: /home/src/mage_data/cc-mage
widgets: []
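
Read top to bottom, the metadata wires two chains: load_s3_ipcc_ef -> waste_default_values -> waste_unpackrecord -> waste_cleandata, plus a separate waste_duckdb_rawdata -> load_waste_emissionfactor pair. A small standalone sketch that recovers this wiring from the YAML (metadata.yaml is Mage's conventional file name; the path is assumed):

import yaml  # PyYAML

with open('metadata.yaml') as f:
    meta = yaml.safe_load(f)

# Print each block alongside the blocks it depends on.
for block in meta['blocks']:
    ups = ', '.join(block['upstream_blocks']) or '(source)'
    print(f"{block['type']:>13}  {block['uuid']}  <-  {ups}")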