diff --git a/google/cloud/bigquery/_helpers.py b/google/cloud/bigquery/_helpers.py index 77054542a..bf0f80e22 100644 --- a/google/cloud/bigquery/_helpers.py +++ b/google/cloud/bigquery/_helpers.py @@ -26,7 +26,7 @@ from google.cloud._helpers import _RFC3339_MICROS from google.cloud._helpers import _RFC3339_NO_FRACTION from google.cloud._helpers import _to_bytes -import pkg_resources +import packaging.version from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError @@ -41,31 +41,65 @@ re.VERBOSE, ) -_MIN_BQ_STORAGE_VERSION = pkg_resources.parse_version("2.0.0") +_MIN_BQ_STORAGE_VERSION = packaging.version.Version("2.0.0") +_BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION = packaging.version.Version("2.6.0") -def _verify_bq_storage_version(): - """Verify that a recent enough version of BigQuery Storage extra is installed. +class BQStorageVersions: + """Version comparisons for google-cloud-bigqueyr-storage package.""" - The function assumes that google-cloud-bigquery-storage extra is installed, and - should thus be used in places where this assumption holds. + def __init__(self): + self._installed_version = None - Because `pip` can install an outdated version of this extra despite the constraints - in setup.py, the the calling code can use this helper to verify the version - compatibility at runtime. - """ - from google.cloud import bigquery_storage + @property + def installed_version(self) -> packaging.version.Version: + """Return the parsed version of google-cloud-bigquery-storage.""" + if self._installed_version is None: + from google.cloud import bigquery_storage - installed_version = pkg_resources.parse_version( - getattr(bigquery_storage, "__version__", "legacy") - ) + self._installed_version = packaging.version.parse( + # Use 0.0.0, since it is earlier than any released version. + # Legacy versions also have the same property, but + # creating a LegacyVersion has been deprecated. + # https://github.com/pypa/packaging/issues/321 + getattr(bigquery_storage, "__version__", "0.0.0") + ) - if installed_version < _MIN_BQ_STORAGE_VERSION: - msg = ( - "Dependency google-cloud-bigquery-storage is outdated, please upgrade " - f"it to version >= 2.0.0 (version found: {installed_version})." - ) - raise LegacyBigQueryStorageError(msg) + return self._installed_version + + @property + def is_read_session_optional(self) -> bool: + """True if read_session is optional to rows(). + + See: https://github.com/googleapis/python-bigquery-storage/pull/228 + """ + return self.installed_version >= _BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION + + def verify_version(self): + """Verify that a recent enough version of BigQuery Storage extra is + installed. + + The function assumes that google-cloud-bigquery-storage extra is + installed, and should thus be used in places where this assumption + holds. + + Because `pip` can install an outdated version of this extra despite the + constraints in `setup.py`, the calling code can use this helper to + verify the version compatibility at runtime. + + Raises: + LegacyBigQueryStorageError: + If the google-cloud-bigquery-storage package is outdated. + """ + if self.installed_version < _MIN_BQ_STORAGE_VERSION: + msg = ( + "Dependency google-cloud-bigquery-storage is outdated, please upgrade " + f"it to version >= 2.0.0 (version found: {self.installed_version})." + ) + raise LegacyBigQueryStorageError(msg) + + +BQ_STORAGE_VERSIONS = BQStorageVersions() def _not_null(value, field): diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 285c0e83c..2ff96da4d 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -41,6 +41,7 @@ # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too. _ARROW_COMPRESSION_SUPPORT = True +from google.cloud.bigquery import _helpers from google.cloud.bigquery import schema @@ -590,7 +591,14 @@ def _bqstorage_page_to_dataframe(column_names, dtypes, page): def _download_table_bqstorage_stream( download_state, bqstorage_client, session, stream, worker_queue, page_to_item ): - rowstream = bqstorage_client.read_rows(stream.name).rows(session) + reader = bqstorage_client.read_rows(stream.name) + + # Avoid deprecation warnings for passing in unnecessary read session. + # https://github.com/googleapis/python-bigquery-storage/issues/229 + if _helpers.BQ_STORAGE_VERSIONS.is_read_session_optional: + rowstream = reader.rows() + else: + rowstream = reader.rows(session) for page in rowstream.pages: if download_state.done: diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index de259abce..8572ba911 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -61,7 +61,7 @@ from google.cloud.bigquery._helpers import _get_sub_prop from google.cloud.bigquery._helpers import _record_field_to_json from google.cloud.bigquery._helpers import _str_or_none -from google.cloud.bigquery._helpers import _verify_bq_storage_version +from google.cloud.bigquery._helpers import BQ_STORAGE_VERSIONS from google.cloud.bigquery._helpers import _verify_job_config_type from google.cloud.bigquery._http import Connection from google.cloud.bigquery import _pandas_helpers @@ -508,7 +508,7 @@ def _ensure_bqstorage_client( return None try: - _verify_bq_storage_version() + BQ_STORAGE_VERSIONS.verify_version() except LegacyBigQueryStorageError as exc: warnings.warn(str(exc)) return None diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 765110ae6..2d9c15f50 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1565,7 +1565,7 @@ def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client): return False try: - _helpers._verify_bq_storage_version() + _helpers.BQ_STORAGE_VERSIONS.verify_version() except LegacyBigQueryStorageError as exc: warnings.warn(str(exc)) return False diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index c62947d37..af026ccbe 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -26,11 +26,17 @@ @unittest.skipIf(bigquery_storage is None, "Requires `google-cloud-bigquery-storage`") -class Test_verify_bq_storage_version(unittest.TestCase): +class TestBQStorageVersions(unittest.TestCase): + def _object_under_test(self): + from google.cloud.bigquery import _helpers + + return _helpers.BQStorageVersions() + def _call_fut(self): - from google.cloud.bigquery._helpers import _verify_bq_storage_version + from google.cloud.bigquery import _helpers - return _verify_bq_storage_version() + _helpers.BQ_STORAGE_VERSIONS._installed_version = None + return _helpers.BQ_STORAGE_VERSIONS.verify_version() def test_raises_no_error_w_recent_bqstorage(self): from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError @@ -53,10 +59,35 @@ def test_raises_error_w_unknown_bqstorage_version(self): with mock.patch("google.cloud.bigquery_storage", autospec=True) as fake_module: del fake_module.__version__ - error_pattern = r"version found: legacy" + error_pattern = r"version found: 0.0.0" with self.assertRaisesRegex(LegacyBigQueryStorageError, error_pattern): self._call_fut() + def test_installed_version_returns_cached(self): + versions = self._object_under_test() + versions._installed_version = object() + assert versions.installed_version is versions._installed_version + + def test_installed_version_returns_parsed_version(self): + versions = self._object_under_test() + + with mock.patch("google.cloud.bigquery_storage.__version__", new="1.2.3"): + version = versions.installed_version + + assert version.major == 1 + assert version.minor == 2 + assert version.micro == 3 + + def test_is_read_session_optional_true(self): + versions = self._object_under_test() + with mock.patch("google.cloud.bigquery_storage.__version__", new="2.6.0"): + assert versions.is_read_session_optional + + def test_is_read_session_optional_false(self): + versions = self._object_under_test() + with mock.patch("google.cloud.bigquery_storage.__version__", new="2.5.0"): + assert not versions.is_read_session_optional + class Test_not_null(unittest.TestCase): def _call_fut(self, value, field): diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index aa87e28f5..0ba671cd9 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -40,11 +40,14 @@ import pytz from google import api_core +from google.cloud.bigquery import _helpers from google.cloud.bigquery import schema from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT try: from google.cloud import bigquery_storage + + _helpers.BQ_STORAGE_VERSIONS.verify_version() except ImportError: # pragma: NO COVER bigquery_storage = None @@ -1311,6 +1314,72 @@ def test_dataframe_to_parquet_dict_sequence_schema(module_under_test): assert schema_arg == expected_schema_arg +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`" +) +def test__download_table_bqstorage_stream_includes_read_session( + monkeypatch, module_under_test +): + import google.cloud.bigquery_storage_v1.reader + import google.cloud.bigquery_storage_v1.types + + monkeypatch.setattr(_helpers.BQ_STORAGE_VERSIONS, "_installed_version", None) + monkeypatch.setattr(bigquery_storage, "__version__", "2.5.0") + bqstorage_client = mock.create_autospec( + bigquery_storage.BigQueryReadClient, instance=True + ) + reader = mock.create_autospec( + google.cloud.bigquery_storage_v1.reader.ReadRowsStream, instance=True + ) + bqstorage_client.read_rows.return_value = reader + session = google.cloud.bigquery_storage_v1.types.ReadSession() + + module_under_test._download_table_bqstorage_stream( + module_under_test._DownloadState(), + bqstorage_client, + session, + google.cloud.bigquery_storage_v1.types.ReadStream(name="test"), + queue.Queue(), + mock.Mock(), + ) + + reader.rows.assert_called_once_with(session) + + +@pytest.mark.skipif( + bigquery_storage is None + or not _helpers.BQ_STORAGE_VERSIONS.is_read_session_optional, + reason="Requires `google-cloud-bigquery-storage` >= 2.6.0", +) +def test__download_table_bqstorage_stream_omits_read_session( + monkeypatch, module_under_test +): + import google.cloud.bigquery_storage_v1.reader + import google.cloud.bigquery_storage_v1.types + + monkeypatch.setattr(_helpers.BQ_STORAGE_VERSIONS, "_installed_version", None) + monkeypatch.setattr(bigquery_storage, "__version__", "2.6.0") + bqstorage_client = mock.create_autospec( + bigquery_storage.BigQueryReadClient, instance=True + ) + reader = mock.create_autospec( + google.cloud.bigquery_storage_v1.reader.ReadRowsStream, instance=True + ) + bqstorage_client.read_rows.return_value = reader + session = google.cloud.bigquery_storage_v1.types.ReadSession() + + module_under_test._download_table_bqstorage_stream( + module_under_test._DownloadState(), + bqstorage_client, + session, + google.cloud.bigquery_storage_v1.types.ReadStream(name="test"), + queue.Queue(), + mock.Mock(), + ) + + reader.rows.assert_called_once_with() + + @pytest.mark.parametrize( "stream_count,maxsize_kwarg,expected_call_count,expected_maxsize", [ diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 2be8daab6..6b62eb85b 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -663,7 +663,7 @@ def test_ensure_bqstorage_client_obsolete_dependency(self): client = self._make_one(project=self.PROJECT, credentials=creds) patcher = mock.patch( - "google.cloud.bigquery.client._verify_bq_storage_version", + "google.cloud.bigquery.client.BQ_STORAGE_VERSIONS.verify_version", side_effect=LegacyBigQueryStorageError("BQ Storage too old"), ) with patcher, warnings.catch_warnings(record=True) as warned: @@ -700,7 +700,7 @@ def test_ensure_bqstorage_client_existing_client_check_fails(self): mock_storage_client = mock.sentinel.mock_storage_client patcher = mock.patch( - "google.cloud.bigquery.client._verify_bq_storage_version", + "google.cloud.bigquery.client.BQ_STORAGE_VERSIONS.verify_version", side_effect=LegacyBigQueryStorageError("BQ Storage too old"), ) with patcher, warnings.catch_warnings(record=True) as warned: diff --git a/tests/unit/test_magics.py b/tests/unit/test_magics.py index 5e9bf28a9..d030482cc 100644 --- a/tests/unit/test_magics.py +++ b/tests/unit/test_magics.py @@ -368,7 +368,7 @@ def test__make_bqstorage_client_true_obsolete_dependency(): ) patcher = mock.patch( - "google.cloud.bigquery.client._verify_bq_storage_version", + "google.cloud.bigquery.client.BQ_STORAGE_VERSIONS.verify_version", side_effect=LegacyBigQueryStorageError("BQ Storage too old"), ) with patcher, warnings.catch_warnings(record=True) as warned: diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index b30f16fe0..37650cd27 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -1889,7 +1889,7 @@ def test__validate_bqstorage_returns_false_w_warning_if_obsolete_version(self): iterator = self._make_one(first_page_response=None) # not cached patcher = mock.patch( - "google.cloud.bigquery.table._helpers._verify_bq_storage_version", + "google.cloud.bigquery.table._helpers.BQ_STORAGE_VERSIONS.verify_version", side_effect=LegacyBigQueryStorageError("BQ Storage too old"), ) with patcher, warnings.catch_warnings(record=True) as warned: