-
Notifications
You must be signed in to change notification settings - Fork 97
/
Copy pathmain.py
162 lines (132 loc) · 6.78 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, Any, Type
import logging
import sys
from pyspark.sql import SparkSession
from dataproc_templates import BaseTemplate, TemplateName
from dataproc_templates.gcs.gcs_to_jdbc import GCSToJDBCTemplate
from dataproc_templates.mongo.mongo_to_gcs import MongoToGCSTemplate
from dataproc_templates.mongo.mongo_to_bq import MongoToBigQueryTemplate
from dataproc_templates.util import get_template_name, get_log_level, track_template_invocation
from dataproc_templates.gcs.gcs_to_bigquery import GCSToBigQueryTemplate
from dataproc_templates.gcs.gcs_to_gcs import GCSToGCSTemplate
from dataproc_templates.gcs.gcs_to_mongo import GCSToMONGOTemplate
from dataproc_templates.gcs.gcs_to_bigtable import GCSToBigTableTemplate
from dataproc_templates.bigquery.bigquery_to_gcs import BigQueryToGCSTemplate
from dataproc_templates.bigquery.bigquery_to_memorystore import BigQueryToMemorystoreTemplate
from dataproc_templates.hive.hive_to_bigquery import HiveToBigQueryTemplate
from dataproc_templates.hive.hive_to_gcs import HiveToGCSTemplate
from dataproc_templates.gcs.text_to_bigquery import TextToBigQueryTemplate
from dataproc_templates.hbase.hbase_to_gcs import HbaseToGCSTemplate
from dataproc_templates.jdbc.jdbc_to_jdbc import JDBCToJDBCTemplate
from dataproc_templates.jdbc.jdbc_to_gcs import JDBCToGCSTemplate
from dataproc_templates.jdbc.jdbc_to_bigquery import JDBCToBigQueryTemplate
from dataproc_templates.snowflake.snowflake_to_gcs import SnowflakeToGCSTemplate
from dataproc_templates.redshift.redshift_to_gcs import RedshiftToGCSTemplate
from dataproc_templates.cassandra.cassandra_to_bigquery import CassandraToBQTemplate
from dataproc_templates.hive.util.hive_ddl_extractor import HiveDDLExtractorTemplate
from dataproc_templates.kafka.kafka_to_gcs import KafkaToGCSTemplate
from dataproc_templates.kafka.kafka_to_bq import KafkaToBigQueryTemplate
from dataproc_templates.s3.s3_to_bigquery import S3ToBigQueryTemplate
from dataproc_templates.cassandra.cassandra_to_gcs import CassandraToGCSTemplate
from dataproc_templates.pubsublite.pubsublite_to_gcs import PubSubLiteToGCSTemplate
from dataproc_templates.azure.azure_blob_storage_to_bigquery import AzureBlobStorageToBigQueryTemplate
from dataproc_templates.pubsublite.pubsublite_to_bigtable import PubSubLiteToBigtableTemplate
from dataproc_templates.elasticsearch.elasticsearch_to_gcs import ElasticsearchToGCSTemplate
from dataproc_templates.elasticsearch.elasticsearch_to_bq import ElasticsearchToBQTemplate
from dataproc_templates.elasticsearch.elasticsearch_to_bigtable import ElasticsearchToBigTableTemplate
# Module-level logger shared by all template-dispatch code in this file.
LOGGER: logging.Logger = logging.getLogger('dataproc_templates')

# Maps each TemplateName to its corresponding implementation
# of BaseTemplate
# run_template() looks templates up here; registering a new template
# means importing it above and adding one entry to this dict.
TEMPLATE_IMPLS: Dict[TemplateName, Type[BaseTemplate]] = {
    TemplateName.GCSTOBIGQUERY: GCSToBigQueryTemplate,
    TemplateName.GCSTOGCS: GCSToGCSTemplate,
    TemplateName.GCSTOBIGTABLE: GCSToBigTableTemplate,
    TemplateName.BIGQUERYTOGCS: BigQueryToGCSTemplate,
    TemplateName.BIGQUERYTOMEMORYSTORE: BigQueryToMemorystoreTemplate,
    TemplateName.HIVETOBIGQUERY: HiveToBigQueryTemplate,
    TemplateName.HIVETOGCS: HiveToGCSTemplate,
    TemplateName.TEXTTOBIGQUERY: TextToBigQueryTemplate,
    TemplateName.GCSTOJDBC: GCSToJDBCTemplate,
    TemplateName.GCSTOMONGO: GCSToMONGOTemplate,
    TemplateName.HBASETOGCS: HbaseToGCSTemplate,
    TemplateName.JDBCTOJDBC: JDBCToJDBCTemplate,
    TemplateName.JDBCTOGCS: JDBCToGCSTemplate,
    TemplateName.JDBCTOBIGQUERY: JDBCToBigQueryTemplate,
    TemplateName.MONGOTOGCS: MongoToGCSTemplate,
    TemplateName.MONGOTOBIGQUERY: MongoToBigQueryTemplate,
    TemplateName.SNOWFLAKETOGCS: SnowflakeToGCSTemplate,
    TemplateName.REDSHIFTTOGCS: RedshiftToGCSTemplate,
    TemplateName.CASSANDRATOBQ: CassandraToBQTemplate,
    TemplateName.AZUREBLOBSTORAGETOBQ: AzureBlobStorageToBigQueryTemplate,
    TemplateName.CASSANDRATOGCS: CassandraToGCSTemplate,
    TemplateName.HIVEDDLEXTRACTOR: HiveDDLExtractorTemplate,
    TemplateName.KAFKATOGCS: KafkaToGCSTemplate,
    TemplateName.KAFKATOBQ: KafkaToBigQueryTemplate,
    TemplateName.S3TOBIGQUERY: S3ToBigQueryTemplate,
    TemplateName.PUBSUBLITETOGCS: PubSubLiteToGCSTemplate,
    TemplateName.PUBSUBLITETOBIGTABLE: PubSubLiteToBigtableTemplate,
    TemplateName.ELASTICSEARCHTOGCS: ElasticsearchToGCSTemplate,
    TemplateName.ELASTICSEARCHTOBQ: ElasticsearchToBQTemplate,
    TemplateName.ELASTICSEARCHTOBIGTABLE: ElasticsearchToBigTableTemplate
}
def create_spark_session(template_name: TemplateName) -> SparkSession:
    """
    Creates the SparkSession object.

    It also propagates the configured log level to both the JVM-side
    log4j loggers (root and org.apache.spark) and the PySpark context,
    so driver and JVM logging stay consistent. We could consider
    parametrizing the log level in the future.

    Args:
        template_name (TemplateName): The name of the template being
            run. Its value is used to set the Spark app name.

    Returns:
        pyspark.sql.SparkSession: The set up SparkSession.
    """
    spark = SparkSession.builder \
        .appName(template_name.value) \
        .enableHiveSupport() \
        .getOrCreate()

    # Resolve the level once; the original called get_log_level() twice
    # (for log4j and for setLogLevel), which could diverge if the helper
    # is not pure.
    log_level: str = get_log_level()

    # NOTE: _jvm is a private Py4J handle; it is the only way to reach
    # the JVM's log4j API from PySpark.
    log4j = spark.sparkContext._jvm.org.apache.log4j
    log4j_level = log4j.Level.toLevel(log_level)
    log4j.LogManager.getRootLogger().setLevel(log4j_level)
    log4j.LogManager.getLogger("org.apache.spark").setLevel(log4j_level)
    spark.sparkContext.setLogLevel(log_level)

    return spark
def run_template(template_name: TemplateName) -> None:
    """
    Executes a template given its template name.

    Looks the implementation up in TEMPLATE_IMPLS, builds it, parses its
    arguments, creates the SparkSession and runs the template. Any
    failure during that sequence is logged and the process exits with
    status 1.

    Args:
        template_name (TemplateName): The TemplateName of the template
            that should be run.

    Returns:
        None
    """
    # pylint: disable=broad-except
    template_class: Type[BaseTemplate] = TEMPLATE_IMPLS[template_name]
    template: BaseTemplate = template_class.build()
    try:
        template_args: Dict[str, Any] = template.parse_args()
        session: SparkSession = create_spark_session(template_name=template_name)
        track_template_invocation(spark=session, template_name=template_name)
        template.run(spark=session, args=template_args)
    except Exception:
        LOGGER.exception(
            'An error occurred while running %s template',
            template_name
        )
        sys.exit(1)
if __name__ == '__main__':
    # Entry point: resolve the requested template from the CLI
    # arguments and dispatch to it.
    run_template(template_name=get_template_name())