Skip to content

Commit

Permalink
Merge pull request apache#6226 from qinyeli/master
Browse files Browse the repository at this point in the history
Interactive Beam -- unblocking integration with PortableRunner
  • Loading branch information
charlesccychen authored Aug 15, 2018
2 parents f001db9 + dbae9c8 commit a7ebcf3
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 21 deletions.
18 changes: 12 additions & 6 deletions sdks/python/apache_beam/runners/interactive/cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import print_function

import collections
import datetime
import os
import tempfile
import urllib
Expand Down Expand Up @@ -80,9 +81,14 @@ def cleanup(self):
class FileBasedCacheManager(CacheManager):
"""Maps PCollections to local temp files for materialization."""

def __init__(self, temp_dir=None):
self._temp_dir = temp_dir or tempfile.mkdtemp(
prefix='interactive-temp-', dir=os.environ.get('TEST_TMPDIR', None))
def __init__(self, cache_dir=None):
if cache_dir:
self._cache_dir = filesystems.FileSystems.join(
cache_dir,
datetime.datetime.now().strftime("cache-%y-%m-%d-%H:%M:%S"))
else:
self._cache_dir = tempfile.mkdtemp(
prefix='interactive-temp-', dir=os.environ.get('TEST_TMPDIR', None))
self._versions = collections.defaultdict(lambda: self._CacheVersion())

def exists(self, *labels):
Expand Down Expand Up @@ -116,14 +122,14 @@ def sink(self, *labels):
coder=SafeFastPrimitivesCoder())._sink

def cleanup(self):
if filesystems.FileSystems.exists(self._temp_dir):
filesystems.FileSystems.delete([self._temp_dir])
if filesystems.FileSystems.exists(self._cache_dir):
filesystems.FileSystems.delete([self._cache_dir])

def _glob_path(self, *labels):
return self._path(*labels) + '-*-of-*'

def _path(self, *labels):
return filesystems.FileSystems.join(self._temp_dir, *labels)
return filesystems.FileSystems.join(self._cache_dir, *labels)

def _match(self, *labels):
match = filesystems.FileSystems.match([self._glob_path(*labels)])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def tearDown(self):

def mock_write_cache(self, pcoll_list, prefix, cache_label):
"""Cache the PCollection where cache.WriteCache would write to."""
cache_path = filesystems.FileSystems.join(self.test_dir, prefix)
cache_path = filesystems.FileSystems.join(
self.cache_manager._cache_dir, prefix)
if not filesystems.FileSystems.exists(cache_path):
filesystems.FileSystems.mkdirs(cache_path)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ class InteractiveRunner(runners.PipelineRunner):
"""

def __init__(self, underlying_runner=None, cache_dir=None):
# TODO(qinyeli, BEAM-4755) remove explicitly overriding underlying runner
# once interactive_runner works with FnAPI mode
self._underlying_runner = (underlying_runner
or direct_runner.BundleBasedDirectRunner())
or direct_runner.DirectRunner())
self._cache_manager = cache.FileBasedCacheManager(cache_dir)
self._in_session = False

Expand Down Expand Up @@ -103,7 +101,11 @@ def run_pipeline(self, pipeline):
analyzer = pipeline_analyzer.PipelineAnalyzer(self._cache_manager,
pipeline_proto,
self._underlying_runner,
pipeline._options,
self._desired_cache_labels)
# Should be only accessed for debugging purpose.
self._analyzer = analyzer

pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api(
analyzer.pipeline_proto_to_execute(),
self._underlying_runner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@ def printer(elem):
class InteractiveRunnerTest(unittest.TestCase):

def test_basic(self):
# TODO(qinyeli, BEAM-4755) remove explicitly overriding underlying runner
# once interactive_runner works with FnAPI mode
p = beam.Pipeline(
runner=interactive_runner.InteractiveRunner(
direct_runner.BundleBasedDirectRunner()))
direct_runner.DirectRunner()))
p.run().wait_until_finish()
pc0 = (
p | 'read' >> beam.Create([1, 2, 3])
Expand All @@ -68,11 +66,9 @@ def process(self, element):
words = text_line.split()
return words

# TODO(qinyeli, BEAM-4755) remove explicitly overriding underlying runner
# once interactive_runner works with FnAPI mode
p = beam.Pipeline(
runner=interactive_runner.InteractiveRunner(
direct_runner.BundleBasedDirectRunner()))
direct_runner.DirectRunner()))

# Count the occurrences of each word.
counts = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,25 @@

class PipelineAnalyzer(object):
def __init__(self, cache_manager, pipeline_proto, underlying_runner,
desired_cache_labels=None):
options=None, desired_cache_labels=None):
"""Constructor of PipelineAnanlyzer.
Args:
cache_manager: (CacheManager)
pipeline_proto: (Pipeline proto)
underlying_runner: (PipelineRunner)
options: (PipelineOptions)
desired_cache_labels: (Set[str]) a set of labels of the PCollection
queried by the user.
"""
self._cache_manager = cache_manager
self._pipeline_proto = pipeline_proto
self._underlying_runner = underlying_runner
self._desired_cache_labels = desired_cache_labels or []

self._pipeline = beam.pipeline.Pipeline.from_runner_api(
self._pipeline_proto,
self._underlying_runner,
options=None)
runner=underlying_runner,
options=options)
# context returned from to_runner_api is more informative than that returned
# from from_runner_api.
_, self._context = self._pipeline.to_runner_api(return_context=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def to_stable_runner_api(p):
class PipelineAnalyzerTest(unittest.TestCase):

def setUp(self):
self.runner = direct_runner.BundleBasedDirectRunner()
self.runner = direct_runner.DirectRunner()
self.cache_manager = cache.FileBasedCacheManager()

def tearDown(self):
Expand Down

0 comments on commit a7ebcf3

Please sign in to comment.