diff --git a/RELEASE.md b/RELEASE.md index 1a369906..ab7ec2dd 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -9,6 +9,7 @@ ## Known Issues ## Breaking Changes +* Moves the non-public `arrow_util` functions `is_list_like`, `get_field`, `flatten_nested` and `get_innermost_nested_type` to `tfx_bsl.arrow.array_util`, and `get_array` to `tfx_bsl.arrow.table_util` (which takes a `tfx_bsl.arrow.path.ColumnPath` instead of a `types.FeaturePath`). ## Deprecations diff --git a/tensorflow_data_validation/arrow/arrow_util.py b/tensorflow_data_validation/arrow/arrow_util.py index 4e3cc226..29e717ae 100644 --- a/tensorflow_data_validation/arrow/arrow_util.py +++ b/tensorflow_data_validation/arrow/arrow_util.py @@ -13,7 +13,7 @@ # limitations under the License """Util functions regarding to Arrow objects.""" -from typing import Callable, Dict, Iterable, Optional, Text, Tuple, Union +from typing import Callable, Dict, Iterable, Optional, Text, Tuple import numpy as np import pyarrow as pa @@ -82,175 +82,6 @@ def is_binary_like(data_type: pa.DataType) -> bool: pa.types.is_large_unicode(data_type)) -def is_list_like(data_type: pa.DataType) -> bool: - """Returns true if an Arrow type is list-like.""" - return pa.types.is_list(data_type) or pa.types.is_large_list(data_type) - - -def get_field(struct_array: pa.StructArray, field: Union[str, int]) -> pa.Array: - """Returns struct_array.field(field) with null propagation. - - This function is equivalent to struct_array.field() but correctly handles - null propagation (the parent struct's null values are propagated to children). - - Args: - struct_array: A struct array which should be queried. - field: The request field to retrieve. - - Returns: - A pa.Array containing the requested field. - - Raises: - KeyError: If field is not a child field in struct_array. - """ - child_array = struct_array.field(field) - - # In case all values are present then there's no need for special handling. 
- # We can return child_array as is to avoid a performance penalty caused by - # constructing and flattening the returned array. 
- if struct_array.null_count == 0: - return child_array - # is_valid returns a BooleanArray with two buffers the buffer at offset - # 0 is always None and buffer 1 contains the data on which fields are - # valid/not valid. - # (https://arrow.apache.org/docs/format/Columnar.html#buffer-listing-for-each-layout) - validity_bitmap_buffer = struct_array.is_valid().buffers()[1] - - # Construct a new struct array with a single field. Calling flatten() on the - # new array guarantees validity bitmaps are merged correctly. - new_type = pa.struct([pa.field(field, child_array.type)]) - if (child_array.null_count == 0 and child_array.offset != 0): - # TODO(caveness): Remove this special handling once flattening a struct that - # has children that were sliced produces arrays with a correct validity - # bitmap. - child_array = pa.concat_arrays([pa.nulls(0, child_array.type), child_array]) - filtered_struct = pa.StructArray.from_buffers( - new_type, - len(struct_array), [validity_bitmap_buffer], - null_count=struct_array.null_count, - children=[child_array]) - return filtered_struct.flatten()[0] - - -def get_array( - record_batch: pa.RecordBatch, - query_path: types.FeaturePath, - return_example_indices: bool, - wrap_flat_struct_in_list: bool = True, -) -> Tuple[pa.Array, Optional[np.ndarray]]: - """Retrieve a nested array (and optionally example indices) from RecordBatch. - - This function has the same assumption over `record_batch` as - `enumerate_arrays()` does. - - If the provided path refers to a leaf in the `record_batch`, then a - "nested_list" will be returned. If the provided path does not refer to a leaf, - a "struct" will be returned. - - See `enumerate_arrays()` for definition of "nested_list" and "struct". - - Args: - record_batch: The RecordBatch whose arrays to be visited. - query_path: The FeaturePath to lookup in the record_batch. 
- return_example_indices: Whether to return an additional array containing the - example indices of the elements in the array corresponding to the - query_path. - wrap_flat_struct_in_list: if True, and if the query_path leads to a - struct<[Ts]> array, it will be wrapped in a list array, where each - sub-list contains one element. Caller can make use of this option to - assume this function always returns a list. - - Returns: - A tuple. The first term is the feature array and the second term is the - example_indices array for the feature array (i.e. array[i] came from the - example at row example_indices[i] in the record_batch.). - - Raises: - KeyError: When the query_path is empty, or cannot be found in the - record_batch and its nested struct arrays. - """ - - def _recursion_helper( - query_path: types.FeaturePath, array: pa.Array, - example_indices: Optional[np.ndarray] - ) -> Tuple[pa.Array, Optional[np.ndarray]]: - """Recursion helper.""" - array_type = array.type - if not query_path: - if pa.types.is_struct(array_type) and wrap_flat_struct_in_list: - array = array_util.ToSingletonListArray(array) - return array, example_indices - if not pa.types.is_struct(get_innermost_nested_type(array_type)): - raise KeyError('Cannot process query_path "{}" inside an array of type ' - '{}. 
Expecting a struct<...> or ' - '(large_)list...>.'.format( - query_path, array_type)) - flat_struct_array, parent_indices = flatten_nested( - array, example_indices is not None) - flat_indices = None - if example_indices is not None: - flat_indices = example_indices[parent_indices] - - step = query_path.steps()[0] - - try: - child_array = get_field(flat_struct_array, step) - except KeyError: - raise KeyError(f'query_path step "{step}" not in struct.') # pylint: disable=raise-missing-from - - relative_path = types.FeaturePath(query_path.steps()[1:]) - return _recursion_helper(relative_path, child_array, flat_indices) - - if not query_path: - raise KeyError('query_path must be non-empty.') - column_name = query_path.steps()[0] - field_index = record_batch.schema.get_field_index(column_name) - if field_index < 0: - raise KeyError('query_path step 0 "{}" not in record batch.' - .format(column_name)) - array = record_batch.column(field_index) - array_path = types.FeaturePath(query_path.steps()[1:]) - - example_indices = np.arange( - record_batch.num_rows) if return_example_indices else None - return _recursion_helper(array_path, array, example_indices) - - -def flatten_nested( - array: pa.Array, return_parent_indices: bool = False - ) -> Tuple[pa.Array, Optional[np.ndarray]]: - """Flattens all the list arrays nesting an array. - - If `array` is not list-like, itself will be returned. - - Args: - array: pa.Array to flatten. - return_parent_indices: If True, also returns the parent indices array. - - Returns: - A tuple. The first term is the flattened array. The second term is None - if `return_parent_indices` is False; otherwise it's a parent indices array - parallel to the flattened array: if parent_indices[i] = j, then - flattened_array[i] belongs to the j-th element of the input array. 
- """ - parent_indices = None - - while is_list_like(array.type): - if return_parent_indices: - cur_parent_indices = array_util.GetFlattenedArrayParentIndices( - array).to_numpy() - if parent_indices is None: - parent_indices = cur_parent_indices - else: - parent_indices = parent_indices[cur_parent_indices] - array = array.flatten() - - # the array is not nested at the first place. - if return_parent_indices and parent_indices is None: - parent_indices = np.arange(len(array)) - return array, parent_indices - - def enumerate_arrays( record_batch: pa.RecordBatch, example_weight_map: Optional[ExampleWeightMap], @@ -320,7 +151,7 @@ def _recursion_helper( ) -> Iterable[Tuple[types.FeaturePath, pa.Array, Optional[np.ndarray]]]: """Recursion helper.""" array_type = array.type - innermost_nested_type = get_innermost_nested_type(array_type) + innermost_nested_type = array_util.get_innermost_nested_type(array_type) if pa.types.is_struct(innermost_nested_type): if not enumerate_leaves_only: weights = all_weights.get(example_weight_map.get(feature_path)) @@ -331,7 +162,7 @@ def _recursion_helper( if pa.types.is_struct(array_type) and wrap_flat_struct_in_list: to_yield = array_util.ToSingletonListArray(array) yield (feature_path, to_yield, weights) - flat_struct_array, parent_indices = flatten_nested( + flat_struct_array, parent_indices = array_util.flatten_nested( array, bool(all_weights)) # Potential optimization: # Only flatten weights that we know will be used in the recursion. 
@@ -343,7 +174,9 @@ def _recursion_helper( field_name = field.name yield from _recursion_helper( feature_path.child(field_name), - get_field(flat_struct_array, field_name), flat_all_weights) + array_util.get_field(flat_struct_array, field_name), + flat_all_weights, + ) else: weights = all_weights.get(example_weight_map.get(feature_path)) yield (feature_path, array, weights) @@ -364,13 +197,6 @@ def _recursion_helper( types.FeaturePath([column_name]), column, all_weights) -def get_innermost_nested_type(arrow_type: pa.DataType) -> pa.DataType: - """Returns the innermost type of a nested list type.""" - while is_list_like(arrow_type): - arrow_type = arrow_type.value_type - return arrow_type - - def get_nest_level(array_type: pa.DataType) -> int: """Returns the nest level of an array type. @@ -386,7 +212,7 @@ def get_nest_level(array_type: pa.DataType) -> int: the nest level. """ result = 0 - while is_list_like(array_type): + while array_util.is_list_like(array_type): result += 1 array_type = array_type.value_type diff --git a/tensorflow_data_validation/arrow/arrow_util_test.py b/tensorflow_data_validation/arrow/arrow_util_test.py index ef4d3d67..410154bf 100644 --- a/tensorflow_data_validation/arrow/arrow_util_test.py +++ b/tensorflow_data_validation/arrow/arrow_util_test.py @@ -287,13 +287,6 @@ def _Normalize(array: pa.Array) -> pa.Array: class ArrowUtilTest(parameterized.TestCase): - def testIsListLike(self): - for t in (pa.list_(pa.int64()), pa.large_list(pa.int64())): - self.assertTrue(arrow_util.is_list_like(t)) - - for t in (pa.binary(), pa.int64(), pa.large_string()): - self.assertFalse(arrow_util.is_list_like(t)) - def testIsBinaryLike(self): for t in (pa.binary(), pa.large_binary(), pa.string(), pa.large_string()): self.assertTrue(arrow_util.is_binary_like(t)) @@ -340,108 +333,6 @@ def testGetWeightFeatureTooManyValues(self): pa.array([[1], [2, 2]])], ["v", "w"]), weight_column="w") - def testGetArrayEmptyPath(self): - with self.assertRaisesRegex( - KeyError, 
- r"query_path must be non-empty.*"): - arrow_util.get_array( - pa.RecordBatch.from_arrays([pa.array([[1], [2, 3]])], ["v"]), - query_path=types.FeaturePath([]), - return_example_indices=False) - - def testGetArrayColumnMissing(self): - with self.assertRaisesRegex( - KeyError, - r'query_path step 0 "x" not in record batch.*'): - arrow_util.get_array( - pa.RecordBatch.from_arrays([pa.array([[1], [2]])], ["y"]), - query_path=types.FeaturePath(["x"]), - return_example_indices=False) - - def testGetArrayStepMissing(self): - with self.assertRaisesRegex(KeyError, - r'query_path step "ssf3" not in struct.*'): - arrow_util.get_array( - _INPUT_RECORD_BATCH, - query_path=types.FeaturePath(["f2", "sf2", "ssf3"]), - return_example_indices=False) - - def testGetArrayReturnExampleIndices(self): - record_batch = pa.RecordBatch.from_arrays([ - pa.array([[{ - "sf": [{ - "ssf": [1] - }, { - "ssf": [2] - }] - }], [{ - "sf": [{ - "ssf": [3, 4] - }] - }]]), - pa.array([["one"], ["two"]]) - ], ["f", "w"]) - feature = types.FeaturePath(["f", "sf", "ssf"]) - actual_arr, actual_indices = arrow_util.get_array( - record_batch, feature, return_example_indices=True) - expected_arr = pa.array([[1], [2], [3, 4]]) - expected_indices = np.array([0, 0, 1]) - self.assertTrue( - actual_arr.equals(expected_arr), - "\nfeature: {};\nexpected:\n{};\nactual:\n{}".format( - feature, expected_arr, actual_arr)) - np.testing.assert_array_equal(expected_indices, actual_indices) - - def testGetArraySubpathMissing(self): - with self.assertRaisesRegex( - KeyError, - r'Cannot process .* "sssf" inside .* list.*'): - arrow_util.get_array( - _INPUT_RECORD_BATCH, - query_path=types.FeaturePath(["f2", "sf2", "ssf1", "sssf"]), - return_example_indices=False) - - @parameterized.named_parameters( - ((str(f), f, expected) for (f, expected) in _FEATURES_TO_ARRAYS.items())) - def testGetArray(self, feature, expected): - actual_arr, actual_indices = arrow_util.get_array( - _INPUT_RECORD_BATCH, feature, 
return_example_indices=True, - wrap_flat_struct_in_list=False) - expected_arr, expected_indices, _ = expected - self.assertTrue( - actual_arr.equals(expected_arr), - "\nfeature: {};\nexpected:\n{};\nactual:\n{}".format( - feature, expected_arr, actual_arr)) - np.testing.assert_array_equal(expected_indices, actual_indices) - - @parameterized.named_parameters( - ((str(f), f, expected) for (f, expected) in _FEATURES_TO_ARRAYS.items())) - def testGetArrayNoBroadcast(self, feature, expected): - actual_arr, actual_indices = arrow_util.get_array( - _INPUT_RECORD_BATCH, feature, return_example_indices=False, - wrap_flat_struct_in_list=False) - expected_arr, _, _ = expected - self.assertTrue( - actual_arr.equals(expected_arr), - "\nfeature: {};\nexpected:\n{};\nactual:\n{}".format( - feature, expected_arr, actual_arr)) - self.assertIsNone(actual_indices) - - @parameterized.named_parameters( - ((str(f), f, expected) for (f, expected) in _FEATURES_TO_ARRAYS.items())) - def testGetArrayWrapFlatStructArray(self, feature, expected): - actual_arr, actual_indices = arrow_util.get_array( - _INPUT_RECORD_BATCH, feature, return_example_indices=True, - wrap_flat_struct_in_list=True) - expected_arr, expected_indices, _ = expected - if pa.types.is_struct(expected_arr.type): - expected_arr = array_util.ToSingletonListArray(expected_arr) - self.assertTrue( - actual_arr.equals(expected_arr), - "\nfeature: {};\nexpected:\n{};\nactual:\n{}".format( - feature, expected_arr, actual_arr)) - np.testing.assert_array_equal(expected_indices, actual_indices) - def testEnumerateArraysStringWeight(self): # The arrow type of a string changes between py2 and py3 so we accept either with self.assertRaisesRegex( @@ -551,27 +442,6 @@ def testEnumerateMissingPropagatedInFlattenedStruct(self, batch, "feature={}; expected: {}; actual: {}; diff: {}".format( k, v, actual, actual.diff(v))) - def testFlattenNested(self): - input_array = pa.array([[[1, 2]], None, [None, [3]]]) - flattened, parent_indices = 
arrow_util.flatten_nested( - input_array, return_parent_indices=False) - expected = pa.array([1, 2, 3]) - expected_parent_indices = [0, 0, 2] - self.assertIs(parent_indices, None) - self.assertTrue(flattened.equals(expected)) - - flattened, parent_indices = arrow_util.flatten_nested( - input_array, return_parent_indices=True) - self.assertTrue(flattened.equals(expected)) - np.testing.assert_array_equal(parent_indices, expected_parent_indices) - - def testFlattenNestedNonList(self): - input_array = pa.array([1, 2]) - flattened, parent_indices = arrow_util.flatten_nested( - input_array, return_parent_indices=True) - self.assertTrue(flattened.equals(pa.array([1, 2]))) - np.testing.assert_array_equal(parent_indices, [0, 1]) - def testGetColumn(self): self.assertTrue( arrow_util.get_column(_INPUT_RECORD_BATCH, diff --git a/tensorflow_data_validation/arrow/decoded_examples_to_arrow.py b/tensorflow_data_validation/arrow/decoded_examples_to_arrow.py index 408e4694..9a739484 100644 --- a/tensorflow_data_validation/arrow/decoded_examples_to_arrow.py +++ b/tensorflow_data_validation/arrow/decoded_examples_to_arrow.py @@ -24,6 +24,7 @@ import six from tensorflow_data_validation import types from tensorflow_data_validation.arrow import arrow_util +from tfx_bsl.arrow import array_util def DecodedExamplesToRecordBatch( @@ -66,7 +67,7 @@ def DecodedExamplesToRecordBatch( for name, array in six.moves.zip(field_names, value_arrays): if pa.types.is_null(array.type): continue - if not arrow_util.is_list_like(array.type): + if not array_util.is_list_like(array.type): raise TypeError("Expected list arrays for field {} but got {}".format( name, array.type)) value_type = array.type.value_type diff --git a/tensorflow_data_validation/statistics/generators/basic_stats_generator.py b/tensorflow_data_validation/statistics/generators/basic_stats_generator.py index 8d07d7b0..4a97e60d 100644 --- a/tensorflow_data_validation/statistics/generators/basic_stats_generator.py +++ 
b/tensorflow_data_validation/statistics/generators/basic_stats_generator.py @@ -224,7 +224,7 @@ def update(self, return level = 0 - while arrow_util.is_list_like(feature_array.type): + while array_util.is_list_like(feature_array.type): presence_mask = ~np.asarray( array_util.GetArrayNullBitmapAsByteArray(feature_array)).view(bool) num_values = np.asarray( @@ -320,7 +320,7 @@ def update( if not feature_array: return - flattened_value_array, value_parent_indices = arrow_util.flatten_nested( + flattened_value_array, value_parent_indices = array_util.flatten_nested( feature_array, weights is not None) # Note: to_numpy will fail if flattened_value_array is empty. if not flattened_value_array: @@ -388,7 +388,7 @@ def update(self, feature_array: pa.Array) -> None: if pa.types.is_null(feature_array.type): return # Iterate through the value array and update the partial stats. - flattened_values_array, _ = arrow_util.flatten_nested(feature_array) + flattened_values_array, _ = array_util.flatten_nested(feature_array) if arrow_util.is_binary_like(flattened_values_array.type): # GetBinaryArrayTotalByteSize returns a Python long (to be compatible # with Python3). To make sure we do cheaper integer arithemetics in @@ -432,7 +432,7 @@ def update(self, feature_array: pa.Array) -> None: if pa.types.is_null(feature_array.type): return # Iterate through the value array and update the partial stats.' 
- flattened_values_array, _ = arrow_util.flatten_nested(feature_array) + flattened_values_array, _ = array_util.flatten_nested(feature_array) if (pa.types.is_floating(flattened_values_array.type) or pa.types.is_integer(flattened_values_array.type)): raise ValueError('Bytes stats cannot be computed on INT/FLOAT features.') diff --git a/tensorflow_data_validation/statistics/generators/cross_feature_stats_generator.py b/tensorflow_data_validation/statistics/generators/cross_feature_stats_generator.py index db9ac22e..7935a5e6 100644 --- a/tensorflow_data_validation/statistics/generators/cross_feature_stats_generator.py +++ b/tensorflow_data_validation/statistics/generators/cross_feature_stats_generator.py @@ -33,7 +33,6 @@ from pandas import DataFrame, Series # pylint: disable=g-multiple-import import pyarrow as pa from tensorflow_data_validation import types -from tensorflow_data_validation.arrow import arrow_util from tensorflow_data_validation.statistics.generators import stats_generator from tensorflow_data_validation.utils import stats_util from tfx_bsl.arrow import array_util @@ -133,7 +132,7 @@ def _get_univalent_values_with_parent_indices( # If there are no univalent values, continue to the next feature. 
if not univalent_parent_indices: continue - flattened, value_parent_indices = arrow_util.flatten_nested( + flattened, value_parent_indices = array_util.flatten_nested( feat_arr, True) non_missing_values = np.asarray(flattened) if feature_type == statistics_pb2.FeatureNameStatistics.FLOAT: diff --git a/tensorflow_data_validation/statistics/generators/image_stats_generator.py b/tensorflow_data_validation/statistics/generators/image_stats_generator.py index eccd6f2f..ea0c710e 100644 --- a/tensorflow_data_validation/statistics/generators/image_stats_generator.py +++ b/tensorflow_data_validation/statistics/generators/image_stats_generator.py @@ -39,9 +39,9 @@ import tensorflow as tf from tensorflow_data_validation import types -from tensorflow_data_validation.arrow import arrow_util from tensorflow_data_validation.statistics.generators import stats_generator from tensorflow_data_validation.utils import stats_util +from tfx_bsl.arrow import array_util from tensorflow_metadata.proto.v0 import statistics_pb2 _DOMAIN_INFO = 'domain_info' @@ -278,7 +278,7 @@ def add_input(self, accumulator: _PartialImageStats, # Consider using memoryview to avoid copying after upgrading to # arrow 0.12. Note that this would involve modifying the subsequent logic # to iterate over the values in a loop. 
- values = np.asarray(arrow_util.flatten_nested(feature_array)[0]) + values = np.asarray(array_util.flatten_nested(feature_array)[0]) accumulator.total_num_values += values.size image_formats = self._image_decoder.get_formats(values) valid_mask = ~pd.isnull(image_formats) diff --git a/tensorflow_data_validation/statistics/generators/input_batch.py b/tensorflow_data_validation/statistics/generators/input_batch.py index ed09ef34..ab80d777 100644 --- a/tensorflow_data_validation/statistics/generators/input_batch.py +++ b/tensorflow_data_validation/statistics/generators/input_batch.py @@ -25,8 +25,9 @@ import pyarrow as pa from tensorflow_data_validation import types -from tensorflow_data_validation.arrow import arrow_util from tfx_bsl.arrow import array_util +from tfx_bsl.arrow import path as tfx_bsl_path +from tfx_bsl.arrow import table_util class InputBatch(object): @@ -58,8 +59,11 @@ def null_mask(self, path: types.FeaturePath) -> np.ndarray: mask. """ try: - array, _ = arrow_util.get_array( - self._record_batch, path, return_example_indices=False) + array, _ = table_util.get_array( + self._record_batch, + tfx_bsl_path.ColumnPath(path.steps()), + return_example_indices=False, + ) # GetArrayNullBitmapAsByteArray is only useful for non-null type arrays. 
if pa.types.is_null(array.type): return np.full(self._record_batch.num_rows, True) @@ -118,11 +122,14 @@ def list_lengths(self, path: types.FeaturePath) -> np.ndarray: if key in self._cache: return self._cache[key] try: - array, _ = arrow_util.get_array( - self._record_batch, path, return_example_indices=False) + array, _ = table_util.get_array( + self._record_batch, + tfx_bsl_path.ColumnPath(path.steps()), + return_example_indices=False, + ) if pa.types.is_null(array.type): lengths = np.full(self._record_batch.num_rows, 0) - elif not arrow_util.is_list_like(array.type): + elif not array_util.is_list_like(array.type): raise ValueError('Can only compute list lengths on list arrays, found ' '{}'.format(array.type)) else: diff --git a/tensorflow_data_validation/statistics/generators/lift_stats_generator.py b/tensorflow_data_validation/statistics/generators/lift_stats_generator.py index bb5997c5..442338c9 100644 --- a/tensorflow_data_validation/statistics/generators/lift_stats_generator.py +++ b/tensorflow_data_validation/statistics/generators/lift_stats_generator.py @@ -32,6 +32,9 @@ from tensorflow_data_validation.utils import schema_util from tensorflow_data_validation.utils import stats_util from tensorflow_data_validation.utils.example_weight_map import ExampleWeightMap +from tfx_bsl.arrow import array_util +from tfx_bsl.arrow import path as tfx_bsl_path +from tfx_bsl.arrow import table_util from tensorflow_metadata.proto.v0 import schema_pb2 from tensorflow_metadata.proto.v0 import statistics_pb2 @@ -138,12 +141,15 @@ def _get_example_value_presence( A _ValuePresence tuple which contains three numpy arrays: example indices, values, and weights. 
""" - arr, example_indices = arrow_util.get_array( - record_batch, path, return_example_indices=True) + arr, example_indices = table_util.get_array( + record_batch, + tfx_bsl_path.ColumnPath(path.steps()), + return_example_indices=True, + ) if stats_util.get_feature_type_from_arrow_type(path, arr.type) is None: return - arr_flat, parent_indices = arrow_util.flatten_nested( + arr_flat, parent_indices = array_util.flatten_nested( arr, return_parent_indices=True) is_binary_like = arrow_util.is_binary_like(arr_flat.type) assert boundaries is None or not is_binary_like, ( diff --git a/tensorflow_data_validation/statistics/generators/mutual_information.py b/tensorflow_data_validation/statistics/generators/mutual_information.py index 95645863..52669454 100644 --- a/tensorflow_data_validation/statistics/generators/mutual_information.py +++ b/tensorflow_data_validation/statistics/generators/mutual_information.py @@ -22,12 +22,12 @@ import pandas as pd import pyarrow as pa from tensorflow_data_validation import types -from tensorflow_data_validation.arrow import arrow_util from tensorflow_data_validation.statistics.generators import partitioned_stats_generator from tensorflow_data_validation.utils import feature_partition_util from tensorflow_data_validation.utils import mutual_information_util from tensorflow_data_validation.utils import schema_util from tensorflow_data_validation.utils import stats_util +from tfx_bsl.arrow import array_util from tensorflow_metadata.proto.v0 import schema_pb2 from tensorflow_metadata.proto.v0 import statistics_pb2 @@ -46,7 +46,7 @@ def _get_flattened_feature_values_without_nulls( Returns: A list containing the flattened feature values with nulls removed. 
""" - non_missing_values = np.asarray(arrow_util.flatten_nested(feature_array)[0]) + non_missing_values = np.asarray(array_util.flatten_nested(feature_array)[0]) return list(non_missing_values[~pd.isnull(non_missing_values)]) @@ -98,7 +98,7 @@ def _apply_categorical_encoding_to_feature_array( if pa.types.is_null(feature_array.type): return [] result = [None for _ in range(len(feature_array))] - flattened, non_missing_parent_indices = arrow_util.flatten_nested( + flattened, non_missing_parent_indices = array_util.flatten_nested( feature_array, True) non_missing_values = flattened.to_pylist() non_missing_parent_indices = list(non_missing_parent_indices) @@ -177,7 +177,7 @@ def _apply_numerical_encoding_to_feature_array( if pa.types.is_null(feature_array.type): return [] result = [None for _ in range(len(feature_array))] # type: List - flattened, non_missing_parent_indices = arrow_util.flatten_nested( + flattened, non_missing_parent_indices = array_util.flatten_nested( feature_array, True) assert non_missing_parent_indices is not None non_missing_values = np.asarray(flattened) @@ -251,7 +251,7 @@ def _encode_univalent_feature(feature_array: pa.Array) -> List[Any]: A list containing the feature values where null values are replaced by None. 
""" result = [[None] for _ in range(len(feature_array))] - flattened, non_missing_parent_indices = arrow_util.flatten_nested( + flattened, non_missing_parent_indices = array_util.flatten_nested( feature_array, True) non_missing_values = np.asarray(flattened) nan_mask = pd.isnull(non_missing_values) diff --git a/tensorflow_data_validation/statistics/generators/natural_language_domain_inferring_stats_generator.py b/tensorflow_data_validation/statistics/generators/natural_language_domain_inferring_stats_generator.py index ac5ea467..108b8cf9 100644 --- a/tensorflow_data_validation/statistics/generators/natural_language_domain_inferring_stats_generator.py +++ b/tensorflow_data_validation/statistics/generators/natural_language_domain_inferring_stats_generator.py @@ -33,9 +33,9 @@ import pyarrow as pa import six from tensorflow_data_validation import types -from tensorflow_data_validation.arrow import arrow_util from tensorflow_data_validation.statistics.generators import stats_generator from tensorflow_data_validation.utils import stats_util +from tfx_bsl.arrow import array_util from typing import Iterable, Text from tensorflow_metadata.proto.v0 import statistics_pb2 @@ -204,7 +204,7 @@ def _is_non_utf8(value): is_non_utf_vec = np.vectorize(_is_non_utf8, otypes=[bool]) classify_vec = np.vectorize(self._classifier.classify, otypes=[bool]) - values = np.asarray(arrow_util.flatten_nested(feature_array)[0] + values = np.asarray(array_util.flatten_nested(feature_array)[0] .slice(0, _CROP_AT_VALUES)) if np.any(is_non_utf_vec(values)): accumulator.invalidate = True diff --git a/tensorflow_data_validation/statistics/generators/sklearn_mutual_information.py b/tensorflow_data_validation/statistics/generators/sklearn_mutual_information.py index 01285ebe..0336e3d0 100644 --- a/tensorflow_data_validation/statistics/generators/sklearn_mutual_information.py +++ b/tensorflow_data_validation/statistics/generators/sklearn_mutual_information.py @@ -88,7 +88,7 @@ def 
_flatten_and_impute(examples: pa.RecordBatch, else: # to_pandas returns a readonly array. Create a copy as we will be imputing # the NaN values. - flattened_array, non_missing_parent_indices = arrow_util.flatten_nested( + flattened_array, non_missing_parent_indices = array_util.flatten_nested( feature_array, return_parent_indices=True) assert non_missing_parent_indices is not None non_missing_values = np.copy(np.asarray(flattened_array)) diff --git a/tensorflow_data_validation/statistics/generators/time_stats_generator.py b/tensorflow_data_validation/statistics/generators/time_stats_generator.py index f796e7f4..6a0d98ee 100644 --- a/tensorflow_data_validation/statistics/generators/time_stats_generator.py +++ b/tensorflow_data_validation/statistics/generators/time_stats_generator.py @@ -43,9 +43,9 @@ import pyarrow as pa from tensorflow_data_validation import types -from tensorflow_data_validation.arrow import arrow_util from tensorflow_data_validation.statistics.generators import stats_generator from tensorflow_data_validation.utils import stats_util +from tfx_bsl.arrow import array_util from typing import Iterable, Pattern, Text, Tuple from tensorflow_metadata.proto.v0 import schema_pb2 @@ -327,14 +327,14 @@ def add_input(self, accumulator: _PartialTimeStats, def _maybe_get_utf8(val): return stats_util.maybe_get_utf8(val) if isinstance(val, bytes) else val - values = np.asarray(arrow_util.flatten_nested(feature_array)[0]) + values = np.asarray(array_util.flatten_nested(feature_array)[0]) maybe_utf8 = np.vectorize(_maybe_get_utf8, otypes=[object])(values) if not maybe_utf8.all(): accumulator.invalidated = True return accumulator accumulator.update(maybe_utf8, feature_type) elif feature_type == statistics_pb2.FeatureNameStatistics.INT: - values = np.asarray(arrow_util.flatten_nested(feature_array)[0]) + values = np.asarray(array_util.flatten_nested(feature_array)[0]) accumulator.update(values, feature_type) else: accumulator.invalidated = True diff --git 
a/tensorflow_data_validation/statistics/generators/top_k_uniques_sketch_stats_generator.py b/tensorflow_data_validation/statistics/generators/top_k_uniques_sketch_stats_generator.py index 099dfc70..a01c16d0 100644 --- a/tensorflow_data_validation/statistics/generators/top_k_uniques_sketch_stats_generator.py +++ b/tensorflow_data_validation/statistics/generators/top_k_uniques_sketch_stats_generator.py @@ -33,6 +33,7 @@ from tensorflow_data_validation.utils import top_k_uniques_stats_util from tensorflow_data_validation.utils.example_weight_map import ExampleWeightMap +from tfx_bsl.arrow import array_util from tfx_bsl.sketches import KmvSketch from tfx_bsl.sketches import MisraGriesSketch @@ -165,7 +166,7 @@ def _update_combined_sketch_for_feature( weights: Optional[np.ndarray], accumulator: Dict[tfdv_types.FeaturePath, _CombinedSketch]): """Updates combined sketch with values (and weights if provided).""" - flattened_values, parent_indices = arrow_util.flatten_nested( + flattened_values, parent_indices = array_util.flatten_nested( values, weights is not None) combined_sketch = accumulator.get(feature_name, None) @@ -231,7 +232,7 @@ def update_length_counters( if np.random.random() > self._length_counter_sampling_rate: return if feature_type == statistics_pb2.FeatureNameStatistics.STRING: distinct_count = collections.defaultdict(int) - values, _ = arrow_util.flatten_nested(leaf_array) + values, _ = array_util.flatten_nested(leaf_array) for value in values: binary_scalar_len = int(np.log2(max(value.as_buffer().size, 1))) distinct_count[binary_scalar_len] += 1 diff --git a/tensorflow_data_validation/statistics/generators/top_k_uniques_stats_generator.py b/tensorflow_data_validation/statistics/generators/top_k_uniques_stats_generator.py index 7b4093e3..c9703f27 100644 --- a/tensorflow_data_validation/statistics/generators/top_k_uniques_stats_generator.py +++ b/tensorflow_data_validation/statistics/generators/top_k_uniques_stats_generator.py @@ -30,6 +30,7 @@ from 
tensorflow_data_validation.utils import stats_util from tensorflow_data_validation.utils import top_k_uniques_stats_util from tensorflow_data_validation.utils.example_weight_map import ExampleWeightMap +from tfx_bsl.arrow import array_util from tensorflow_metadata.proto.v0 import schema_pb2 from tensorflow_metadata.proto.v0 import statistics_pb2 @@ -105,7 +106,7 @@ def _to_topk_tuples( continue if not _should_run(categorical_numeric_types, feature_path, feature_type): continue - flattened_values, parent_indices = arrow_util.flatten_nested( + flattened_values, parent_indices = array_util.flatten_nested( feature_array, weights is not None) if weights is not None and flattened_values: # Slow path: weighted uniques. diff --git a/tensorflow_data_validation/utils/slicing_util.py b/tensorflow_data_validation/utils/slicing_util.py index 227d470c..14071f64 100644 --- a/tensorflow_data_validation/utils/slicing_util.py +++ b/tensorflow_data_validation/utils/slicing_util.py @@ -35,6 +35,7 @@ from tensorflow_data_validation import types from tensorflow_data_validation.arrow import arrow_util from tensorflow_data_validation.utils import stats_util +from tfx_bsl.arrow import array_util from tfx_bsl.arrow import sql_util from tfx_bsl.arrow import table_util from tfx_bsl.public.proto import slicing_spec_pb2 @@ -149,7 +150,7 @@ def feature_value_slicer(record_batch: pa.RecordBatch) -> Iterable[ 'The feature to slice on has integer values but ' 'the provided slice values are not valid integers.') from e - flattened, value_parent_indices = arrow_util.flatten_nested( + flattened, value_parent_indices = array_util.flatten_nested( feature_array, True) non_missing_values = np.asarray(flattened) # Create dataframe with feature value and parent index. 
diff --git a/tensorflow_data_validation/utils/stats_util.py b/tensorflow_data_validation/utils/stats_util.py index e6bbe7dc..70d7040d 100644 --- a/tensorflow_data_validation/utils/stats_util.py +++ b/tensorflow_data_validation/utils/stats_util.py @@ -29,6 +29,7 @@ from tensorflow_data_validation.utils import artifacts_io_impl from tensorflow_data_validation.utils import io_util from tfx_bsl import statistics +from tfx_bsl.arrow import array_util from google.protobuf import text_format from tensorflow_metadata.proto.v0 import statistics_pb2 @@ -96,12 +97,12 @@ def get_feature_type_from_arrow_type( """ if pa.types.is_null(arrow_type): return None - if not arrow_util.is_list_like(arrow_type): + if not array_util.is_list_like(arrow_type): raise TypeError('Expected feature column to be a ' '(Large)List or null, but feature {} ' 'was {}.'.format(feature_path, arrow_type)) - value_type = arrow_util.get_innermost_nested_type(arrow_type) + value_type = array_util.get_innermost_nested_type(arrow_type) if pa.types.is_integer(value_type): return statistics_pb2.FeatureNameStatistics.INT elif pa.types.is_floating(value_type):