Project import generated by Copybara.
PiperOrigin-RevId: 186310133
tf-transform-team authored and zoyahav committed Feb 20, 2018
1 parent cdb30c5 commit 30def1a
Showing 13 changed files with 370 additions and 287 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -18,7 +18,7 @@ Follow either of the two links above to access the appropriate CLA and instructi
### Contributing code

If you have improvements to TensorFlow Transform, send us your pull requests!
For those just getting started, Github has a [howto](https://help.github.com/articles/using-pull-requests/).
For those just getting started, GitHub has a [howto](https://help.github.com/articles/using-pull-requests/).

If you want to contribute but you're not sure where to start, take a look at the
[issues with the "contributions welcome" label](https://github.com/tensorflow/transform/labels/contributions%20welcome).
21 changes: 9 additions & 12 deletions README.md
@@ -1,8 +1,9 @@
# tf.Transform
# tf.Transform [![PyPI](https://img.shields.io/pypi/pyversions/tensorflow-transform.svg?style=plastic)](https://github.com/tensorflow/transform)

**tf.Transform** is a library for doing data preprocessing with TensorFlow. It
allows users to combine various data processing frameworks (currently Apache
Beam is supported but tf.Transform can be extended to support other frameworks),
**tf.Transform** is a library for doing data preprocessing with
[TensorFlow](https://www.tensorflow.org). It allows users to combine various
data processing frameworks (currently [Apache Beam](https://beam.apache.org/) is
supported but tf.Transform can be extended to support other frameworks),
with TensorFlow, to transform data. Because tf.Transform is built on TensorFlow,
it allows users to export a graph which re-creates the transformations they did
to their data as a TensorFlow graph. This is important as the user can then
@@ -39,13 +40,9 @@ tf.Transform does though have a dependency on the GCP distribution of Apache
Beam. Apache Beam is the framework used to run distributed pipelines. Apache
Beam is able to run pipelines in multiple ways, depending on the "runner" used,
and the "runner" is usually provided by a distribution of Apache
Beam. With the GCP distribution of Apache Beam, one can run beam pipelines
locally, or on Google Cloud Dataflow.

Note: If you clone tf.Transform's implementation and samples from GitHub's
`master` branch (as opposed to using the released implementation and samples
from PyPI) they will likely only work with TensorFlow's nightly
[build](https://github.com/tensorflow/tensorflow).
Beam. With the GCP distribution of Apache Beam, one can run Apache Beam
pipelines locally, or on
[Google Cloud Dataflow](https://cloud.google.com/dataflow/).
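For readers who have not driven a tf.Transform pipeline from Apache Beam before, the following is a minimal sketch of a locally run pipeline, modeled on the test code later in this commit; the column name `x`, the sample values, and the choice of `tft.scale_to_0_1` are illustrative rather than part of this change.

```python
# A minimal local tf.Transform + Beam pipeline (illustrative sketch).
import tempfile

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam.impl as beam_impl
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema as sch


def preprocessing_fn(inputs):
  # The analyzer pass computes the min and max of 'x' over the whole dataset;
  # the transform pass then rescales each instance to [0, 1].
  return {'x_scaled': tft.scale_to_0_1(inputs['x'])}


raw_data = [{'x': 1.0}, {'x': 2.0}, {'x': 3.0}]
raw_metadata = dataset_metadata.DatasetMetadata({
    'x': sch.ColumnSchema(tf.float32, [], sch.FixedColumnRepresentation()),
})

with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
  # Applying the transforms to in-memory lists runs them eagerly on the local
  # direct runner; on Cloud Dataflow the same transforms apply to PCollections.
  transform_fn = (
      (raw_data, raw_metadata) | beam_impl.AnalyzeDataset(preprocessing_fn))
  transformed_data, _ = (
      ((raw_data, raw_metadata), transform_fn) | beam_impl.TransformDataset())

print(transformed_data)  # x_scaled of 0.0, 0.5, 1.0
```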

### Compatible Versions

@@ -56,7 +53,7 @@ releasing a new version.

|tensorflow-transform |tensorflow |apache-beam[gcp]|
|--------------------------------------------------------------------------------|--------------|----------------|
|[GitHub master](https://github.com/tensorflow/transform/blob/master/RELEASE.md) |nightly (1.x) |2.2.0 |
|[GitHub master](https://github.com/tensorflow/transform/blob/master/RELEASE.md) |nightly (1.x) |2.3.0 |
|[0.4.0](https://github.com/tensorflow/transform/blob/v0.4.0/RELEASE.md) |1.4 |2.2.0 |
|[0.3.1](https://github.com/tensorflow/transform/blob/v0.3.1/RELEASE.md) |1.3 |2.1.1 |
|[0.3.0](https://github.com/tensorflow/transform/blob/v0.3.0/RELEASE.md) |1.3 |2.1.1 |
56 changes: 30 additions & 26 deletions RELEASE.md
@@ -1,38 +1,42 @@
# Current version (not yet released; still in development)

## Major Features and Improvements
* Batching of input instances is now done automatically and dynamically.
* Added analyzers to compute covariance matrices (`tft.covariance`) and
principal components for PCA (`tft.pca`).
* CombinerSpec and combine_analyzer now accept multiple inputs/outputs.
* Batching of input instances is now done automatically and dynamically.
* Added analyzers to compute covariance matrices (`tft.covariance`) and
principal components for PCA (`tft.pca`).
* CombinerSpec and combine_analyzer now accept multiple inputs/outputs.

## Bug Fixes and Other Changes

* Fixes a bug where TransformDataset would not return correct output if the
output DatasetMetadata contained deferred values (such as vocabularies).
* Added checks that the preprocessing function's outputs all have the same
size in the batch dimension.
* Added `tft.apply_buckets` which takes an input tensor and a list of bucket
boundaries, and returns bucketized data.
* `tft.bucketize` and `tft.apply_buckets` now set metadata for the output
tensor, which means the resulting tf.Metadata for the output of these
functions will contain min and max values based on the number of buckets,
and also be set to categorical.
* Testing helper function assertAnalyzeAndTransformResults can now also test
the content of vocabulary files and other assets.
* Reduces the number of beam stages needed for certain analyzers, which can be
a performance bottleneck when transforming many features.
* Performance improvements in `tft.uniques`.
* Fix a bug in `tft.bucketize` where the bucket boundary could be the same as a
min/max value, and was getting dropped.
* Allows scaling individual components of a tensor independently with
`tft.scale_by_min_max`, `tft.scale_to_0_1`, and `tft.scale_to_z_score`.
* Fix a bug where `apply_saved_transform` could only be applied in the global
name scope.
* Depends on `apache-beam[gcp]>=2.2,<3`.
* Fixes a bug where TransformDataset would not return correct output if the
output DatasetMetadata contained deferred values (such as vocabularies).
* Added checks that the preprocessing function's outputs all have the same
size in the batch dimension.
* Added `tft.apply_buckets` which takes an input tensor and a list of bucket
  boundaries, and returns bucketized data (see the sketch after this list).
* `tft.bucketize` and `tft.apply_buckets` now set metadata for the output
tensor, which means the resulting tf.Metadata for the output of these
functions will contain min and max values based on the number of buckets,
and also be set to categorical.
* Testing helper function assertAnalyzeAndTransformResults can now also test
the content of vocabulary files and other assets.
* Reduces the number of beam stages needed for certain analyzers, which can be
a performance bottleneck when transforming many features.
* Performance improvements in `tft.uniques`.
* Fix a bug in `tft.bucketize` where the bucket boundary could be the same as a
min/max value, and was getting dropped.
* Allows scaling individual components of a tensor independently with
`tft.scale_by_min_max`, `tft.scale_to_0_1`, and `tft.scale_to_z_score`.
* Fix a bug where `apply_saved_transform` could only be applied in the global
name scope.
* Added a warning when `frequency_threshold` is <= 1. This is a no-op and
  generally reflects mistaking `frequency_threshold` for a relative frequency,
  when in fact it is an absolute frequency.

## Breaking changes
* The interfaces of CombinerSpec and combine_analyzer have changed to allow
for multiple inputs/outputs.
* Requires pre-installed TensorFlow >=1.5,<2.

## Deprecations

2 changes: 1 addition & 1 deletion setup.py
@@ -22,7 +22,7 @@

def _make_required_install_packages():
return [
'apache-beam[gcp]>=2.2,<3',
'apache-beam[gcp]>=2.3,<3',

# Protobuf libraries < 3.3 contain some map-related data corruption bugs
# (b/35874111).
10 changes: 8 additions & 2 deletions tensorflow_transform/analyzers.py
@@ -479,8 +479,10 @@ def uniques(x, top_k=None, frequency_threshold=None,
top_k: Limit the generated vocabulary to the first `top_k` elements. If set
to None, the full vocabulary is generated.
frequency_threshold: Limit the generated vocabulary only to elements whose
frequency is >= to the supplied threshold. If set to None, the full
vocabulary is generated.
absolute frequency is >= to the supplied threshold. If set to None, the
full vocabulary is generated. Absolute frequency means the number of
      occurrences of the element in the dataset, as opposed to the proportion of
instances that contain that element.
vocab_filename: The file name for the vocabulary file. If none, the
"uniques" scope name in the context of this graph will be used as the file
name. If not None, should be unique within a given preprocessing function.
@@ -509,6 +511,10 @@ def uniques(x, top_k=None, frequency_threshold=None,
raise ValueError(
'frequency_threshold must be non-negative, but got: %r' %
frequency_threshold)
elif frequency_threshold <= 1:
tf.logging.warn(
'frequency_threshold %d <= 1 is a no-op, use None instead.',
frequency_threshold)

if isinstance(x, tf.SparseTensor):
x = x.values
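Since `frequency_threshold` is an absolute count, the smallest value that actually prunes anything is 2; the new warning above fires for 0 and 1. A short sketch of the intended usage follows (the feature name `terms` and the thresholds are illustrative, not from this commit):

```python
import tensorflow_transform as tft


def preprocessing_fn(inputs):
  # Build a vocabulary over 'terms', keeping only terms that occur at least 5
  # times in the whole dataset, capped at the 10,000 most frequent terms.
  tft.uniques(inputs['terms'], top_k=10000, frequency_threshold=5)
  # frequency_threshold=1 (or 0) keeps every term, since anything present in
  # the data occurs at least once -- that is the no-op the warning flags.
  # Passing None makes "no frequency filtering" explicit.
  return {'terms': inputs['terms']}
```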
10 changes: 1 addition & 9 deletions tensorflow_transform/beam/analyzer_impls.py
@@ -19,7 +19,6 @@

import collections
import os
import random


import apache_beam as beam
@@ -157,14 +156,7 @@ def expand(self, pcoll):
# via AsIter. By breaking fusion, we allow sharded files' sizes to be
# automatically computed (when possible), so we end up reading from fewer
# and larger files.
@beam.ptransform_fn
def Reshard(pcoll): # pylint: disable=invalid-name
return (
pcoll
| 'PairWithRandom' >> beam.Map(lambda x: (random.getrandbits(32), x))
| 'GroupByRandom' >> beam.GroupByKey()
| 'ExtractValues' >> beam.FlatMap(lambda x: x[1]))
counts |= 'Reshard' >> Reshard() # pylint: disable=no-value-for-parameter
counts |= 'Reshard' >> beam.transforms.Reshuffle() # pylint: disable=no-value-for-parameter

# Using AsIter instead of AsList below in order to reduce max memory
# usage (due to AsList caching).
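For context on why the hand-rolled `Reshard` could be dropped: apache-beam 2.3 ships a `Reshuffle` transform that performs the same fusion break. Below is a standalone Beam sketch of the idiom (not tf.Transform code; the pipeline contents and output path are made up):

```python
import apache_beam as beam

p = beam.Pipeline()  # defaults to the local direct runner
_ = (
    p
    | 'Create' >> beam.Create(list(range(1000)))
    | 'ExpensiveFanOut' >> beam.FlatMap(lambda x: [x] * 100)
    # Without this, the fan-out and the write would be fused into one stage;
    # reshuffling lets the runner rebalance work and pick sensible shard sizes.
    | 'Reshard' >> beam.transforms.Reshuffle()
    | 'Format' >> beam.Map(str)
    | 'Write' >> beam.io.WriteToText('/tmp/reshuffle_demo'))
p.run().wait_until_finish()
```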
50 changes: 39 additions & 11 deletions tensorflow_transform/beam/impl.py
@@ -241,11 +241,13 @@ class _GraphState(object):
def __init__(self, saved_model_dir, input_schema, exclude_outputs,
tf_config):
self.saved_model_dir = saved_model_dir
self.session = tf.Session(graph=tf.Graph(), config=tf_config)
with self.session.graph.as_default():
with tf.Session(config=tf_config):
graph = tf.Graph()
self.session = tf.Session(graph=graph, config=tf_config)
with graph.as_default():
with self.session.as_default():
inputs, outputs = saved_transform_io.partially_apply_saved_transform(
saved_model_dir, {})
self.session.run(tf.global_variables_initializer())
self.session.run(tf.tables_initializer())

input_schema_keys = input_schema.column_schemas.keys()
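Both initializer calls are needed because they cover disjoint state: `tf.global_variables_initializer` handles variables, while `tf.tables_initializer` handles lookup tables such as vocabularies, and neither subsumes the other. A tiny self-contained sketch of the pattern, outside tf.Transform (the variable and vocabulary here are made up):

```python
import tensorflow as tf

graph = tf.Graph()
session = tf.Session(graph=graph)
with graph.as_default(), session.as_default():
  v = tf.Variable(3.0)  # covered by global_variables_initializer
  table = tf.contrib.lookup.index_table_from_tensor(
      tf.constant(['a', 'b', 'c']))  # covered by tables_initializer
  indices = table.lookup(tf.constant(['b', 'c']))
  session.run(tf.global_variables_initializer())
  session.run(tf.tables_initializer())

print(session.run([v, indices]))  # [3.0, array([1, 2])]
```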
@@ -342,9 +344,9 @@ def process(self, batch, saved_model_dir):
def _assert_tensorflow_version():
# Fail with a clear error in case we are not using a compatible TF version.
major, minor, _ = tf.__version__.split('.')
if int(major) != 1 or int(minor) < 4:
if int(major) != 1 or int(minor) < 5:
raise RuntimeError(
'Tensorflow version >= 1.4, < 2 is required. Found (%s). Please '
'TensorFlow version >= 1.5, < 2 is required. Found (%s). Please '
'install the latest 1.x version from '
'https://github.com/tensorflow/tensorflow. ' % tf.__version__)

@@ -372,6 +374,8 @@ def _write_saved_transform(graph, inputs, outputs, saved_model_dir):
removed_collections.append((collection_name,
graph.get_collection(collection_name)))
graph.clear_collection(collection_name)
# Initialize all variables so they can be saved.
session.run(tf.global_variables_initializer())
saved_transform_io.write_saved_transform_from_session(
session, inputs, outputs, saved_model_dir)
for collection_name, collection in removed_collections:
@@ -478,6 +482,7 @@ def replace_tensors_with_constant_values(saved_model_dir,
input_tensors, output_tensors = (
saved_transform_io.partially_apply_saved_transform(
saved_model_dir, {}, tensor_replacement_map))
session.run(tf.global_variables_initializer())
saved_transform_io.write_saved_transform_from_session(
session, input_tensors, output_tensors, temp_dir)
return temp_dir
@@ -602,6 +607,7 @@ def extract_scalar_constants(tensor_names, saved_model_dir,
tensor_output_map = (
saved_transform_io.fetch_tensor_values(
saved_model_dir, tensor_replacement_map, tensor_names))
session.run(tf.global_variables_initializer())
session.run(tf.tables_initializer())
return session.run(tensor_output_map)

@@ -650,21 +656,43 @@ def expand(self, dataset):
Returns:
A TransformFn containing the deferred transform function.
"""
Raises:
ValueError: If preprocessing_fn has no outputs.
"""
input_values, input_metadata = dataset
input_schema = input_metadata.schema

base_temp_dir = Context.create_base_temp_dir()

graph = tf.Graph()
with graph.as_default():

with tf.name_scope('inputs'):
inputs = input_schema.as_batched_placeholders()
# In order to avoid a bug where import_graph_def fails when the input_map
# and return_elements of an imported graph are the same (b/34288791), we
# avoid using the placeholder of an input column as an output of a graph.
# We do this by applying tf.identity to all inputs of the
# preprocessing_fn. Note this applies at the level of raw tensors.
outputs = self._preprocessing_fn(impl_helper.copy_tensors(inputs))

# At this point we check that the preprocessing_fn has at least one
# output. This is because if we allowed the output of preprocessing_fn to
# be empty, we wouldn't be able to determine how many instances to
# "unbatch" the output into.
if not outputs:
raise ValueError('The preprocessing function returned an empty dict')

if graph.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES):
raise ValueError(
'The preprocessing function contained trainable variables '
'{}'.format(
graph.get_collection_ref(tf.GraphKeys.TRAINABLE_VARIABLES)))

# NOTE: it's important that create_phases is called directly after
# run_preprocessing_fn, because we later mutate the graph's
# TABLE_INITIALIZERS collection which would break the logic in
# create_phases.
inputs, outputs = impl_helper.run_preprocessing_fn(
self._preprocessing_fn, input_schema)
# preprocessing_fn, because we later mutate the graph's TABLE_INITIALIZERS
# collection which would break the logic in create_phases.
phases = impl_helper.create_phases()

# Iterate through levels. tensor_pcoll_mapping is a mapping from tensor
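To make the two new checks concrete, here is a sketch (not part of this commit; the feature names are illustrative) of preprocessing functions that would now be rejected, next to one that passes:

```python
import tensorflow as tf
import tensorflow_transform as tft


def bad_empty_fn(inputs):
  # Rejected: with no outputs there is no way to know how many instances the
  # transformed batch should be "unbatched" into.
  return {}


def bad_trainable_fn(inputs):
  # Rejected: a preprocessing graph is a pure transformation, so a trainable
  # variable created here would never actually be trained.
  weight = tf.Variable(1.0, trainable=True)
  return {'x_weighted': inputs['x'] * weight}


def good_fn(inputs):
  # Accepted: at least one output, and no trainable state.
  return {'x_scaled': tft.scale_to_0_1(inputs['x'])}
```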
90 changes: 89 additions & 1 deletion tensorflow_transform/beam/impl_test.py
@@ -20,6 +20,7 @@
import math
import os
import random
import shutil


import apache_beam as beam
@@ -42,8 +43,9 @@
from tensorflow_transform.tf_metadata import dataset_schema as sch
from tensorflow_transform.tf_metadata import metadata_io

from google.protobuf import text_format
import unittest
# pylint: enable=g-import-not-at-top
from tensorflow.core.example import example_pb2


class BeamImplTest(tft_unit.TransformTestCase):
@@ -367,6 +369,91 @@ def preprocessing_fn(inputs):
input_data, input_metadata, preprocessing_fn, expected_data,
expected_metadata)

def testRawFeedDictInput(self):
    # Test the ability to feed raw data into AnalyzeDataset and TransformDataset
    # by setting desired_batch_size=1 in the Context below, so that each raw
    # instance becomes its own batch.
def preprocessing_fn(inputs):
sequence_example = inputs['sequence_example']

# Ordinarily this would have shape (batch_size,) since 'sequence_example'
# was defined as a FixedLenFeature with shape (). But since we specified
# desired_batch_size, we can assume that the shape is (1,), and reshape
# to ().
sequence_example = tf.reshape(sequence_example, ())

# Parse the sequence example.
feature_spec = {
'x': tf.FixedLenSequenceFeature(shape=[], dtype=tf.string,
default_value=None)
}
_, sequences = tf.parse_single_sequence_example(
sequence_example, sequence_features=feature_spec)

# Create a batch based on the sequence "x".
return {'x': sequences['x']}

def text_sequence_example_to_binary(text_proto):
proto = text_format.Merge(text_proto, example_pb2.SequenceExample())
return proto.SerializeToString()

sequence_examples = [
"""
feature_lists: {
feature_list: {
key: "x"
value: {
feature: {bytes_list: {value: 'ab'}}
feature: {bytes_list: {value: ''}}
feature: {bytes_list: {value: 'c'}}
feature: {bytes_list: {value: 'd'}}
}
}
}
""",
"""
feature_lists: {
feature_list: {
key: "x"
value: {
feature: {bytes_list: {value: 'ef'}}
feature: {bytes_list: {value: 'g'}}
}
}
}
"""
]
input_data = [
{'sequence_example': text_sequence_example_to_binary(sequence_example)}
for sequence_example in sequence_examples]
input_metadata = dataset_metadata.DatasetMetadata({
'sequence_example': sch.ColumnSchema(tf.string, [],
sch.FixedColumnRepresentation())
})
expected_data = [
{'x': 'ab'},
{'x': ''},
{'x': 'c'},
{'x': 'd'},
{'x': 'ef'},
{'x': 'g'}
]
expected_metadata = dataset_metadata.DatasetMetadata({
'x': sch.ColumnSchema(tf.string, [], sch.FixedColumnRepresentation())
})

with beam_impl.Context(temp_dir=self.get_temp_dir(), desired_batch_size=1):
transform_fn = ((input_data, input_metadata)
| beam_impl.AnalyzeDataset(preprocessing_fn))
transformed_data, transformed_metadata = (
((input_data, input_metadata), transform_fn)
| beam_impl.TransformDataset())

self.assertDataCloseOrEqual(expected_data, transformed_data)
transformed_metadata = self._resolveDeferredMetadata(transformed_metadata)
self.assertEqual(expected_metadata.schema.column_schemas,
transformed_metadata.schema.column_schemas)
self.assertEqual(expected_metadata, transformed_metadata)

def testAnalyzerBeforeMap(self):
def preprocessing_fn(inputs):
return {'x_scaled': tft.scale_to_0_1(inputs['x'])}
@@ -2595,5 +2682,6 @@ def testBucketizationEqualDistributionInterleaved(self):
inputs, expected_buckets, tf.int32, num_buckets=101)



if __name__ == '__main__':
unittest.main()