Commit

Merge branch
luozhouyang committed Nov 19, 2021
2 parents 6e21586 + 45afa13 commit df43ede
Showing 14 changed files with 632 additions and 3 deletions.
21 changes: 20 additions & 1 deletion .gitignore
@@ -1,8 +1,19 @@
.idea/
.vscode/
log/
logs/
testdata

models/
*.imp
*.pyc
.DS_Store

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

*.ipynb
# C extensions
*.so

@@ -20,6 +31,7 @@ parts/
sdist/
var/
wheels/

pip-wheel-metadata/
share/python-wheels/
*.egg-info/
@@ -48,6 +60,7 @@ nosetests.xml
coverage.xml
*.cover
*.py,cover

.hypothesis/
.pytest_cache/

@@ -97,6 +110,11 @@ __pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py
@@ -122,6 +140,7 @@ venv.bak/

# mypy
.mypy_cache/

.dmypy.json
dmypy.json

4 changes: 2 additions & 2 deletions README.md
@@ -1,2 +1,2 @@
-# smile_datasets
-Smile Dataset: Use tf.data to solve the last mile data loading problem
+# smile-datasets
+La**S**t **mile** Dataset: Use tf.data to solve the last mile data loading problem
Empty file added datasets/__init__.py
75 changes: 75 additions & 0 deletions datasets/dataset.py
@@ -0,0 +1,75 @@
import abc

import tensorflow as tf

try:
    AUTOTUNE = tf.data.AUTOTUNE
except AttributeError:
    AUTOTUNE = tf.data.experimental.AUTOTUNE


class AbcDataset(abc.ABC):
    """Abstract dataset"""

    def __call__(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset:
        dataset = self._filter(dataset, **kwargs)
        dataset = self._repeat(dataset, **kwargs)
        dataset = self._shuffle(dataset, **kwargs)
        dataset = self._padding(dataset, **kwargs)
        dataset = self._to_dict(dataset, **kwargs)
        dataset = self._auto_shard(dataset, **kwargs)
        return dataset

    @abc.abstractmethod
    def _filter(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset:
        raise NotImplementedError()

    def _repeat(self, dataset: tf.data.Dataset, repeat=None, **kwargs) -> tf.data.Dataset:
        if repeat is None:
            return dataset
        return dataset.repeat(repeat)

    def _shuffle(
        self,
        dataset: tf.data.Dataset,
        shuffle=True,
        buffer_size=100000,
        seed=None,
        reshuffle_each_iteration=True,
        **kwargs,
    ) -> tf.data.Dataset:
        if not shuffle:
            return dataset
        dataset = dataset.shuffle(buffer_size=buffer_size, seed=seed, reshuffle_each_iteration=reshuffle_each_iteration)
        return dataset

    def _padding(self, dataset: tf.data.Dataset, padding_strategy="bucket", **kwargs) -> tf.data.Dataset:
        if padding_strategy == "fixed":
            return self._fixed_padding(dataset, **kwargs)
        if padding_strategy == "batch":
            return self._batch_padding(dataset, **kwargs)
        return self._bucket_padding(dataset, **kwargs)

    @abc.abstractmethod
    def _fixed_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset:
        raise NotImplementedError("Fixed padding is not supported yet!")

    @abc.abstractmethod
    def _batch_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset:
        raise NotImplementedError("Batch padding is not supported yet!")

    @abc.abstractmethod
    def _bucket_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset:
        raise NotImplementedError("Bucket padding is not supported yet!")

    @abc.abstractmethod
    def _to_dict(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset:
        raise NotImplementedError()

    def _auto_shard(self, dataset: tf.data.Dataset, auto_shard_policy=None, **kwargs) -> tf.data.Dataset:
        if auto_shard_policy is not None:
            options = tf.data.Options()
            # e.g. tf.data.experimental.AutoShardPolicy.DATA
            options.experimental_distribute.auto_shard_policy = auto_shard_policy
            dataset = dataset.with_options(options)
        return dataset
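
For context on how AbcDataset is meant to be used: __call__ chains _filter -> _repeat -> _shuffle -> _padding (fixed, batch or bucket) -> _to_dict -> _auto_shard, and a concrete subclass only fills in the abstract steps. Below is a minimal sketch, not part of this commit; ToyDatapipe and the toy integer sequences are invented purely for illustration.

import tensorflow as tf

from datasets.dataset import AbcDataset


class ToyDatapipe(AbcDataset):
    """Hypothetical subclass used only to illustrate the pipeline."""

    def _filter(self, dataset, max_sequence_length=8, **kwargs):
        # Drop sequences longer than max_sequence_length.
        return dataset.filter(lambda x: tf.size(x) <= max_sequence_length)

    def _batch_padding(self, dataset, batch_size=2, **kwargs):
        # Pad each batch to the longest sequence in that batch.
        return dataset.padded_batch(batch_size, padded_shapes=[None])

    def _fixed_padding(self, dataset, batch_size=2, max_sequence_length=8, **kwargs):
        # Pad every batch to the same fixed length.
        return dataset.padded_batch(batch_size, padded_shapes=[max_sequence_length])

    def _bucket_padding(self, dataset, **kwargs):
        # Keep the toy example simple: reuse batch padding.
        return self._batch_padding(dataset, **kwargs)

    def _to_dict(self, dataset, **kwargs):
        return dataset.map(lambda x: {"input_ids": x})


def _gen():
    for seq in ([1, 2], [3, 4, 5], [6]):
        yield tf.constant(seq, dtype=tf.int32)


dataset = tf.data.Dataset.from_generator(_gen, output_signature=tf.TensorSpec([None], tf.int32))
pipe = ToyDatapipe()
for batch in pipe(dataset, shuffle=False, padding_strategy="batch"):
    print(batch)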
Empty file added datasets/qa/__init__.py
194 changes: 194 additions & 0 deletions datasets/qa/dataset.py
@@ -0,0 +1,194 @@
import abc
import logging
from typing import Dict, List

import tensorflow as tf
from datasets import utils
from datasets.dataset import AbcDataset
from tokenizers import BertWordPieceTokenizer

from .example import ExampleForQuestionAnswering
from .parsers import ParserForQuestionAnswering
from .readers import read_dureader_checklist, read_dureader_rubost, read_jsonl_files


class DatasetForQuestionAnswering(abc.ABC):
    """Map-style dataset that yields ExampleForQuestionAnswering instances."""

    @abc.abstractmethod
    def __len__(self):
        raise NotImplementedError()

    @abc.abstractmethod
    def __getitem__(self, index) -> ExampleForQuestionAnswering:
        raise NotImplementedError()

    def __iter__(self) -> ExampleForQuestionAnswering:
        for idx in range(len(self)):
            yield self[idx]

    def save_tfrecord(self, output_files, **kwargs):
        """Save examples to tfrecord files."""

        def _encoding(example: ExampleForQuestionAnswering):
            feature = {
                "input_ids": utils.int64_feature([int(x) for x in example.input_ids]),
                "segment_ids": utils.int64_feature([int(x) for x in example.segment_ids]),
                "attention_mask": utils.int64_feature([int(x) for x in example.attention_mask]),
                "start": utils.int64_feature([int(example.start)]),
                "end": utils.int64_feature([int(example.end)]),
            }
            return feature

        utils.save_tfrecord(iter(self), _encoding, output_files, **kwargs)


class DatapipeForQuestionAnswering(AbcDataset):
    """Builds tf.data.Dataset pipelines for question answering."""

    def __init__(self, **kwargs) -> None:
        super().__init__()

    @classmethod
    def from_tfrecord_files(cls, input_files, num_parallel_calls=None, buffer_size=None, **kwargs) -> tf.data.Dataset:
        dataset = utils.read_tfrecord_files(input_files, **kwargs)
        num_parallel_calls = num_parallel_calls or utils.AUTOTUNE
        buffer_size = buffer_size or utils.AUTOTUNE
        # parse examples
        features = {
            "input_ids": tf.io.VarLenFeature(tf.int64),
            "segment_ids": tf.io.VarLenFeature(tf.int64),
            "attention_mask": tf.io.VarLenFeature(tf.int64),
            "start": tf.io.VarLenFeature(tf.int64),
            "end": tf.io.VarLenFeature(tf.int64),
        }
        dataset = dataset.map(
            lambda x: tf.io.parse_example(x, features),
            num_parallel_calls=num_parallel_calls,
        ).prefetch(buffer_size)
        dataset = dataset.map(
            lambda x: (
                tf.cast(tf.sparse.to_dense(x["input_ids"]), tf.int32),
                tf.cast(tf.sparse.to_dense(x["segment_ids"]), tf.int32),
                tf.cast(tf.sparse.to_dense(x["attention_mask"]), tf.int32),
                tf.cast(tf.squeeze(tf.sparse.to_dense(x["start"])), tf.int32),
                tf.cast(tf.squeeze(tf.sparse.to_dense(x["end"])), tf.int32),
            ),
            num_parallel_calls=num_parallel_calls,
        ).prefetch(buffer_size)
        # do transformation
        d = cls(**kwargs)
        return d(dataset, **kwargs)

    @classmethod
    def from_dureader_robust(cls, input_files, tokenizer=None, vocab_file=None, **kwargs) -> tf.data.Dataset:
        instances = read_dureader_rubost(input_files, **kwargs)
        return cls.from_instances(instances, tokenizer=tokenizer, vocab_file=vocab_file, **kwargs)

    @classmethod
    def from_dureader_checklist(cls, input_files, tokenizer=None, vocab_file=None, **kwargs) -> tf.data.Dataset:
        instances = read_dureader_checklist(input_files, **kwargs)
        return cls.from_instances(instances, tokenizer=tokenizer, vocab_file=vocab_file, **kwargs)

    @classmethod
    def from_jsonl_files(cls, input_files, tokenizer=None, vocab_file=None, **kwargs) -> tf.data.Dataset:
        instances = read_jsonl_files(input_files, **kwargs)
        return cls.from_instances(instances, tokenizer=tokenizer, vocab_file=vocab_file, **kwargs)

    @classmethod
    def from_instances(
        cls, instances: List[Dict], tokenizer: BertWordPieceTokenizer = None, vocab_file=None, **kwargs
    ) -> tf.data.Dataset:
        """Build tf.data.Dataset from JSON instances.

        Args:
            instances: List of dict; each instance contains the keys `context`, `question`, `answer` and `id`
            tokenizer: Tokenizer used to tokenize text
            vocab_file: Path of the vocab used to build a tokenizer. Either `tokenizer` or `vocab_file` must be provided.

        Returns:
            A tf.data.Dataset that can be passed to tf.keras.Model.fit directly.
        """
        examples = []
        parser = ParserForQuestionAnswering(tokenizer=tokenizer, vocab_file=vocab_file, **kwargs)
        for instance in instances:
            e = parser.parse(instance, **kwargs)
            if not e:
                continue
            examples.append(e)
        logging.info("Read %d examples in total.", len(examples))
        return cls.from_examples(examples, **kwargs)

    @classmethod
    def from_dataset(cls, dataset: DatasetForQuestionAnswering, **kwargs) -> tf.data.Dataset:
        examples = []
        for idx in range(len(dataset)):
            examples.append(dataset[idx])
        return cls.from_examples(examples, **kwargs)

    @classmethod
    def from_examples(cls, examples: List[ExampleForQuestionAnswering], **kwargs) -> tf.data.Dataset:
        d = cls(**kwargs)

        def _to_dataset(x, dtype=tf.int32):
            x = tf.ragged.constant(x, dtype=dtype)
            d = tf.data.Dataset.from_tensor_slices(x)
            d = d.map(lambda x: x)
            return d

        dataset = tf.data.Dataset.zip(
            (
                _to_dataset(x=[e.input_ids for e in examples], dtype=tf.int32),
                _to_dataset(x=[e.segment_ids for e in examples], dtype=tf.int32),
                _to_dataset(x=[e.attention_mask for e in examples], dtype=tf.int32),
                _to_dataset(x=[e.start for e in examples], dtype=tf.int32),
                _to_dataset(x=[e.end for e in examples], dtype=tf.int32),
            )
        )

        return d(dataset, **kwargs)

    def _filter(self, dataset: tf.data.Dataset, max_sequence_length=512, **kwargs) -> tf.data.Dataset:
        dataset = dataset.filter(lambda a, b, c, x, y: tf.size(a) <= max_sequence_length)
        return dataset

    def _to_dict(self, dataset: tf.data.Dataset, start_key="start", end_key="end", **kwargs) -> tf.data.Dataset:
        dataset = dataset.map(
            lambda a, b, c, x, y: ({"input_ids": a, "segment_ids": b, "attention_mask": c}, {start_key: x, end_key: y}),
            num_parallel_calls=kwargs.get("num_parallel_calls", utils.AUTOTUNE),
        ).prefetch(kwargs.get("buffer_size", utils.AUTOTUNE))
        return dataset

    def _batch_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset:
        pad_id = tf.constant(kwargs.get("pad_id", 0), dtype=tf.int32)
        # fmt: off
        padded_shapes = kwargs.get("padded_shapes", ([None, ], [None, ], [None, ], [], []))
        padding_values = kwargs.get("padding_values", (pad_id, pad_id, pad_id, None, None))
        # fmt: on
        dataset = utils.batching_and_padding(dataset, padded_shapes, padding_values, **kwargs)
        return dataset

    def _fixed_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset:
        maxlen = tf.constant(kwargs.get("max_sequence_length", 512), dtype=tf.int32)  # default aligned with _filter
        pad_id = tf.constant(kwargs.get("pad_id", 0), dtype=tf.int32)
        # fmt: off
        padded_shapes = kwargs.get("padded_shapes", ([maxlen, ], [maxlen, ], [maxlen, ], [], []))
        padding_values = kwargs.get("padding_values", (pad_id, pad_id, pad_id, None, None))
        # fmt: on
        dataset = utils.batching_and_padding(dataset, padded_shapes, padding_values, **kwargs)
        return dataset

    def _bucket_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset:
        pad_id = tf.constant(kwargs.get("pad_id", 0), dtype=tf.int32)
        # fmt: off
        padded_shapes = ([None, ], [None, ], [None, ], [], [])
        padding_values = (pad_id, pad_id, pad_id, None, None)
        # fmt: on
        dataset = utils.bucketing_and_padding(
            dataset,
            bucket_fn=lambda a, b, c, x, y: tf.size(a),
            padded_shapes=padded_shapes,
            padding_values=padding_values,
            **kwargs,
        )
        return dataset
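
DatapipeForQuestionAnswering exposes several entry points (from_jsonl_files, from_dureader_robust, from_dureader_checklist, from_instances, from_dataset, from_examples, from_tfrecord_files) that all feed the same transformation pipeline inherited from AbcDataset. The sketch below is not part of the commit: the file paths and vocab file are hypothetical, and it assumes the unshown utils helpers (bucketing_and_padding, read_tfrecord_files) accept a batch_size keyword.

from datasets.qa.dataset import DatapipeForQuestionAnswering

# Build a training dataset straight from JSONL files (paths and vocab are hypothetical).
train_dataset = DatapipeForQuestionAnswering.from_jsonl_files(
    input_files=["data/train.jsonl"],
    vocab_file="bert-base-chinese-vocab.txt",
    batch_size=32,
    padding_strategy="bucket",  # default; "batch" and "fixed" are also implemented above
)

# Or read back TFRecord files previously written by DatasetForQuestionAnswering.save_tfrecord.
train_dataset = DatapipeForQuestionAnswering.from_tfrecord_files(
    input_files=["data/train.tfrecord"],
    batch_size=32,
)

# Each batch is ({"input_ids", "segment_ids", "attention_mask"}, {"start", "end"}),
# so the dataset can be passed to tf.keras.Model.fit directly, e.g. model.fit(train_dataset).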
7 changes: 7 additions & 0 deletions datasets/qa/example.py
@@ -0,0 +1,7 @@
import os
from collections import namedtuple

ExampleForQuestionAnswering = namedtuple(
"ExampleForQuestionAnswering",
["tokens", "input_ids", "segment_ids", "attention_mask", "start", "end"],
)
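
ExampleForQuestionAnswering is the record that the parsers emit and the datapipe consumes: token-level model inputs plus scalar `start`/`end` labels (presumably the answer span's token indices, given how save_tfrecord serializes them). A tiny illustration with made-up values, not part of this commit:

from datasets.qa.example import ExampleForQuestionAnswering

# Made-up values, for illustration only.
example = ExampleForQuestionAnswering(
    tokens=["[CLS]", "who", "wrote", "it", "[SEP]", "bob", "wrote", "it", "[SEP]"],
    input_ids=[101, 8, 9, 10, 102, 11, 9, 10, 102],
    segment_ids=[0, 0, 0, 0, 0, 1, 1, 1, 1],
    attention_mask=[1, 1, 1, 1, 1, 1, 1, 1, 1],
    start=5,
    end=5,
)
print(example.start, example.end)  # 5 5 -> the answer token "bob"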