[go: nahoru, domu]

Skip to content

Commit

Permalink
Automated rollback of commit fb2ad72
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 524108522
  • Loading branch information
caveness authored and tfx-copybara committed Apr 13, 2023
1 parent fb2ad72 commit cb3b5fe
Show file tree
Hide file tree
Showing 17 changed files with 346 additions and 60 deletions.
1 change: 0 additions & 1 deletion RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

* Jensen-Shannon divergence now treats NaN values as always contributing to a
higher drift score.
* Moved some non-public arrow_util functions to TFX-BSL.

## Deprecations

Expand Down
188 changes: 181 additions & 7 deletions tensorflow_data_validation/arrow/arrow_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License
"""Util functions regarding to Arrow objects."""

from typing import Callable, Dict, Iterable, Optional, Text, Tuple
from typing import Callable, Dict, Iterable, Optional, Text, Tuple, Union

import numpy as np
import pyarrow as pa
Expand Down Expand Up @@ -82,6 +82,175 @@ def is_binary_like(data_type: pa.DataType) -> bool:
pa.types.is_large_unicode(data_type))


def is_list_like(data_type: pa.DataType) -> bool:
  """Reports whether `data_type` is an Arrow list or large_list type."""
  list_predicates = (pa.types.is_list, pa.types.is_large_list)
  return any(predicate(data_type) for predicate in list_predicates)


def get_field(struct_array: pa.StructArray, field: Union[str, int]) -> pa.Array:
  """Returns struct_array.field(field) with null propagation.

  This function is equivalent to struct_array.field() but correctly handles
  null propagation (the parent struct's null values are propagated to children).

  Args:
    struct_array: A struct array which should be queried.
    field: The requested field to retrieve, given by name or by index.

  Returns:
    A pa.Array containing the requested field.

  Raises:
    KeyError: If field is not a child field in struct_array.
  """
  child_array = struct_array.field(field)

  # In case all values are present then there's no need for special handling.
  # We can return child_array as is to avoid a performance penalty caused by
  # constructing and flattening the returned array.
  if struct_array.null_count == 0:
    return child_array

  # is_valid() returns a BooleanArray with two buffers: the buffer at index 0
  # is always None, and the buffer at index 1 holds the bits that say which
  # struct entries are valid / not valid.
  # (https://arrow.apache.org/docs/format/Columnar.html#buffer-listing-for-each-layout)
  validity_bitmap_buffer = struct_array.is_valid().buffers()[1]

  # Construct a new struct array with a single field. Calling flatten() on the
  # new array guarantees validity bitmaps are merged correctly.
  # The name of this temporary field is never observed (only the flattened
  # child is returned), but pa.field() needs a string name, so an integer
  # `field` index is stringified instead of being passed through as-is.
  new_type = pa.struct([pa.field(str(field), child_array.type)])
  if child_array.null_count == 0 and child_array.offset != 0:
    # TODO(caveness): Remove this special handling once flattening a struct that
    # has children that were sliced produces arrays with a correct validity
    # bitmap.
    child_array = pa.concat_arrays([pa.nulls(0, child_array.type), child_array])
  filtered_struct = pa.StructArray.from_buffers(
      new_type,
      len(struct_array), [validity_bitmap_buffer],
      null_count=struct_array.null_count,
      children=[child_array])
  return filtered_struct.flatten()[0]


def get_array(
    record_batch: pa.RecordBatch,
    query_path: types.FeaturePath,
    return_example_indices: bool,
    wrap_flat_struct_in_list: bool = True,
) -> Tuple[pa.Array, Optional[np.ndarray]]:
  """Looks up the array at `query_path` in a RecordBatch.

  The same assumptions about `record_batch` apply as for `enumerate_arrays()`.

  A path that ends at a leaf of `record_batch` yields a "nested_list"; a path
  that ends at a non-leaf yields a "struct". Both terms are defined in
  `enumerate_arrays()`.

  Args:
    record_batch: The RecordBatch to search.
    query_path: The FeaturePath to resolve inside `record_batch`.
    return_example_indices: If True, an array of example indices parallel to
      the returned feature array is produced as well.
    wrap_flat_struct_in_list: If True and the path resolves to a struct<[Ts]>
      array, that array is wrapped into a list array whose sub-lists each hold
      one element, so callers may assume a list<inner_type> result.

  Returns:
    A (feature_array, example_indices) tuple. When requested,
    example_indices[i] is the row of `record_batch` that feature_array[i] came
    from; otherwise example_indices is None.

  Raises:
    KeyError: If `query_path` is empty, or if any of its steps cannot be found
      in the record batch or in the struct arrays nested inside it.
  """
  if not query_path:
    raise KeyError('query_path must be non-empty.')

  all_steps = query_path.steps()
  column_name = all_steps[0]
  field_index = record_batch.schema.get_field_index(column_name)
  if field_index < 0:
    raise KeyError('query_path step 0 "{}" not in record batch.'
                   .format(column_name))

  current_array = record_batch.column(field_index)
  current_indices = None
  if return_example_indices:
    current_indices = np.arange(record_batch.num_rows)

  # Walk the remaining steps one struct level at a time.
  remaining_path = types.FeaturePath(all_steps[1:])
  while remaining_path:
    current_type = current_array.type
    if not pa.types.is_struct(get_innermost_nested_type(current_type)):
      raise KeyError('Cannot process query_path "{}" inside an array of type '
                     '{}. Expecting a struct<...> or '
                     '(large_)list...<struct<...>>.'.format(
                         remaining_path, current_type))

    # Strip any list nesting so that the struct children become reachable,
    # keeping the example indices aligned with the flattened rows.
    flat_struct_array, parent_indices = flatten_nested(
        current_array, current_indices is not None)
    if current_indices is not None:
      current_indices = current_indices[parent_indices]

    step = remaining_path.steps()[0]
    try:
      current_array = get_field(flat_struct_array, step)
    except KeyError:
      raise KeyError(f'query_path step "{step}" not in struct.')  # pylint: disable=raise-missing-from

    remaining_path = types.FeaturePath(remaining_path.steps()[1:])

  if wrap_flat_struct_in_list and pa.types.is_struct(current_array.type):
    current_array = array_util.ToSingletonListArray(current_array)
  return current_array, current_indices


def flatten_nested(
    array: pa.Array, return_parent_indices: bool = False
) -> Tuple[pa.Array, Optional[np.ndarray]]:
  """Removes every level of list nesting around an array.

  An `array` that is not list-like is returned unchanged.

  Args:
    array: The pa.Array to flatten.
    return_parent_indices: Whether to also compute the parent indices array.

  Returns:
    A (flattened_array, parent_indices) tuple. parent_indices is None unless
    `return_parent_indices` is True, in which case it is parallel to
    flattened_array: parent_indices[i] = j means flattened_array[i] belongs to
    the j-th element of the input array.
  """
  flattened = array
  composed_indices = None

  while is_list_like(flattened.type):
    if return_parent_indices:
      level_indices = array_util.GetFlattenedArrayParentIndices(
          flattened).to_numpy()
      # Map this level's parents back to positions in the original input.
      composed_indices = (
          level_indices
          if composed_indices is None else composed_indices[level_indices])
    flattened = flattened.flatten()

  # The input was not nested in the first place: each element is its own parent.
  if return_parent_indices and composed_indices is None:
    composed_indices = np.arange(len(flattened))
  return flattened, composed_indices


def enumerate_arrays(
record_batch: pa.RecordBatch,
example_weight_map: Optional[ExampleWeightMap],
Expand Down Expand Up @@ -151,7 +320,7 @@ def _recursion_helper(
) -> Iterable[Tuple[types.FeaturePath, pa.Array, Optional[np.ndarray]]]:
"""Recursion helper."""
array_type = array.type
innermost_nested_type = array_util.get_innermost_nested_type(array_type)
innermost_nested_type = get_innermost_nested_type(array_type)
if pa.types.is_struct(innermost_nested_type):
if not enumerate_leaves_only:
weights = all_weights.get(example_weight_map.get(feature_path))
Expand All @@ -162,7 +331,7 @@ def _recursion_helper(
if pa.types.is_struct(array_type) and wrap_flat_struct_in_list:
to_yield = array_util.ToSingletonListArray(array)
yield (feature_path, to_yield, weights)
flat_struct_array, parent_indices = array_util.flatten_nested(
flat_struct_array, parent_indices = flatten_nested(
array, bool(all_weights))
# Potential optimization:
# Only flatten weights that we know will be used in the recursion.
Expand All @@ -174,9 +343,7 @@ def _recursion_helper(
field_name = field.name
yield from _recursion_helper(
feature_path.child(field_name),
array_util.get_field(flat_struct_array, field_name),
flat_all_weights,
)
get_field(flat_struct_array, field_name), flat_all_weights)
else:
weights = all_weights.get(example_weight_map.get(feature_path))
yield (feature_path, array, weights)
Expand All @@ -197,6 +364,13 @@ def _recursion_helper(
types.FeaturePath([column_name]), column, all_weights)


def get_innermost_nested_type(arrow_type: pa.DataType) -> pa.DataType:
  """Peels off all list-like wrappers and returns the type found inside."""
  innermost = arrow_type
  while True:
    if not is_list_like(innermost):
      return innermost
    innermost = innermost.value_type


def get_nest_level(array_type: pa.DataType) -> int:
"""Returns the nest level of an array type.
Expand All @@ -212,7 +386,7 @@ def get_nest_level(array_type: pa.DataType) -> int:
the nest level.
"""
result = 0
while array_util.is_list_like(array_type):
while is_list_like(array_type):
result += 1
array_type = array_type.value_type

Expand Down
130 changes: 130 additions & 0 deletions tensorflow_data_validation/arrow/arrow_util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,13 @@ def _Normalize(array: pa.Array) -> pa.Array:

class ArrowUtilTest(parameterized.TestCase):

def testIsListLike(self):
  """is_list_like accepts list/large_list types and rejects other types."""
  list_types = [pa.list_(pa.int64()), pa.large_list(pa.int64())]
  for list_type in list_types:
    self.assertTrue(arrow_util.is_list_like(list_type))

  non_list_types = [pa.binary(), pa.int64(), pa.large_string()]
  for non_list_type in non_list_types:
    self.assertFalse(arrow_util.is_list_like(non_list_type))

def testIsBinaryLike(self):
for t in (pa.binary(), pa.large_binary(), pa.string(), pa.large_string()):
self.assertTrue(arrow_util.is_binary_like(t))
Expand Down Expand Up @@ -333,6 +340,108 @@ def testGetWeightFeatureTooManyValues(self):
pa.array([[1], [2, 2]])], ["v", "w"]),
weight_column="w")

def testGetArrayEmptyPath(self):
  """An empty query path is rejected with a KeyError."""
  record_batch = pa.RecordBatch.from_arrays([pa.array([[1], [2, 3]])], ["v"])
  empty_path = types.FeaturePath([])
  with self.assertRaisesRegex(KeyError, r"query_path must be non-empty.*"):
    arrow_util.get_array(
        record_batch, query_path=empty_path, return_example_indices=False)

def testGetArrayColumnMissing(self):
  """A first step naming an absent column is rejected with a KeyError."""
  record_batch = pa.RecordBatch.from_arrays([pa.array([[1], [2]])], ["y"])
  missing_column_path = types.FeaturePath(["x"])
  expected_message = r'query_path step 0 "x" not in record batch.*'
  with self.assertRaisesRegex(KeyError, expected_message):
    arrow_util.get_array(
        record_batch,
        query_path=missing_column_path,
        return_example_indices=False)

def testGetArrayStepMissing(self):
  """A nested step naming an absent struct field is rejected."""
  missing_step_path = types.FeaturePath(["f2", "sf2", "ssf3"])
  expected_message = r'query_path step "ssf3" not in struct.*'
  with self.assertRaisesRegex(KeyError, expected_message):
    arrow_util.get_array(
        _INPUT_RECORD_BATCH,
        query_path=missing_step_path,
        return_example_indices=False)

def testGetArrayReturnExampleIndices(self):
  """Example indices follow the flattened values back to their source rows."""
  # Row 0 holds two "sf" structs, row 1 holds a single one.
  first_row = [{"sf": [{"ssf": [1]}, {"ssf": [2]}]}]
  second_row = [{"sf": [{"ssf": [3, 4]}]}]
  feature_column = pa.array([first_row, second_row])
  weight_column = pa.array([["one"], ["two"]])
  record_batch = pa.RecordBatch.from_arrays(
      [feature_column, weight_column], ["f", "w"])

  feature = types.FeaturePath(["f", "sf", "ssf"])
  expected_arr = pa.array([[1], [2], [3, 4]])
  expected_indices = np.array([0, 0, 1])

  actual_arr, actual_indices = arrow_util.get_array(
      record_batch, feature, return_example_indices=True)

  failure_message = "\nfeature: {};\nexpected:\n{};\nactual:\n{}".format(
      feature, expected_arr, actual_arr)
  self.assertTrue(actual_arr.equals(expected_arr), failure_message)
  np.testing.assert_array_equal(expected_indices, actual_indices)

def testGetArraySubpathMissing(self):
  """Stepping below a non-struct array is rejected with a KeyError."""
  too_deep_path = types.FeaturePath(["f2", "sf2", "ssf1", "sssf"])
  expected_message = r'Cannot process .* "sssf" inside .* list<item: int64>.*'
  with self.assertRaisesRegex(KeyError, expected_message):
    arrow_util.get_array(
        _INPUT_RECORD_BATCH,
        query_path=too_deep_path,
        return_example_indices=False)

@parameterized.named_parameters(
    ((str(path), path, expected_entry)
     for (path, expected_entry) in _FEATURES_TO_ARRAYS.items()))
def testGetArray(self, feature, expected):
  """get_array returns the expected array and example indices per feature."""
  expected_arr, expected_indices, _ = expected
  actual_arr, actual_indices = arrow_util.get_array(
      _INPUT_RECORD_BATCH,
      feature,
      return_example_indices=True,
      wrap_flat_struct_in_list=False)
  failure_message = "\nfeature: {};\nexpected:\n{};\nactual:\n{}".format(
      feature, expected_arr, actual_arr)
  self.assertTrue(actual_arr.equals(expected_arr), failure_message)
  np.testing.assert_array_equal(expected_indices, actual_indices)

@parameterized.named_parameters(
    ((str(path), path, expected_entry)
     for (path, expected_entry) in _FEATURES_TO_ARRAYS.items()))
def testGetArrayNoBroadcast(self, feature, expected):
  """No example indices are produced when they are not requested."""
  expected_arr, _, _ = expected
  actual_arr, actual_indices = arrow_util.get_array(
      _INPUT_RECORD_BATCH,
      feature,
      return_example_indices=False,
      wrap_flat_struct_in_list=False)
  failure_message = "\nfeature: {};\nexpected:\n{};\nactual:\n{}".format(
      feature, expected_arr, actual_arr)
  self.assertTrue(actual_arr.equals(expected_arr), failure_message)
  self.assertIsNone(actual_indices)

@parameterized.named_parameters(
    ((str(path), path, expected_entry)
     for (path, expected_entry) in _FEATURES_TO_ARRAYS.items()))
def testGetArrayWrapFlatStructArray(self, feature, expected):
  """Flat struct results come back wrapped in singleton lists."""
  expected_arr, expected_indices, _ = expected
  # With wrapping enabled, a struct expectation must be wrapped the same way.
  if pa.types.is_struct(expected_arr.type):
    expected_arr = array_util.ToSingletonListArray(expected_arr)

  actual_arr, actual_indices = arrow_util.get_array(
      _INPUT_RECORD_BATCH,
      feature,
      return_example_indices=True,
      wrap_flat_struct_in_list=True)
  failure_message = "\nfeature: {};\nexpected:\n{};\nactual:\n{}".format(
      feature, expected_arr, actual_arr)
  self.assertTrue(actual_arr.equals(expected_arr), failure_message)
  np.testing.assert_array_equal(expected_indices, actual_indices)

def testEnumerateArraysStringWeight(self):
# The arrow type of a string changes between py2 and py3 so we accept either
with self.assertRaisesRegex(
Expand Down Expand Up @@ -442,6 +551,27 @@ def testEnumerateMissingPropagatedInFlattenedStruct(self, batch,
"feature={}; expected: {}; actual: {}; diff: {}".format(
k, v, actual, actual.diff(v)))

def testFlattenNested(self):
  """flatten_nested strips all list levels, with and without parent indices."""
  input_array = pa.array([[[1, 2]], None, [None, [3]]])
  expected = pa.array([1, 2, 3])
  expected_parent_indices = [0, 0, 2]

  # Without parent indices.
  flattened, parent_indices = arrow_util.flatten_nested(
      input_array, return_parent_indices=False)
  self.assertIsNone(parent_indices)
  self.assertTrue(flattened.equals(expected))

  # With parent indices.
  flattened, parent_indices = arrow_util.flatten_nested(
      input_array, return_parent_indices=True)
  self.assertTrue(flattened.equals(expected))
  np.testing.assert_array_equal(parent_indices, expected_parent_indices)

def testFlattenNestedNonList(self):
  """A non-list array is returned as is, with identity parent indices."""
  input_array = pa.array([1, 2])
  result = arrow_util.flatten_nested(input_array, return_parent_indices=True)
  flattened, parent_indices = result
  self.assertTrue(flattened.equals(pa.array([1, 2])))
  np.testing.assert_array_equal(parent_indices, [0, 1])

def testGetColumn(self):
self.assertTrue(
arrow_util.get_column(_INPUT_RECORD_BATCH,
Expand Down
Loading

0 comments on commit cb3b5fe

Please sign in to comment.