From 75a3bdc157976a5da5552456634cab50e0676edd Mon Sep 17 00:00:00 2001 From: luozhouyang Date: Wed, 17 Nov 2021 11:40:59 +0800 Subject: [PATCH 1/3] Init project --- .gitignore | 115 +++++++++++++++++++++++++++++++++++++++++++ README.md | 0 datasets/__init__.py | 0 requirements.txt | 1 + tests/__init__.py | 0 5 files changed, 116 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 datasets/__init__.py create mode 100644 requirements.txt create mode 100644 tests/__init__.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9eeb368 --- /dev/null +++ b/.gitignore @@ -0,0 +1,115 @@ +.idea/ +.vscode/ +log/ +logs/ +testdata + +models/ +*.imp +*.pyc +.DS_Store +__pycache__ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class +*.ipynb +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST +ifchange +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/datasets/__init__.py b/datasets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..aab16ec --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +tensorflow-cpu diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 From 3cfc3d42da9c9de6dabaaa0be4c0eeb76b5a187a Mon Sep 17 00:00:00 2001 From: luozhouyang Date: Fri, 19 Nov 2021 15:53:41 +0800 Subject: [PATCH 2/3] Add datasets for question answering --- datasets/dataset.py | 75 ++++++++++++++ datasets/qa/__init__.py | 0 datasets/qa/dataset.py | 184 +++++++++++++++++++++++++++++++++ datasets/qa/example.py | 7 ++ datasets/qa/parsers.py | 78 ++++++++++++++ datasets/qa/readers.py | 44 ++++++++ datasets/utils.py | 122 ++++++++++++++++++++++ requirements.txt | 1 + tests/qa_tests/__init__.py | 0 tests/qa_tests/dataset_test.py | 72 +++++++++++++ 10 files changed, 583 insertions(+) create mode 100644 datasets/dataset.py create mode 100644 datasets/qa/__init__.py create mode 100644 datasets/qa/dataset.py create mode 100644 datasets/qa/example.py create mode 100644 datasets/qa/parsers.py create mode 100644 datasets/qa/readers.py create mode 100644 datasets/utils.py create mode 100644 tests/qa_tests/__init__.py create mode 100644 tests/qa_tests/dataset_test.py diff --git a/datasets/dataset.py b/datasets/dataset.py new file 
mode 100644 index 0000000..b574db3 --- /dev/null +++ b/datasets/dataset.py @@ -0,0 +1,75 @@ +import abc + +import tensorflow as tf + +try: + AUTOTUNE = tf.data.AUTOTUNE +except: + AUTOTUNE = tf.data.experimental.AUTOTUNE + + +class AbcDataset(abc.ABC): + """Abstract dataset""" + + def __call__(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + dataset = self._filter(dataset, **kwargs) + dataset = self._repeat(dataset, **kwargs) + dataset = self._shuffle(dataset, **kwargs) + dataset = self._padding(dataset, **kwargs) + dataset = self._to_dict(dataset, **kwargs) + dataset = self._auto_shard(dataset, **kwargs) + return dataset + + @abc.abstractmethod + def _filter(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + raise NotImplementedError() + + def _repeat(self, dataset: tf.data.Dataset, repeat=None, **kwargs) -> tf.data.Dataset: + if repeat is None: + return dataset + return dataset.repeat(repeat) + + def _shuffle( + self, + dataset: tf.data.Dataset, + shuffle=True, + buffer_size=100000, + seed=None, + reshuffle_each_iteration=True, + **kwargs, + ) -> tf.data.Dataset: + if not shuffle: + return dataset + dataset = dataset.shuffle(buffer_size=buffer_size, seed=seed, reshuffle_each_iteration=reshuffle_each_iteration) + return dataset + + def _padding(self, dataset: tf.data.Dataset, padding_strategy="bucket", **kwargs) -> tf.data.Dataset: + if padding_strategy == "fixed": + return self._fixed_padding(dataset, **kwargs) + if padding_strategy == "batch": + return self._batch_padding(dataset, **kwargs) + return self._bucket_padding(dataset, **kwargs) + + @abc.abstractmethod + def _fixed_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + raise NotImplementedError("Fixed padding not supported yet!") + + @abc.abstractmethod + def _batch_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + raise NotImplementedError("Batch padding is not supported yet!") + + @abc.abstractmethod + def _bucket_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + raise NotImplementedError("Bucket padding is not supported yet!") + + @abc.abstractmethod + def _to_dict(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + raise NotImplementedError() + + def _auto_shard(self, dataset: tf.data.Dataset, auto_shard_policy=None, **kwargs) -> tf.data.Dataset: + if auto_shard_policy is not None: + options = tf.data.Options() + # options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA + options.experimental_distribute.auto_shard_policy = auto_shard_policy + dataset = dataset.with_options(options) + return dataset diff --git a/datasets/qa/__init__.py b/datasets/qa/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/datasets/qa/dataset.py b/datasets/qa/dataset.py new file mode 100644 index 0000000..fa09896 --- /dev/null +++ b/datasets/qa/dataset.py @@ -0,0 +1,184 @@ +import abc +import logging +import os +from typing import Dict, List + +import tensorflow as tf +from datasets import utils +from datasets.dataset import AbcDataset +from tokenizers import BertWordPieceTokenizer + +from .example import ExampleForQuestionAnswering +from .parsers import ParserForQuestionAnswering +from .readers import read_dureader_checklist, read_dureader_rubost + + +class DatasetForQuestionAnswering(abc.ABC): + """ """ + + @abc.abstractmethod + def __len__(self): + raise NotImplementedError() + + @abc.abstractmethod + def __getitem__(self, index) -> ExampleForQuestionAnswering: + raise NotImplementedError() 
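+    # A concrete subclass (e.g. DuReaderDatasetForQuestionAnswering in tests/qa_tests/dataset_test.py) loads raw instances and returns a parsed ExampleForQuestionAnswering for each index.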
+ + def __iter__(self) -> ExampleForQuestionAnswering: + for idx in range(len(self)): + yield self[idx] + + def save_tfrecord(self, output_files, **kwargs): + """Save examples to tfrecord""" + + def _encoding(example: ExampleForQuestionAnswering): + feature = { + "input_ids": utils.int64_feature([int(x) for x in example.input_ids]), + "segment_ids": utils.int64_feature([int(x) for x in example.segment_ids]), + "attention_mask": utils.int64_feature([int(x) for x in example.attention_mask]), + "start": utils.int64_feature([int(example.start)]), + "end": utils.int64_feature([int(example.end)]), + } + return feature + + utils.save_tfrecord(iter(self), _encoding, output_files, **kwargs) + + +class DatapipeForQuestionAnswering(AbcDataset): + """ """ + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + + @classmethod + def from_tfrecord_files(cls, input_files, num_parallel_calls=None, buffer_size=None, **kwargs) -> tf.data.Dataset: + dataset = utils.read_tfrecord_files(input_files, **kwargs) + num_parallel_calls = num_parallel_calls or utils.AUTOTUNE + buffer_size = buffer_size or utils.AUTOTUNE + # parse examples + features = { + "input_ids": tf.io.VarLenFeature(tf.int64), + "segment_ids": tf.io.VarLenFeature(tf.int64), + "attention_mask": tf.io.VarLenFeature(tf.int64), + "start": tf.io.VarLenFeature(tf.int64), + "end": tf.io.VarLenFeature(tf.int64), + } + dataset = dataset.map( + lambda x: tf.io.parse_example(x, features), + num_parallel_calls=num_parallel_calls, + ).prefetch(buffer_size) + dataset = dataset.map( + lambda x: ( + tf.cast(tf.sparse.to_dense(x["input_ids"]), tf.int32), + tf.cast(tf.sparse.to_dense(x["segment_ids"]), tf.int32), + tf.cast(tf.sparse.to_dense(x["attention_mask"]), tf.int32), + tf.cast(tf.squeeze(tf.sparse.to_dense(x["start"])), tf.int32), + tf.cast(tf.squeeze(tf.sparse.to_dense(x["end"])), tf.int32), + ), + num_parallel_calls=num_parallel_calls, + ).prefetch(buffer_size) + # do transformation + d = cls(**kwargs) + return d(dataset, **kwargs) + + @classmethod + def from_dureader_robust(cls, input_files, **kwargs) -> tf.data.Dataset: + instances = read_dureader_rubost(input_files, **kwargs) + return cls.from_instances(instances, **kwargs) + + @classmethod + def from_dureader_checklist(cls, input_files, **kwargs) -> tf.data.Dataset: + instances = read_dureader_checklist(input_files, **kwargs) + return cls.from_instances(instances, **kwargs) + + @classmethod + def from_instances( + cls, instances: List[Dict], tokenizer: BertWordPieceTokenizer = None, vocab_file=None, **kwargs + ) -> tf.data.Dataset: + examples = [] + parser = ParserForQuestionAnswering(tokenizer=tokenizer, vocab_file=vocab_file, **kwargs) + for instance in instances: + e = parser.parse(instance, **kwargs) + if not e: + continue + examples.append(e) + logging.info("Read %d examples in total.", len(examples)) + return cls.from_examples(examples, **kwargs) + + @classmethod + def from_dataset(cls, dataset: DatasetForQuestionAnswering, **kwargs) -> tf.data.Dataset: + examples = [] + for idx in range(len(dataset)): + examples.append(dataset[idx]) + return cls.from_examples(examples, **kwargs) + + @classmethod + def from_examples(cls, examples: List[ExampleForQuestionAnswering], **kwargs) -> tf.data.Dataset: + d = cls(**kwargs) + dataset = d._zip_dataset(examples, **kwargs) + return d(dataset, **kwargs) + + def _filter(self, dataset: tf.data.Dataset, max_sequence_length=512, **kwargs) -> tf.data.Dataset: + dataset = dataset.filter(lambda a, b, c, x, y: tf.size(a) <= max_sequence_length) + 
return dataset + + def _to_dict(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + dataset = dataset.map( + lambda a, b, c, x, y: ({"input_ids": a, "segment_ids": b, "attention_mask": c}, {"head": x, "tail": y}), + num_parallel_calls=kwargs.get("num_parallel_calls", utils.AUTOTUNE), + ).prefetch(kwargs.get("buffer_size", utils.AUTOTUNE)) + return dataset + + def _batch_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + pad_id = tf.constant(kwargs.get("pad_id", 0), dtype=tf.int32) + # fmt: off + padded_shapes = kwargs.get("padded_shapes", ([None, ], [None, ], [None, ], [], [])) + padding_values = kwargs.get("padding_values", (pad_id, pad_id, pad_id, None, None)) + # fmt: on + dataset = utils.batching_and_padding(dataset, padded_shapes, padding_values, **kwargs) + return dataset + + def _fixed_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + maxlen = tf.constant(kwargs.get("max_sequence_length", 0), dtype=tf.int32) + pad_id = tf.constant(kwargs.get("pad_id", 0), dtype=tf.int32) + # fmt: off + padded_shapes = kwargs.get("padded_shapes", ([maxlen, ], [maxlen, ], [maxlen, ], [], [])) + padding_values = kwargs.get("padding_values", (pad_id, pad_id, pad_id, None, None)) + # fmt: on + dataset = utils.batching_and_padding(dataset, padded_shapes, padding_values, **kwargs) + return dataset + + def _bucket_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + pad_id = tf.constant(kwargs.get("pad_id", 0), dtype=tf.int32) + # fmt: off + padded_shapes = ([None, ], [None, ], [None, ], [], []) + padding_values = (pad_id, pad_id, pad_id, None, None) + # fmt: on + dataset = utils.bucketing_and_padding( + dataset, + bucket_fn=lambda a, b, c, x, y: tf.size(a), + padded_shapes=padded_shapes, + padding_values=padding_values, + **kwargs, + ) + return dataset + + def _zip_dataset(self, examples: List[ExampleForQuestionAnswering], **kwargs) -> tf.data.Dataset: + """Zip examples to tf.data.Dataset""" + + def _to_dataset(x, dtype=tf.int32): + x = tf.ragged.constant(x, dtype=dtype) + d = tf.data.Dataset.from_tensor_slices(x) + d = d.map(lambda x: x) + return d + + dataset = tf.data.Dataset.zip( + ( + _to_dataset(x=[e.input_ids for e in examples], dtype=tf.int32), + _to_dataset(x=[e.segment_ids for e in examples], dtype=tf.int32), + _to_dataset(x=[e.attention_mask for e in examples], dtype=tf.int32), + _to_dataset(x=[e.start for e in examples], dtype=tf.int32), + _to_dataset(x=[e.end for e in examples], dtype=tf.int32), + ) + ) + return dataset diff --git a/datasets/qa/example.py b/datasets/qa/example.py new file mode 100644 index 0000000..f05df19 --- /dev/null +++ b/datasets/qa/example.py @@ -0,0 +1,7 @@ +import os +from collections import namedtuple + +ExampleForQuestionAnswering = namedtuple( + "ExampleForQuestionAnswering", + ["tokens", "input_ids", "segment_ids", "attention_mask", "start", "end"], +) diff --git a/datasets/qa/parsers.py b/datasets/qa/parsers.py new file mode 100644 index 0000000..542c5d7 --- /dev/null +++ b/datasets/qa/parsers.py @@ -0,0 +1,78 @@ +import abc +import json +import logging +import os +import re +from typing import Dict, List + +from tokenizers import BertWordPieceTokenizer + +from .example import ExampleForQuestionAnswering + + +class AbstractExampleParser(abc.ABC): + """Abc example parser""" + + @abc.abstractmethod + def parse(self, instance: Dict, **kwargs): + raise NotImplementedError() + + +class ParserForQuestionAnswering(AbstractExampleParser): + """Parse example for qa task""" + + def __init__(self, 
tokenizer=None, vocab_file=None, do_lower_case=True, **kwargs) -> None: + assert tokenizer or vocab_file, "`tokenizer` or `vocab_file` must be provided!" + self.tokenizer = tokenizer or BertWordPieceTokenizer.from_file(vocab_file, lowercase=do_lower_case) + + def from_vocab(cls, vocab_file, **kwargs): + tokenizer = BertWordPieceTokenizer.from_file(vocab_file, lowercase=kwargs.get("do_lower_case", True)) + return cls.from_tokenizer(tokenizer, **kwargs) + + def from_tokenizer(cls, tokenizer: BertWordPieceTokenizer, **kwargs): + return cls(tokenizer=tokenizer, vocab_file=None, **kwargs) + + def _find_answer_span(self, context, answer): + for m in re.finditer(re.escape(answer), context, re.IGNORECASE): + start, end = m.span() + return start, end + return 0, 0 + + def parse(self, instance: Dict, **kwargs) -> ExampleForQuestionAnswering: + context, question, answer = instance["context"], instance["question"], instance["answer"] + start_char_idx, end_char_idx = self._find_answer_span(context, answer) + if end_char_idx <= start_char_idx: + return None + # Mark the character indexes in context that are in answer + is_char_in_ans = [0] * len(context) + for idx in range(start_char_idx, end_char_idx): + is_char_in_ans[idx] = 1 + context_encoding = self.tokenizer.encode(context, add_special_tokens=True) + # Find tokens that were created from answer characters + ans_token_idx = [] + for idx, (start_char_idx, end_char_idx) in enumerate(context_encoding.offsets): + if sum(is_char_in_ans[start_char_idx:end_char_idx]) > 0: + ans_token_idx.append(idx) + if not ans_token_idx: + return None + start_token_idx, end_token_idx = ans_token_idx[0], ans_token_idx[-1] + question_encoding = self.tokenizer.encode(question, add_special_tokens=True) + input_ids = context_encoding.ids + question_encoding.ids[1:] + segment_ids = [0] * len(context_encoding.type_ids) + [1] * len(question_encoding.type_ids[1:]) + attention_mask = [1] * len(context_encoding.attention_mask + question_encoding.attention_mask[1:]) + assert len(input_ids) == len(segment_ids), "input_ids length:{} VS segment_ids length: {}".format( + len(input_ids), len(segment_ids) + ) + assert len(input_ids) == len(attention_mask), "input_ids length:{} VS attention_mask length: {}".format( + len(input_ids), len(attention_mask) + ) + + example = ExampleForQuestionAnswering( + tokens=context_encoding.tokens + question_encoding.tokens[1:], + input_ids=input_ids, + segment_ids=segment_ids, + attention_mask=attention_mask, + start=start_token_idx, + end=end_token_idx, + ) + return example diff --git a/datasets/qa/readers.py b/datasets/qa/readers.py new file mode 100644 index 0000000..7dfabb8 --- /dev/null +++ b/datasets/qa/readers.py @@ -0,0 +1,44 @@ +import abc +import json +import logging +import os +from typing import Dict, List + + +def read_dureader_rubost(input_files, **kwargs): + if isinstance(input_files, str): + input_files = [input_files] + for input_file in input_files: + with open(input_file, mode="rt", encoding="utf-8") as fin: + info = json.load(fin) + for d in info["data"]: + paragraphs = d["paragraphs"] + for p in paragraphs: + context = p["context"] + for qa in p["qas"]: + question = qa["question"] + _id = qa["id"] + if not qa["answers"]: + continue + answer = qa["answers"][0]["text"] + instance = {"context": context, "question": question, "answer": answer, "id": _id} + yield instance + + +def read_dureader_checklist(input_files, **kwargs): + if isinstance(input_files, str): + input_files = [input_files] + for input_file in input_files: + with 
open(input_file, mode="rt", encoding="utf-8") as fin: + info = json.load(fin) + for data in info["data"]: + for p in data["paragraphs"]: + title = p["title"] + context = p["context"] + for qa in p["qas"]: + if qa["is_impossible"]: + continue + question = qa["question"] + answer = qa["answers"][0]["text"] + instance = {"context": title + context, "question": question, "answer": answer, "id": qa["id"]} + yield instance diff --git a/datasets/utils.py b/datasets/utils.py new file mode 100644 index 0000000..84e6aed --- /dev/null +++ b/datasets/utils.py @@ -0,0 +1,122 @@ +import json +import logging +import os + +import tensorflow as tf + +try: + AUTOTUNE = tf.data.AUTOTUNE +except: + AUTOTUNE = tf.data.experimental.AUTOTUNE + + +def read_tfrecord_files(input_files, num_parallel_calls=None, **kwargs): + num_parallel_calls = num_parallel_calls or AUTOTUNE + if isinstance(input_files, str): + input_files = [input_files] + if len(input_files) == 1: + dataset = tf.data.TFRecordDataset(input_files) + else: + dataset = tf.data.Dataset.from_tensor_slices(input_files) + dataset = dataset.interleave( + lambda x: tf.data.TFRecordDataset(x), + cycle_length=len(input_files), + num_parallel_calls=num_parallel_calls, + ) + return dataset + + +def batching_and_padding(dataset: tf.data.Dataset, padded_shapes, padding_values, **kwargs) -> tf.data.Dataset: + dataset = dataset.padded_batch( + batch_size=kwargs.get("batch_size", 32), + padded_shapes=padded_shapes, + padding_values=padding_values, + drop_remainder=kwargs.get("drop_remainder", False), + ) + return dataset + + +def bucketing_and_padding(dataset: tf.data.Dataset, bucket_fn, padded_shapes, padding_values, **kwargs) -> tf.data.Dataset: + batch_size = kwargs.get("batch_size", 32) + bucket_boundaries = kwargs.get("bucket_boundaries", None) + if not bucket_boundaries: + bucket_boundaries = [] + maxlen = kwargs.get("max_sequence_length", 512) + numbuk = kwargs.get("num_buckets", 8) + step = maxlen // numbuk + for i in range(1, numbuk + 1): + v = i * step + if v >= maxlen: + break + bucket_boundaries.append(v) + + bucket_batch_sizes = kwargs.get("bucket_batch_sizes", None) + if not bucket_batch_sizes: + bucket_batch_sizes = [batch_size] * (1 + len(bucket_boundaries)) + + assert len(bucket_batch_sizes) == len(bucket_boundaries) + 1, "len(bucket_batch_sizes) != len(bucket_boundaries) + 1" + + try: + fn = tf.data.bucket_by_sequence_length + except: + fn = tf.data.experimental.bucket_by_sequence_length + + dataset = dataset.apply( + fn( + element_length_func=bucket_fn, + padded_shapes=padded_shapes, + padding_values=padding_values, + bucket_boundaries=bucket_boundaries, + bucket_batch_sizes=bucket_batch_sizes, + drop_remainder=kwargs.get("drop_remainder", False), + pad_to_bucket_boundary=kwargs.get("pad_to_bucket_boundary", False), + no_padding=kwargs.get("no_padding", False), + ) + ) + return dataset + + +def read_jsonl_files(input_files, select_keys=None, ignore_keys=None, **kwargs): + if isinstance(input_files, str): + input_files = [input_files] + for f in input_files: + with open(f, mode="rt", encoding="utf-8") as fin: + for line in fin: + d = json.loads(line) + if select_keys: + for k in list(d.keys()): + if k not in select_keys: + d.pop(k, None) + if ignore_keys: + for k in list(d.keys()): + if k in ignore_keys: + d.pop(k, None) + yield d + + +def int64_feature(values): + return tf.train.Feature(int64_list=tf.train.Int64List(value=values)) + + +def save_tfrecord(examples, fn, output_files, **kwargs): + if not examples: + logging.warning("Examples is empty 
or None, skipped.") + return + if isinstance(output_files, str): + output_files = [output_files] + writers = [tf.io.TFRecordWriter(f) for f in output_files] + idx, c = 0, 0 + for example in examples: + try: + feature = fn(example, **kwargs) + record = tf.train.Example(features=tf.train.Features(feature=feature)) + writers[idx].write(record.SerializeToString()) + except Exception as e: + logging.warning("Encode feature exception: ", e) + continue + c += 1 + idx += 1 + idx = idx % len(writers) + for w in writers: + w.close() + logging.info("Finished to write %d examples to tfrecords.", c) diff --git a/requirements.txt b/requirements.txt index aab16ec..cb28e36 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ tensorflow-cpu +tokenizers diff --git a/tests/qa_tests/__init__.py b/tests/qa_tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/qa_tests/dataset_test.py b/tests/qa_tests/dataset_test.py new file mode 100644 index 0000000..7dcfa6c --- /dev/null +++ b/tests/qa_tests/dataset_test.py @@ -0,0 +1,72 @@ +import os +import unittest + +from datasets.qa import readers +from datasets.qa.dataset import DatapipeForQuestionAnswering, DatasetForQuestionAnswering +from datasets.qa.example import ExampleForQuestionAnswering +from datasets.qa.parsers import ParserForQuestionAnswering + +DUREADER_RUBOST_INPUT_FILE = os.path.join(os.environ["DUREADER_ROBUST_PATH"], "dev.json") +DUREADER_CHECKLIST_INPUT_FILE = os.path.join(os.environ["DUREADER_CHECKLIST_PATH"], "dev.json") +VOCAB_FILE = os.environ["BERT_VOCAB_PATH"] + + +class DuReaderDatasetForQuestionAnswering(DatasetForQuestionAnswering): + """ """ + + def __init__(self, input_files, vocab_file, subset="rubost", **kwargs) -> None: + super().__init__() + if subset == "rubost": + self.instances = list(readers.read_dureader_rubost(input_files, **kwargs)) + else: + self.instances = list(readers.read_dureader_checklist(input_files, **kwargs)) + self.parser = ParserForQuestionAnswering(tokenizer=None, vocab_file=vocab_file, **kwargs) + + def __len__(self): + return len(self.instances) + + def __getitem__(self, index) -> ExampleForQuestionAnswering: + instance = self.instances[index] + return self.parser.parse(instance) + + +class DatasetTest(unittest.TestCase): + """ """ + + def test_dataset_save_dureader_rubost_tfrecord(self): + d = DuReaderDatasetForQuestionAnswering(DUREADER_RUBOST_INPUT_FILE, VOCAB_FILE) + for idx, e in enumerate(d): + print(e) + if idx == 5: + break + d.save_tfrecord("testdata/dureader_rubost_dev.tfrecord") + + dataset = DatapipeForQuestionAnswering.from_tfrecord_files("testdata/dureader_rubost_dev.tfrecord") + print() + print(next(iter(dataset))) + + def test_dataset_save_dureader_checklist_tfrecord(self): + d = DuReaderDatasetForQuestionAnswering(DUREADER_CHECKLIST_INPUT_FILE, VOCAB_FILE, subset="checklist") + for idx, e in enumerate(d): + print(e) + if idx == 5: + break + d.save_tfrecord("testdata/dureader_checklist_dev.tfrecord") + + dataset = DatapipeForQuestionAnswering.from_tfrecord_files("testdata/dureader_checklist_dev.tfrecord") + print() + print(next(iter(dataset))) + + def test_datapipe_from_dureader_rubost(self): + dataset = DatapipeForQuestionAnswering.from_dureader_robust(DUREADER_RUBOST_INPUT_FILE, vocab_file=VOCAB_FILE) + print() + print(next(iter(dataset))) + + def test_datapipe_from_dureader_checklist(self): + dataset = DatapipeForQuestionAnswering.from_dureader_checklist(DUREADER_CHECKLIST_INPUT_FILE, vocab_file=VOCAB_FILE) + print() + print(next(iter(dataset))) + + +if 
__name__ == "__main__": + unittest.main() From 45afa132d465afd0a419c05809f17f6435c02f80 Mon Sep 17 00:00:00 2001 From: luozhouyang Date: Fri, 19 Nov 2021 16:26:09 +0800 Subject: [PATCH 3/3] Add support for loadding dataset from jsonl files --- datasets/qa/dataset.py | 68 ++++++++++++++++++++++++------------------ datasets/qa/readers.py | 24 ++++++++++++--- 2 files changed, 59 insertions(+), 33 deletions(-) diff --git a/datasets/qa/dataset.py b/datasets/qa/dataset.py index fa09896..c838197 100644 --- a/datasets/qa/dataset.py +++ b/datasets/qa/dataset.py @@ -1,6 +1,5 @@ import abc import logging -import os from typing import Dict, List import tensorflow as tf @@ -10,7 +9,7 @@ from .example import ExampleForQuestionAnswering from .parsers import ParserForQuestionAnswering -from .readers import read_dureader_checklist, read_dureader_rubost +from .readers import read_dureader_checklist, read_dureader_rubost, read_jsonl_files class DatasetForQuestionAnswering(abc.ABC): @@ -82,19 +81,34 @@ def from_tfrecord_files(cls, input_files, num_parallel_calls=None, buffer_size=N return d(dataset, **kwargs) @classmethod - def from_dureader_robust(cls, input_files, **kwargs) -> tf.data.Dataset: + def from_dureader_robust(cls, input_files, tokenizer=None, vocab_file=None, **kwargs) -> tf.data.Dataset: instances = read_dureader_rubost(input_files, **kwargs) - return cls.from_instances(instances, **kwargs) + return cls.from_instances(instances, tokenizer=tokenizer, vocab_file=vocab_file, **kwargs) @classmethod - def from_dureader_checklist(cls, input_files, **kwargs) -> tf.data.Dataset: + def from_dureader_checklist(cls, input_files, tokenizer=None, vocab_file=None, **kwargs) -> tf.data.Dataset: instances = read_dureader_checklist(input_files, **kwargs) - return cls.from_instances(instances, **kwargs) + return cls.from_instances(instances, tokenizer=tokenizer, vocab_file=vocab_file, **kwargs) + + @classmethod + def from_jsonl_files(cls, input_files, tokenizer=None, vocab_file=None, **kwargs) -> tf.data.Dataset: + instances = read_jsonl_files(input_files, **kwargs) + return cls.from_instances(instances, tokenizer=tokenizer, vocab_file=vocab_file, **kwargs) @classmethod def from_instances( cls, instances: List[Dict], tokenizer: BertWordPieceTokenizer = None, vocab_file=None, **kwargs ) -> tf.data.Dataset: + """Build tf.data.Dataset from json instances. + + Args: + instances: List instance of dict, each instance contains keys `context`, `question`, `answer` and `id` + tokenizer: Tokenizer used to tokenize text + vocab_file: The vocab path to build tokenizer. The `tokenizer` or `vocab_file` must be provided! + + Returns: + Instance of tf.data.Dataset, can be used to fit to tf.keras.Model directly. 
+ """ examples = [] parser = ParserForQuestionAnswering(tokenizer=tokenizer, vocab_file=vocab_file, **kwargs) for instance in instances: @@ -115,16 +129,32 @@ def from_dataset(cls, dataset: DatasetForQuestionAnswering, **kwargs) -> tf.data @classmethod def from_examples(cls, examples: List[ExampleForQuestionAnswering], **kwargs) -> tf.data.Dataset: d = cls(**kwargs) - dataset = d._zip_dataset(examples, **kwargs) + + def _to_dataset(x, dtype=tf.int32): + x = tf.ragged.constant(x, dtype=dtype) + d = tf.data.Dataset.from_tensor_slices(x) + d = d.map(lambda x: x) + return d + + dataset = tf.data.Dataset.zip( + ( + _to_dataset(x=[e.input_ids for e in examples], dtype=tf.int32), + _to_dataset(x=[e.segment_ids for e in examples], dtype=tf.int32), + _to_dataset(x=[e.attention_mask for e in examples], dtype=tf.int32), + _to_dataset(x=[e.start for e in examples], dtype=tf.int32), + _to_dataset(x=[e.end for e in examples], dtype=tf.int32), + ) + ) + return d(dataset, **kwargs) def _filter(self, dataset: tf.data.Dataset, max_sequence_length=512, **kwargs) -> tf.data.Dataset: dataset = dataset.filter(lambda a, b, c, x, y: tf.size(a) <= max_sequence_length) return dataset - def _to_dict(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset: + def _to_dict(self, dataset: tf.data.Dataset, start_key="start", end_key="end", **kwargs) -> tf.data.Dataset: dataset = dataset.map( - lambda a, b, c, x, y: ({"input_ids": a, "segment_ids": b, "attention_mask": c}, {"head": x, "tail": y}), + lambda a, b, c, x, y: ({"input_ids": a, "segment_ids": b, "attention_mask": c}, {start_key: x, end_key: y}), num_parallel_calls=kwargs.get("num_parallel_calls", utils.AUTOTUNE), ).prefetch(kwargs.get("buffer_size", utils.AUTOTUNE)) return dataset @@ -162,23 +192,3 @@ def _bucket_padding(self, dataset: tf.data.Dataset, **kwargs) -> tf.data.Dataset **kwargs, ) return dataset - - def _zip_dataset(self, examples: List[ExampleForQuestionAnswering], **kwargs) -> tf.data.Dataset: - """Zip examples to tf.data.Dataset""" - - def _to_dataset(x, dtype=tf.int32): - x = tf.ragged.constant(x, dtype=dtype) - d = tf.data.Dataset.from_tensor_slices(x) - d = d.map(lambda x: x) - return d - - dataset = tf.data.Dataset.zip( - ( - _to_dataset(x=[e.input_ids for e in examples], dtype=tf.int32), - _to_dataset(x=[e.segment_ids for e in examples], dtype=tf.int32), - _to_dataset(x=[e.attention_mask for e in examples], dtype=tf.int32), - _to_dataset(x=[e.start for e in examples], dtype=tf.int32), - _to_dataset(x=[e.end for e in examples], dtype=tf.int32), - ) - ) - return dataset diff --git a/datasets/qa/readers.py b/datasets/qa/readers.py index 7dfabb8..529023e 100644 --- a/datasets/qa/readers.py +++ b/datasets/qa/readers.py @@ -1,8 +1,4 @@ -import abc import json -import logging -import os -from typing import Dict, List def read_dureader_rubost(input_files, **kwargs): @@ -42,3 +38,23 @@ def read_dureader_checklist(input_files, **kwargs): answer = qa["answers"][0]["text"] instance = {"context": title + context, "question": question, "answer": answer, "id": qa["id"]} yield instance + + +def read_jsonl_files(input_files, context_key="context", question_key="question", answers_key="answer", id_key="id", **kwargs): + if isinstance(input_files, str): + input_files = [input_files] + _id = 0 + for input_file in input_files: + with open(input_file, mode="rt", encoding="utf-8") as fin: + for line in fin: + data = json.loads(line) + answers = data[answers_key] + if not answers: + continue + if not isinstance(answers, list): + answers = [answers] + answer = 
answers[0] + instance_id = data.get(id_key, _id) + _id += 1 + instance = {"context": data[context_key], "question": data[question_key], "answer": answer, "id": instance_id} + yield instance
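
Taken together, the three patches leave two entry points for building question-answering data: `DatapipeForQuestionAnswering`, which builds batched `tf.data.Dataset` pipelines, and `DatasetForQuestionAnswering.save_tfrecord`, which serializes parsed examples for later reuse. A minimal usage sketch follows; the file paths and the BERT WordPiece vocab are placeholders, not files shipped with the patches.

from datasets.qa.dataset import DatapipeForQuestionAnswering

# Build a batched dataset straight from a JSON Lines file; each line is a JSON object
# with `context`, `question`, `answer` and (optionally) `id` keys, as read by read_jsonl_files.
train_dataset = DatapipeForQuestionAnswering.from_jsonl_files(
    "train.jsonl",            # placeholder path
    vocab_file="vocab.txt",   # placeholder BERT WordPiece vocab used by ParserForQuestionAnswering
)

# Examples serialized earlier with DatasetForQuestionAnswering.save_tfrecord(...) can be
# reloaded without re-parsing or re-tokenizing the raw text.
eval_dataset = DatapipeForQuestionAnswering.from_tfrecord_files("dev.tfrecord")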