Exposes num_parallel_reads and num_parallel_calls #1232

Merged
9 commits merged on Jan 7, 2021
147 changes: 126 additions & 21 deletions tensorflow_io/core/python/experimental/avro_record_dataset_ops.py
@@ -21,6 +21,24 @@
_DEFAULT_READER_SCHEMA = ""
# From https://github.com/tensorflow/tensorflow/blob/v2.0.0/tensorflow/python/data/ops/readers.py


def _require(condition: bool, err_msg: str = None) -> None:
"""Checks if the specified condition is true else raises exception
Args:
condition: The condition to test
err_msg: If specified, it's the error message to use if condition is not true.
Raises:
ValueError: Raised when the condition is false
Returns:
None
"""
if not condition:
raise ValueError(err_msg)
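A minimal usage sketch of `_require` for argument validation (the values here are purely illustrative):

```
# Illustrative only: reject a non-positive parallelism setting.
num_parallel_calls = 4
_require(
    num_parallel_calls is None or num_parallel_calls > 0,
    f"num_parallel_calls: {num_parallel_calls} must be None or greater than 0",
)
```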


# copied from https://github.com/tensorflow/tensorflow/blob/
# 3095681b8649d9a828afb0a14538ace7a998504d/tensorflow/python/data/ops/readers.py#L36
def _create_or_validate_filenames_dataset(filenames):
@@ -52,21 +70,62 @@ def _create_or_validate_filenames_dataset(filenames):

# copied from https://github.com/tensorflow/tensorflow/blob/
# 3095681b8649d9a828afb0a14538ace7a998504d/tensorflow/python/data/ops/readers.py#L67
def _create_dataset_reader(dataset_creator, filenames, num_parallel_reads=None):
"""create_dataset_reader"""

def read_one_file(filename):
filename = tf.convert_to_tensor(filename, tf.string, name="filename")
return dataset_creator(filename)

if num_parallel_reads is None:
return filenames.flat_map(read_one_file)
if num_parallel_reads == tf.data.experimental.AUTOTUNE:
return filenames.interleave(
read_one_file, num_parallel_calls=num_parallel_reads
)
def _create_dataset_reader(
dataset_creator,
filenames,
cycle_length=None,
num_parallel_calls=None,
deterministic=None,
block_length=1,
):
"""
Creates a dataset reader that reads records from multiple files and interleaves them together.
```
dataset = Dataset.range(1, 6) # ==> [ 1, 2, 3, 4, 5 ]
# NOTE: New lines indicate "block" boundaries.
dataset = dataset.interleave(
lambda x: Dataset.from_tensors(x).repeat(6),
cycle_length=2, block_length=4)
list(dataset.as_numpy_iterator())
```
Results in the following output:
[1,1,1,1,
2,2,2,2,
1,1,
2,2,
3,3,3,3,
4,4,4,4,
3,3,
4,4,
5,5,5,5,
5,5,
]
Args:
dataset_creator: Initializer for `_AvroRecordDataset`.
filenames: A `tf.data.Dataset` iterator of filenames to read.
cycle_length: The number of files to be processed in parallel. This is passed to
`Dataset.interleave`. Setting it equal to `block_length` means that each cycle returns
n records from each of n files.
num_parallel_calls: Number of threads spawned by the interleave call.
deterministic: Whether the interleaved records are produced in deterministic order.
`Dataset.interleave` defaults this to `True`.
block_length: The number of consecutive records to produce from each file before cycling
to the next one. Defaults to 1.
Returns:
A dataset iterator with an interleaved list of parsed Avro records.
"""

def read_many_files(filenames):
filenames = tf.convert_to_tensor(filenames, tf.string, name="filename")
return dataset_creator(filenames)

if cycle_length is None:
return filenames.flat_map(read_many_files)

return filenames.interleave(
read_one_file, cycle_length=num_parallel_reads, block_length=1
read_many_files,
cycle_length=cycle_length,
num_parallel_calls=num_parallel_calls,
block_length=block_length,
deterministic=deterministic,
)
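As a sanity check on the interleave semantics described in the docstring, here is a small self-contained sketch (plain `tf.data`, no Avro involved) that reproduces the cycle_length/block_length ordering shown above:

```
import tensorflow as tf

# Two source elements are cycled over at a time (cycle_length=2), and each
# contributes blocks of four records before yielding to the next one.
dataset = tf.data.Dataset.range(1, 6)  # ==> [1, 2, 3, 4, 5]
dataset = dataset.interleave(
    lambda x: tf.data.Dataset.from_tensors(x).repeat(6),
    cycle_length=2,
    block_length=4,
)
print(list(dataset.as_numpy_iterator()))
# [1, 1, 1, 1, 2, 2, 2, 2, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 3, 3, 4, 4, 5, 5, 5, 5, 5, 5]
```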


@@ -128,10 +187,16 @@ class AvroRecordDataset(tf.data.Dataset):
"""A `Dataset` comprising records from one or more AvroRecord files."""

def __init__(
self, filenames, buffer_size=None, num_parallel_reads=None, reader_schema=None
self,
filenames,
buffer_size=None,
num_parallel_reads=None,
num_parallel_calls=None,
reader_schema=None,
deterministic=True,
block_length=1,
):
"""Creates a `AvroRecordDataset` to read one or more AvroRecord files.
Args:
filenames: A `tf.string` tensor or `tf.data.Dataset` containing one or
more filenames.
@@ -144,25 +209,61 @@ def __init__(
files read in parallel are outputted in an interleaved order. If your
input pipeline is I/O bottlenecked, consider setting this parameter to a
value greater than one to parallelize the I/O. If `None`, files will be
read sequentially.
read sequentially. This must be greater than or equal to `num_parallel_calls`.
This constraint exists because `num_parallel_reads` becomes `cycle_length` in the
underlying call to `tf.data.Dataset.interleave`, and `cycle_length` must be greater
than or equal to the number of threads (`num_parallel_calls`).
`cycle_length` dictates how many input elements `interleave` processes concurrently.
num_parallel_calls: (Optional.) Number of threads to spawn. This must be set to `None`,
`tf.data.experimental.AUTOTUNE`, or a value greater than 0. It must also be less than or
equal to `num_parallel_reads`, and it defines the degree of parallelism in the underlying
`Dataset.interleave` call.
reader_schema: (Optional.) A `tf.string` scalar representing the reader
schema or None.
deterministic: (Optional.) A boolean controlling whether determinism should be traded for
performance by allowing elements to be produced out of order. Defaults to `True`.
block_length: The number of consecutive records to produce from each file before cycling
to the next one. Defaults to 1.
Raises:
TypeError: If any argument does not have the expected type.
ValueError: If any argument does not have the expected shape.
"""
_require(
num_parallel_calls is None
or num_parallel_calls == tf.data.experimental.AUTOTUNE
or num_parallel_calls > 0,
f"num_parallel_calls: {num_parallel_calls} must be set to None, "
f"tf.data.experimental.AUTOTUNE, or greater than 0",
)
if num_parallel_calls is not None:
_require(
num_parallel_reads is not None
and (
num_parallel_reads >= num_parallel_calls
or num_parallel_reads == tf.data.experimental.AUTOTUNE
),
f"num_parallel_reads: {num_parallel_reads} must be greater than or equal to "
f"num_parallel_calls: {num_parallel_calls} or set to tf.data.experimental.AUTOTUNE",
)

filenames = _create_or_validate_filenames_dataset(filenames)

self._filenames = filenames
self._buffer_size = buffer_size
self._num_parallel_reads = num_parallel_reads
self._num_parallel_calls = num_parallel_calls
self._reader_schema = reader_schema
self._block_length = block_length

def creator_fn(filename):
return _AvroRecordDataset(filename, buffer_size, reader_schema)
def read_multiple_files(filenames):
return _AvroRecordDataset(filenames, buffer_size, reader_schema)

self._impl = _create_dataset_reader(creator_fn, filenames, num_parallel_reads)
self._impl = _create_dataset_reader(
read_multiple_files,
filenames,
cycle_length=num_parallel_reads,
num_parallel_calls=num_parallel_calls,
deterministic=deterministic,
block_length=block_length,
)
variant_tensor = self._impl._variant_tensor # pylint: disable=protected-access
super().__init__(variant_tensor)
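A hedged usage sketch of the new constructor arguments (the file names below are hypothetical placeholders; the `tfio.experimental.columnar` path matches the tests further down):

```
import tensorflow_io as tfio

# num_parallel_reads must be >= num_parallel_calls (or be
# tf.data.experimental.AUTOTUNE), per the validation above.
dataset = tfio.experimental.columnar.AvroRecordDataset(
    filenames=["part-00000.avro", "part-00001.avro"],
    num_parallel_reads=8,
    num_parallel_calls=4,
    block_length=4,
    reader_schema=None,  # or a JSON Avro reader schema string
)
```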

@@ -171,13 +272,17 @@ def _clone(
filenames=None,
buffer_size=None,
num_parallel_reads=None,
num_parallel_calls=None,
reader_schema=None,
block_length=None,
):
return AvroRecordDataset(
filenames or self._filenames,
buffer_size or self._buffer_size,
num_parallel_reads or self._num_parallel_reads,
num_parallel_calls or self._num_parallel_calls,
reader_schema or self._reader_schema,
block_length or self._block_length,
)

def _inputs(self):
39 changes: 6 additions & 33 deletions tensorflow_io/core/python/experimental/make_avro_record_dataset.py
@@ -37,60 +37,41 @@ def make_avro_record_dataset(
shuffle_seed=None,
prefetch_buffer_size=tf.data.experimental.AUTOTUNE,
num_parallel_reads=None,
num_parallel_parser_calls=None,
drop_final_batch=False,
):
"""Reads and (optionally) parses avro files into a dataset.
Provides common functionality such as batching, optional parsing, shuffling,
and performing defaults.
Args:
file_pattern: List of files or patterns of avro file paths.
See `tf.io.gfile.glob` for pattern rules.
features: A map of feature names mapped to feature information.
batch_size: An int representing the number of records to combine
in a single batch.
reader_schema: The reader schema.
reader_buffer_size: (Optional.) An int specifying the reader's buffer
size in bytes. If None (the default), the default value from
AvroRecordDataset is used.
num_epochs: (Optional.) An int specifying the number of times this
dataset is repeated. If None (the default), cycles through the
dataset forever; in that case the final partial batch is also dropped.
shuffle: (Optional.) A bool that indicates whether the input
should be shuffled. Defaults to `True`.
shuffle_buffer_size: (Optional.) Buffer size to use for
shuffling. A large buffer size ensures better shuffling, but
increases memory usage and startup time. If not provided, a
default of 10,000 records is assumed. Note that the shuffle
size is measured in records.
shuffle_seed: (Optional.) Randomization seed to use for shuffling.
By default uses a pseudo-random seed.
prefetch_buffer_size: (Optional.) An int specifying the number of
feature batches to prefetch for performance improvement.
Defaults to auto-tune. Set to 0 to disable prefetching.
num_parallel_reads: (Optional.) Number of threads used to read
records from files. By default or if set to a value >1, the
results will be interleaved.
num_parallel_parser_calls: (Optional.) Number of parallel
records to parse in parallel. Defaults to an automatic selection.
num_parallel_reads: (Optional.) Number of records to parse in
parallel. Defaults to `None` (no parallelization).
drop_final_batch: (Optional.) Whether the last batch should be
dropped in case its size is smaller than `batch_size`; the
default behavior is not to drop the smaller batch.
Returns:
A dataset, where each element matches the output of `parser_fn`
except it will have an additional leading `batch-size` dimension,
@@ -99,20 +80,15 @@
"""
files = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle, seed=shuffle_seed)

if num_parallel_reads is None:
# Note: We considered auto-tuning this value, but there is a concern
# that this affects the mixing of records from different files, which
# could affect training convergence/accuracy, so we are defaulting to
# a constant for now.
num_parallel_reads = 24

if reader_buffer_size is None:
reader_buffer_size = 1024 * 1024

num_parallel_calls = num_parallel_reads
dataset = AvroRecordDataset(
files,
buffer_size=reader_buffer_size,
num_parallel_reads=num_parallel_reads,
num_parallel_calls=num_parallel_calls,
block_length=num_parallel_calls,
reader_schema=reader_schema,
)

Expand All @@ -131,14 +107,11 @@ def make_avro_record_dataset(

dataset = dataset.batch(batch_size, drop_remainder=drop_final_batch)

if num_parallel_parser_calls is None:
num_parallel_parser_calls = tf.data.experimental.AUTOTUNE

dataset = dataset.map(
lambda data: parse_avro(
serialized=data, reader_schema=reader_schema, features=features
),
num_parallel_calls=num_parallel_parser_calls,
num_parallel_calls=num_parallel_calls,
)

if prefetch_buffer_size == 0:
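A hedged end-to-end sketch of the reworked entry point (the file pattern and feature spec are placeholders, and the export path is assumed to mirror `AvroRecordDataset` under `tfio.experimental.columnar`):

```
import tensorflow as tf
import tensorflow_io as tfio

# Placeholder reader schema matching the records being read.
reader_schema = """{
  "type": "record",
  "name": "dataTypes",
  "fields": [
    {"name": "index", "type": "int"},
    {"name": "string_value", "type": "string"}
  ]}"""

dataset = tfio.experimental.columnar.make_avro_record_dataset(
    file_pattern="data/part-*.avro",  # hypothetical glob
    features={
        "index": tf.io.FixedLenFeature([], tf.int32),
        "string_value": tf.io.FixedLenFeature([], tf.string),
    },
    batch_size=32,
    reader_schema=reader_schema,
    num_parallel_reads=4,  # also drives num_parallel_calls and block_length internally
    shuffle=True,
)
```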
59 changes: 58 additions & 1 deletion tests/test_parse_avro_eager.py
Expand Up @@ -31,7 +31,7 @@
import tensorflow_io as tfio

if sys.platform == "darwin":
pytest.skip("TODO: skip macOS", allow_module_level=True)
pytest.skip("TODO: skip macOS", allow_module_level=True)


class AvroRecordsToFile:
@@ -246,6 +246,63 @@ def _load_records_as_tensors(filenames, schema):
),
)

def test_inval_num_parallel_calls(self):
"""test_inval_num_parallel_calls
This function tests that value errors are raised upon
the passing of invalid values for num_parallel_calls which
includes zero values and values greater than num_parallel_reads
"""

NUM_PARALLEL_READS = 1
NUM_PARALLEL_CALLS_ZERO = 0
NUM_PARALLEL_CALLS_GREATER = 2

writer_schema = """{
"type": "record",
"name": "dataTypes",
"fields": [
{
"name":"index",
"type":"int"
},
{
"name":"string_value",
"type":"string"
}
]}"""

record_data = [
{"index": 0, "string_value": ""},
{"index": 1, "string_value": "SpecialChars@!#$%^&*()-_=+{}[]|/`~\\'?"},
{
"index": 2,
"string_value": "ABCDEFGHIJKLMNOPQRSTUVW"
+ "Zabcdefghijklmnopqrstuvwz0123456789",
},
]

filenames = AvroRecordDatasetTest._setup_files(
writer_schema=writer_schema, records=record_data
)

with pytest.raises(ValueError):

dataset_a = tfio.experimental.columnar.AvroRecordDataset(
filenames=filenames,
num_parallel_reads=NUM_PARALLEL_READS,
num_parallel_calls=NUM_PARALLEL_CALLS_ZERO,
reader_schema="reader_schema",
)

with pytest.raises(ValueError):

dataset_b = tfio.experimental.columnar.AvroRecordDataset(
filenames=filenames,
num_parallel_reads=NUM_PARALLEL_READS,
num_parallel_calls=NUM_PARALLEL_CALLS_GREATER,
reader_schema="reader_schema",
)

def _test_pass_dataset(self, writer_schema, record_data, **kwargs):
"""test_pass_dataset"""
filenames = AvroRecordDatasetTest._setup_files(