doc: add documentation on how to load data into redshift
Signed-off-by: Marco Vito Moscaritolo <[email protected]>
mavimo committed Jan 26, 2023
1 parent 3d7e134 commit 966fe1f
144 changes: 144 additions & 0 deletions databuilder/example/scripts/sample_redshift_loader.py
@@ -0,0 +1,144 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is a example script which demo how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.
"""

import logging
import os
import sys
import textwrap

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base

from databuilder.extractor.redshift_metadata_extractor import RedshiftMetadataExtractor
from databuilder.extractor.redshift_table_last_updated_extractor import RedshiftTableLastUpdatedExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

es_host = None
neo_host = None
if len(sys.argv) > 1:
    es_host = sys.argv[1]
if len(sys.argv) > 2:
    neo_host = sys.argv[2]

es = Elasticsearch([
    {'host': es_host if es_host else 'localhost'},
])
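
# Note: the Elasticsearch client above is not consumed by the two Neo4j jobs
# defined in this script; it appears to mirror the setup in the other sample
# loader scripts, where a search-index publishing job uses it (an assumption).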

DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()

NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'

neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = 'neo4j'
neo4j_password = 'test'

LOGGER = logging.getLogger(__name__)


# TODO: replace these placeholder values with your Redshift cluster details
def connection_string():
    user = 'username'
    host = 'hostname.aws.com'
    port = 5439
    db = 'public'
    password = 'mypassword'
    return f'postgresql://{user}:{password}@{host}:{port}/{db}'


def run_redshift_table_model_job():
    where_clause_suffix = textwrap.dedent("""
        WHERE schema = 'myschema'
    """)

    tmp_folder = os.path.join(os.getcwd(), 'amundsen', 'table_metadata')
    node_files_folder = os.path.join(tmp_folder, 'nodes')
    relationship_files_folder = os.path.join(tmp_folder, 'relationships')

    task = DefaultTask(
        extractor=RedshiftMetadataExtractor(),
        loader=FsNeo4jCSVLoader(),
        transformer=NoopTransformer()
    )

    job_config = ConfigFactory.from_dict({
        f'extractor.redshift_metadata.{RedshiftMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.redshift_metadata.{RedshiftMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}': True,
        f'extractor.redshift_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR}': True,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'unique_tag',  # should use unique tag here like {ds}
    })
    job = DefaultJob(
        conf=job_config,
        task=task,
        publisher=Neo4jCsvPublisher()
    )
    return job


def run_redshift_table_last_updated_job():
    where_clause_suffix = textwrap.dedent("""
        WHERE schema = 'myschema'
    """)

    tmp_folder = os.path.join(os.getcwd(), 'amundsen', 'last_updated_data')
    node_files_folder = os.path.join(tmp_folder, 'nodes')
    relationship_files_folder = os.path.join(tmp_folder, 'relationships')

    task = DefaultTask(
        extractor=RedshiftTableLastUpdatedExtractor(),
        loader=FsNeo4jCSVLoader(),
        transformer=NoopTransformer()
    )

    job_config = ConfigFactory.from_dict({
        f'extractor.redshift_table_last_updated.{RedshiftTableLastUpdatedExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.redshift_table_last_updated.{RedshiftTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME}': True,
        f'extractor.redshift_table_last_updated.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR}': True,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'unique_tag'  # should use unique tag here like {ds}
    })
    job = DefaultJob(
        conf=job_config,
        task=task,
        publisher=Neo4jCsvPublisher()
    )
    return job


if __name__ == "__main__":
    # Uncomment next line to get INFO level logging
    # logging.basicConfig(level=logging.INFO)

    loading_job1 = run_redshift_table_model_job()
    loading_job1.launch()

    loading_job2 = run_redshift_table_last_updated_job()
    loading_job2.launch()
