[go: nahoru, domu]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: consolidate PyarrowVersions helpers #1679

Merged
merged 16 commits into from
Oct 18, 2023
Prev Previous commit
Next Next commit
complete refactor
  • Loading branch information
Linchin committed Oct 9, 2023
commit cc9918ddb4b2f9e4a0701e07ae23b0f77029e0b6
156 changes: 0 additions & 156 deletions google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@

_MIN_BQ_STORAGE_VERSION = packaging.version.Version("2.0.0")

_MIN_PYARROW_VERSION = packaging.version.Version("3.0.0")

_BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION = packaging.version.Version("2.6.0")

BIGQUERY_EMULATOR_HOST = "BIGQUERY_EMULATOR_HOST"
Expand Down Expand Up @@ -127,161 +125,7 @@ def verify_version(self):
raise LegacyBigQueryStorageError(msg)


def pyarrow_datetime():
    """Return the Arrow type used for BigQuery DATETIME values.

    DATETIME is timezone-naive, so the microsecond timestamp carries no tz.
    """
    naive_timestamp = pyarrow.timestamp("us", tz=None)
    return naive_timestamp


def pyarrow_numeric():
    """Return the Arrow decimal type matching BigQuery NUMERIC."""
    precision, scale = 38, 9
    return pyarrow.decimal128(precision, scale)


def pyarrow_bignumeric():
    """Return the Arrow decimal type matching BigQuery BIGNUMERIC.

    BIGNUMERIC has 76 full decimal digits (the 77th digit is partial)
    and a scale of 38.
    https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
    """
    return pyarrow.decimal256(76, 38)


def pyarrow_time():
    """Return the Arrow type for BigQuery TIME: 64-bit, microsecond units."""
    time_type = pyarrow.time64("us")
    return time_type


def pyarrow_timestamp():
    """Return the Arrow type for BigQuery TIMESTAMP: microseconds in UTC."""
    utc_timestamp = pyarrow.timestamp("us", tz="UTC")
    return utc_timestamp


class PyarrowVersions:
    """Version comparisons and type-mapping helpers for the pyarrow package.

    The BigQuery <-> Arrow type maps are built lazily by :meth:`try_import`,
    because constructing them requires a working ``pyarrow`` import.  Until
    ``try_import`` succeeds, the mapping properties return empty dicts
    instead of raising ``AttributeError``.
    """

    def __init__(self):
        # Parsed pyarrow version, memoized by ``installed_version``.
        self._installed_version = None
        # Populated by a successful ``try_import``; empty until then.  The
        # original code only assigned these inside ``try_import``, so reading
        # the properties beforehand raised AttributeError.
        self._BQ_TO_ARROW_SCALARS = {}
        self._ARROW_SCALAR_IDS_TO_BQ = {}

    @property
    def installed_version(self) -> "packaging.version.Version":
        """Return the parsed version of pyarrow."""
        if self._installed_version is None:
            import pyarrow  # type: ignore

            self._installed_version = packaging.version.parse(
                # Use 0.0.0, since it is earlier than any released version.
                # Legacy versions also have the same property, but
                # creating a LegacyVersion has been deprecated.
                # https://github.com/pypa/packaging/issues/321
                getattr(pyarrow, "__version__", "0.0.0")
            )

        return self._installed_version

    @property
    def bq_to_arrow_scalars(self) -> dict:
        """Returns:
        Dict[str, Any]:
            A deep copy of the mapping from BigQuery scalar type names to
            Arrow scalar type factories.  Empty until :meth:`try_import`
            succeeds.
        """
        return copy.deepcopy(self._BQ_TO_ARROW_SCALARS)

    @property
    def arrow_scalar_ids_to_bq(self) -> dict:
        """Returns:
        Dict[Any, str]:
            A deep copy of the mapping from Arrow scalar type ids to
            BigQuery scalar type names.  Empty until :meth:`try_import`
            succeeds.
        """
        return copy.deepcopy(self._ARROW_SCALAR_IDS_TO_BQ)

    @property
    def use_compliant_nested_type(self) -> bool:
        # Parquet compliant nested types are supported from pyarrow 4.x.
        return self.installed_version.major >= 4

    def try_import(self, raise_if_error: bool = False) -> Any:
        """Verify that a recent enough version of pyarrow extra is
        installed.

        The function assumes that pyarrow extra is installed, and should thus
        be used in places where this assumption holds.

        Because `pip` can install an outdated version of this extra despite the
        constraints in `setup.py`, the calling code can use this helper to
        verify the version compatibility at runtime.

        On success this also (re)builds the BigQuery <-> Arrow type maps
        exposed by :attr:`bq_to_arrow_scalars` and
        :attr:`arrow_scalar_ids_to_bq`.

        Args:
            raise_if_error: If ``True``, raise instead of returning ``None``
                when pyarrow is missing or outdated.

        Returns:
            The ``pyarrow`` module or ``None``.

        Raises:
            LegacyPyarrowError:
                If the pyarrow package is outdated and ``raise_if_error`` is ``True``.
        """
        try:
            import pyarrow
        except ImportError as exc:  # pragma: NO COVER
            if raise_if_error:
                raise LegacyPyarrowError(
                    f"pyarrow package not found. Install pyarrow version >= {_MIN_PYARROW_VERSION}."
                ) from exc
            return None

        if self.installed_version < _MIN_PYARROW_VERSION:
            if raise_if_error:
                msg = (
                    "Dependency pyarrow is outdated, please upgrade "
                    f"it to version >= {_MIN_PYARROW_VERSION} (version found: {self.installed_version})."
                )
                raise LegacyPyarrowError(msg)
            return None

        # This dictionary is duplicated in bigquery_storage/test/unit/test_reader.py
        # When modifying it be sure to update it there as well.
        self._BQ_TO_ARROW_SCALARS = {
            "BOOL": pyarrow.bool_,
            "BOOLEAN": pyarrow.bool_,
            "BYTES": pyarrow.binary,
            "DATE": pyarrow.date32,
            "DATETIME": pyarrow_datetime,
            "FLOAT": pyarrow.float64,
            "FLOAT64": pyarrow.float64,
            "GEOGRAPHY": pyarrow.string,
            "INT64": pyarrow.int64,
            "INTEGER": pyarrow.int64,
            "NUMERIC": pyarrow_numeric,
            "STRING": pyarrow.string,
            "TIME": pyarrow_time,
            "TIMESTAMP": pyarrow_timestamp,
            "BIGNUMERIC": pyarrow_bignumeric,
        }
        self._ARROW_SCALAR_IDS_TO_BQ = {
            # https://arrow.apache.org/docs/python/api/datatypes.html#type-classes
            pyarrow.bool_().id: "BOOL",
            pyarrow.int8().id: "INT64",
            pyarrow.int16().id: "INT64",
            pyarrow.int32().id: "INT64",
            pyarrow.int64().id: "INT64",
            pyarrow.uint8().id: "INT64",
            pyarrow.uint16().id: "INT64",
            pyarrow.uint32().id: "INT64",
            pyarrow.uint64().id: "INT64",
            pyarrow.float16().id: "FLOAT64",
            pyarrow.float32().id: "FLOAT64",
            pyarrow.float64().id: "FLOAT64",
            pyarrow.time32("ms").id: "TIME",
            pyarrow.time64("ns").id: "TIME",
            pyarrow.timestamp("ns").id: "TIMESTAMP",
            pyarrow.date32().id: "DATE",
            pyarrow.date64().id: "DATETIME",  # because millisecond resolution
            pyarrow.binary().id: "BYTES",
            pyarrow.string().id: "STRING",  # also alias for pyarrow.utf8()
            # The exact scale and precision don't matter, see below.
            pyarrow.decimal128(38, scale=9).id: "NUMERIC",
            pyarrow.decimal256(76, scale=38).id: "BIGNUMERIC",
        }

        return pyarrow


BQ_STORAGE_VERSIONS = BQStorageVersions()
PYARROW_VERSIONS = PyarrowVersions()


def _not_null(value, field):
Expand Down
44 changes: 11 additions & 33 deletions google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from packaging import version

from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import schema

try:
Expand All @@ -49,7 +50,11 @@
db_dtypes_import_exception = exc
date_dtype_name = time_dtype_name = "" # Use '' rather than None because pytype

pyarrow = _helpers.PYARROW_VERSIONS.try_import()
pyarrow = _pyarrow_helpers.PYARROW_VERSIONS.try_import()

_BIGNUMERIC_SUPPORT = False
if pyarrow is not None:
_BIGNUMERIC_SUPPORT = True

try:
# _BaseGeometry is used to detect shapely objects in `bq_to_arrow_array`
Expand Down Expand Up @@ -118,33 +123,6 @@ def __init__(self):
# the global interpreter lock).
self.done = False


### remove
# if pyarrow:
# if version.parse(pyarrow.__version__) >= version.parse("3.0.0"):
# BQ_TO_ARROW_SCALARS["BIGNUMERIC"] = pyarrow_bignumeric
# # The exact decimal's scale and precision are not important, as only
# # the type ID matters, and it's the same for all decimal256 instances.
# ARROW_SCALAR_IDS_TO_BQ[pyarrow.decimal256(76, scale=38).id] = "BIGNUMERIC"
# _BIGNUMERIC_SUPPORT = True
# else:
# _BIGNUMERIC_SUPPORT = False # pragma: NO COVER

# else: # pragma: NO COVER
# BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER
# ARROW_SCALAR_IDS_TO_BQ = {} # pragma: NO_COVER
# _BIGNUMERIC_SUPPORT = False # pragma: NO COVER
## remove

BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER
ARROW_SCALAR_IDS_TO_BQ = {} # pragma: NO_COVER
_BIGNUMERIC_SUPPORT = False # pragma: NO COVER

if pyarrow:
BQ_TO_ARROW_SCALARS = _helpers.PYARROW_VERSIONS.bq_to_arrow_scalars
ARROW_SCALAR_IDS_TO_BQ = _helpers.PYARROW_VERSIONS.arrow_scalar_ids_to_bq


BQ_FIELD_TYPE_TO_ARROW_FIELD_METADATA = {
"GEOGRAPHY": {
b"ARROW:extension:name": b"google:sqlType:geography",
Expand Down Expand Up @@ -185,7 +163,7 @@ def bq_to_arrow_data_type(field):
if field_type_upper in schema._STRUCT_TYPES:
Linchin marked this conversation as resolved.
Show resolved Hide resolved
return bq_to_arrow_struct_data_type(field)

data_type_constructor = BQ_TO_ARROW_SCALARS.get(field_type_upper)
data_type_constructor = _pyarrow_helpers.PYARROW_VERSIONS.bq_to_arrow_scalars(field_type_upper)
if data_type_constructor is None:
return None
return data_type_constructor()
Expand Down Expand Up @@ -513,7 +491,7 @@ def augment_schema(dataframe, current_bq_schema):
if pyarrow.types.is_list(arrow_table.type):
# `pyarrow.ListType`
detected_mode = "REPEATED"
detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.values.type.id)
detected_type = _pyarrow_helpers.PYARROW_VERSIONS.arrow_scalar_ids_to_bq(arrow_table.values.type.id)

# For timezone-naive datetimes, pyarrow assumes the UTC timezone and adds
# it to such datetimes, causing them to be recognized as TIMESTAMP type.
Expand All @@ -529,7 +507,7 @@ def augment_schema(dataframe, current_bq_schema):
detected_type = "DATETIME"
else:
detected_mode = field.mode
detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)
detected_type = _pyarrow_helpers.PYARROW_VERSIONS.arrow_scalar_ids_to_bq(arrow_table.type.id)

if detected_type is None:
unknown_type_fields.append(field)
Expand Down Expand Up @@ -650,13 +628,13 @@ def dataframe_to_parquet(

This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``.
"""
pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)
pyarrow = _pyarrow_helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)

import pyarrow.parquet # type: ignore

kwargs = (
{"use_compliant_nested_type": parquet_use_compliant_nested_type}
if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
if _pyarrow_helpers.PYARROW_VERSIONS.use_compliant_nested_type
else {}
)

Expand Down
Loading