Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: omit read_session with latest google-cloud-bigquery-storage #748

Merged
merged 13 commits into from
Jul 16, 2021
60 changes: 40 additions & 20 deletions google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from google.cloud._helpers import _RFC3339_MICROS
from google.cloud._helpers import _RFC3339_NO_FRACTION
from google.cloud._helpers import _to_bytes
import pkg_resources
import packaging.version
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to add type annotations, but it turns out pkg_resources is just a thin wrapper around packaging.

https://github.com/pypa/setuptools/blob/a4dbe3457d89cf67ee3aa571fdb149e6eb544e88/pkg_resources/__init__.py/#L112


from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

Expand All @@ -41,31 +41,51 @@
re.VERBOSE,
)

_MIN_BQ_STORAGE_VERSION = pkg_resources.parse_version("2.0.0")
_MIN_BQ_STORAGE_VERSION = packaging.version.Version("2.0.0")
_BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION = packaging.version.Version("2.6.0")


class BQStorageVersions:
    """Version comparisons for the google-cloud-bigquery-storage extra.

    Lazily detects and caches the installed version so the (potentially
    expensive) import of ``google.cloud.bigquery_storage`` only happens
    on first use.
    """

    def __init__(self):
        # Cached parsed version; populated on first access of installed_version.
        self._installed_version = None

    @property
    def installed_version(self) -> packaging.version.Version:
        """Return the parsed version of the installed BQ Storage extra.

        If the package predates ``__version__`` being set, the placeholder
        "legacy" is parsed instead, which compares lower than any real release.
        """
        if self._installed_version is None:
            from google.cloud import bigquery_storage

            self._installed_version = packaging.version.Version(
                getattr(bigquery_storage, "__version__", "legacy")
            )

        return self._installed_version

    @property
    def is_read_session_optional(self) -> bool:
        """True if the installed client allows omitting the read session.

        The ``rows()`` method accepts an omitted read session starting with
        google-cloud-bigquery-storage 2.6.0.
        """
        return self.installed_version >= _BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION

    def verify_version(self):
        """Verify that a recent enough version of BigQuery Storage extra is installed.

        The function assumes that google-cloud-bigquery-storage extra is installed, and
        should thus be used in places where this assumption holds.

        Because `pip` can install an outdated version of this extra despite the constraints
        in `setup.py`, the calling code can use this helper to verify the version
        compatibility at runtime.

        Raises:
            LegacyBigQueryStorageError: If google-cloud-bigquery-storage is outdated.
        """
        if self.installed_version < _MIN_BQ_STORAGE_VERSION:
            msg = (
                "Dependency google-cloud-bigquery-storage is outdated, please upgrade "
                f"it to version >= 2.0.0 (version found: {self.installed_version})."
            )
            raise LegacyBigQueryStorageError(msg)


BQ_STORAGE_VERSIONS = BQStorageVersions()


def _not_null(value, field):
Expand Down
10 changes: 9 additions & 1 deletion google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
# Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
_ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import _helpers
from google.cloud.bigquery import schema


Expand Down Expand Up @@ -590,7 +591,14 @@ def _bqstorage_page_to_dataframe(column_names, dtypes, page):
def _download_table_bqstorage_stream(
download_state, bqstorage_client, session, stream, worker_queue, page_to_item
):
rowstream = bqstorage_client.read_rows(stream.name).rows(session)
reader = bqstorage_client.read_rows(stream.name)

# Avoid deprecation warnings for passing in unnecessary read session.
# https://github.com/googleapis/python-bigquery-storage/issues/229
if _helpers.BQ_STORAGE_VERSIONS.is_read_session_optional:
rowstream = reader.rows()
else:
rowstream = reader.rows(session)

for page in rowstream.pages:
if download_state.done:
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_bq_storage_version
from google.cloud.bigquery._helpers import BQ_STORAGE_VERSIONS
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
Expand Down Expand Up @@ -508,7 +508,7 @@ def _ensure_bqstorage_client(
return None

try:
_verify_bq_storage_version()
BQ_STORAGE_VERSIONS.verify_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return None
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,7 @@ def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
return False

try:
_helpers._verify_bq_storage_version()
_helpers.BQ_STORAGE_VERSIONS.verify_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return False
Expand Down
7 changes: 4 additions & 3 deletions tests/unit/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@


@unittest.skipIf(bigquery_storage is None, "Requires `google-cloud-bigquery-storage`")
class Test_verify_bq_storage_version(unittest.TestCase):
class TestBQStorageVersions(unittest.TestCase):
def _call_fut(self):
    """Invoke verify_version() on a freshly-reset version singleton."""
    from google.cloud.bigquery._helpers import BQ_STORAGE_VERSIONS

    # Drop the cached version so each test re-reads bigquery_storage.__version__.
    BQ_STORAGE_VERSIONS._installed_version = None
    return BQ_STORAGE_VERSIONS.verify_version()

def test_raises_no_error_w_recent_bqstorage(self):
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
Expand Down
71 changes: 71 additions & 0 deletions tests/unit/test__pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@
import pytz

from google import api_core
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import schema
from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT

try:
from google.cloud import bigquery_storage

_helpers.BQ_STORAGE_VERSIONS.verify_version()
except ImportError: # pragma: NO COVER
bigquery_storage = None

Expand Down Expand Up @@ -1311,6 +1314,74 @@ def test_dataframe_to_parquet_dict_sequence_schema(module_under_test):
assert schema_arg == expected_schema_arg


@pytest.mark.skipif(
    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test__download_table_bqstorage_stream_includes_read_session(
    monkeypatch, module_under_test
):
    """With BQ Storage < 2.6.0, rows() must receive the read session."""
    from google.cloud.bigquery_storage_v1 import reader as reader_mod
    from google.cloud.bigquery_storage_v1 import types as types_mod

    # Reset the cached version and pretend an older client library is installed.
    monkeypatch.setattr(_helpers.BQ_STORAGE_VERSIONS, "_installed_version", None)
    monkeypatch.setattr(bigquery_storage, "__version__", "2.5.0")

    fake_reader = mock.create_autospec(reader_mod.ReadRowsStream, instance=True)
    fake_client = mock.create_autospec(
        bigquery_storage.BigQueryReadClient, instance=True
    )
    fake_client.read_rows.return_value = fake_reader
    read_session = types_mod.ReadSession()

    module_under_test._download_table_bqstorage_stream(
        module_under_test._DownloadState(),
        fake_client,
        read_session,
        types_mod.ReadStream(name="test"),
        queue.Queue(),
        lambda item: item,
    )

    # Legacy code path: the session argument is still required.
    fake_reader.rows.assert_called_once_with(read_session)


@pytest.mark.skipif(
    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
@pytest.mark.skipif(
    not _helpers.BQ_STORAGE_VERSIONS.is_read_session_optional,
    reason="read_session is required",
)
def test__download_table_bqstorage_stream_omits_read_session(
    monkeypatch, module_under_test
):
    """With BQ Storage >= 2.6.0, rows() is called without the read session."""
    from google.cloud.bigquery_storage_v1 import reader as reader_mod
    from google.cloud.bigquery_storage_v1 import types as types_mod

    # Reset the cached version and pretend a recent client library is installed.
    monkeypatch.setattr(_helpers.BQ_STORAGE_VERSIONS, "_installed_version", None)
    monkeypatch.setattr(bigquery_storage, "__version__", "2.6.0")

    fake_reader = mock.create_autospec(reader_mod.ReadRowsStream, instance=True)
    fake_client = mock.create_autospec(
        bigquery_storage.BigQueryReadClient, instance=True
    )
    fake_client.read_rows.return_value = fake_reader
    read_session = types_mod.ReadSession()

    module_under_test._download_table_bqstorage_stream(
        module_under_test._DownloadState(),
        fake_client,
        read_session,
        types_mod.ReadStream(name="test"),
        queue.Queue(),
        lambda item: item,
    )

    # Modern code path: no session argument, avoiding the deprecation warning.
    fake_reader.rows.assert_called_once_with()


@pytest.mark.parametrize(
"stream_count,maxsize_kwarg,expected_call_count,expected_maxsize",
[
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ def test_ensure_bqstorage_client_obsolete_dependency(self):
client = self._make_one(project=self.PROJECT, credentials=creds)

patcher = mock.patch(
"google.cloud.bigquery.client._verify_bq_storage_version",
"google.cloud.bigquery.client.BQ_STORAGE_VERSIONS.verify_version",
side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
)
with patcher, warnings.catch_warnings(record=True) as warned:
Expand Down Expand Up @@ -700,7 +700,7 @@ def test_ensure_bqstorage_client_existing_client_check_fails(self):
mock_storage_client = mock.sentinel.mock_storage_client

patcher = mock.patch(
"google.cloud.bigquery.client._verify_bq_storage_version",
"google.cloud.bigquery.client.BQ_STORAGE_VERSIONS.verify_version",
side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
)
with patcher, warnings.catch_warnings(record=True) as warned:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ def test__make_bqstorage_client_true_obsolete_dependency():
)

patcher = mock.patch(
"google.cloud.bigquery.client._verify_bq_storage_version",
"google.cloud.bigquery.client.BQ_STORAGE_VERSIONS.verify_version",
side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
)
with patcher, warnings.catch_warnings(record=True) as warned:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1889,7 +1889,7 @@ def test__validate_bqstorage_returns_false_w_warning_if_obsolete_version(self):
iterator = self._make_one(first_page_response=None) # not cached

patcher = mock.patch(
"google.cloud.bigquery.table._helpers._verify_bq_storage_version",
"google.cloud.bigquery.table._helpers.BQ_STORAGE_VERSIONS.verify_version",
side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
)
with patcher, warnings.catch_warnings(record=True) as warned:
Expand Down