-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbucket_manifest_pipeline.py
87 lines (72 loc) · 3.02 KB
/
bucket_manifest_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import json
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from bucket_manifest.bucket import get_bucket_manifest, compute_md5
from bucket_manifest.pub import pub
logging.basicConfig(level=logging.INFO)
FILE_HEADERS = ["bucket", "key", "size", "md5"]
class ComputeMD5(beam.DoFn):
def __init__(self, project_id, pub_topic):
self.project_id = project_id
self.pub_topic = pub_topic
self._buffer = []
def process(self, element):
"""
"""
text_line = element.strip()
words = text_line.split("\t")
fi = dict(zip(FILE_HEADERS, words))
fi["size"] = int(fi["size"])
try:
fi["md5"] = compute_md5(fi.get("bucket"), fi.get("key"))
except Exception as e:
# store the error message
fi["md5"] = str(e)
my_str = json.dumps(fi)
logging.info("publish message: {}".format(my_str))
pub(self.project_id.get(), self.pub_topic.get(), str.encode(my_str))
class BucketManifestOptions(PipelineOptions):
"""
Runtime Parameters given during template execution
bucket, pub_sub and output parameters are necessary for execution of pipeline
"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument("--bucket", type=str, help="Path of the file to read from")
parser.add_value_provider_argument(
"--project_id", type=str, help="project_id topic"
)
parser.add_value_provider_argument("--pub_topic", type=str, help="pubsub topic")
parser.add_value_provider_argument("--output", type=str, help="output")
def run(argv=None):
"""
Pipeline entry point, runs the all the necessary processes
"""
# Initialize runtime parameters as object
pipeline_options = PipelineOptions()
# Save main session state so pickled functions and classes
# defined in __main__ can be unpickled
pipeline_options.view_as(SetupOptions).save_main_session = True
# Beginning of the pipeline
p = beam.Pipeline(options=pipeline_options)
# Runtime Parameters given during template execution
bucket_manifest_options = pipeline_options.view_as(BucketManifestOptions)
# Get bucket objects
blob_list = get_bucket_manifest(bucket_manifest_options.bucket)
# pipeline setup
lines = p | beam.Create(blob_list)
lines | "compute_md5" >> beam.ParDo(
ComputeMD5(
bucket_manifest_options.project_id, bucket_manifest_options.pub_topic
)
)
# Run the pipeline
prog = p.run()
prog.wait_until_finish()
# python bucket_manifest_pipeline.py --runner DataflowRunner --project dcf-integration --staging_location gs://dcf-dataflow-bucket/staging --temp_location gs://dcf-dataflow-bucket/temp --region us-east1 --output gs://dcf-dataflow-bucket/output --setup_file ./setup.py
if __name__ == "__main__":
logger = logging.getLogger().setLevel(logging.INFO)
run()