From 9b40875b9b8a0be381f792e2209f48decc2d88e9 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 14 Oct 2024 22:13:50 -0700 Subject: [PATCH 01/15] clean up cloud functionality --- airbyte/_util/api_util.py | 190 ++++++++++++++---- airbyte/_util/text_util.py | 22 +++ airbyte/caches/base.py | 4 - airbyte/cloud/connections.py | 49 +++-- airbyte/cloud/connectors.py | 81 ++++++++ airbyte/cloud/experimental.py | 60 ------ airbyte/cloud/workspaces.py | 355 ++++++++++++++++++++-------------- airbyte/exceptions.py | 9 + airbyte/sources/base.py | 6 +- 9 files changed, 499 insertions(+), 277 deletions(-) create mode 100644 airbyte/_util/text_util.py create mode 100644 airbyte/cloud/connectors.py delete mode 100644 airbyte/cloud/experimental.py diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 4bdc0b11..403ff659 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -14,7 +14,7 @@ from __future__ import annotations import json -from typing import Any +from typing import TYPE_CHECKING, Any import airbyte_api from airbyte_api import api, models @@ -24,9 +24,14 @@ AirbyteError, AirbyteMissingResourceError, AirbyteMultipleResourcesError, + PyAirbyteInputError, ) +if TYPE_CHECKING: + from collections.abc import Callable + + JOB_WAIT_INTERVAL_SECS = 2.0 JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour CLOUD_API_ROOT = "https://api.airbyte.com/v1" @@ -43,7 +48,7 @@ def get_airbyte_server_instance( *, api_key: str, api_root: str, -) -> airbyte_api.Airbyte: +) -> airbyte_api.AirbyteAPI: """Get an Airbyte instance.""" return airbyte_api.AirbyteAPI( security=models.Security( @@ -84,7 +89,7 @@ def get_workspace( ) -# List, get, and run connections +# List resources def list_connections( @@ -92,28 +97,118 @@ def list_connections( *, api_root: str, api_key: str, -) -> list[api.ConnectionResponse]: + name: str | None = None, + name_filter: Callable[[str], bool] | None = None, +) -> list[models.ConnectionResponse]: """Get a connection.""" + if name and name_filter: + raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.") + + name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True) + _ = workspace_id # Not used (yet) airbyte_instance = get_airbyte_server_instance( api_key=api_key, api_root=api_root, ) response = airbyte_instance.connections.list_connections( - api.ListConnectionsRequest()( + api.ListConnectionsRequest( workspace_ids=[workspace_id], ), ) - if status_ok(response.status_code) and response.connections_response: - return response.connections_response.data + if not status_ok(response.status_code) and response.connections_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) + assert response.connections_response is not None + return [ + connection + for connection in response.connections_response.data + if name_filter(connection.name) + ] + - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "response": response, - } +def list_sources( + workspace_id: str, + *, + api_root: str, + api_key: str, + name: str | None = None, + name_filter: Callable[[str], bool] | None = None, +) -> list[models.SourceResponse]: + """Get a connection.""" + if name and name_filter: + raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.") + + name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True) + + _ = workspace_id # Not used (yet) + airbyte_instance = get_airbyte_server_instance( + api_key=api_key, + api_root=api_root, + ) + response = airbyte_instance.sources.list_sources( + api.ListSourcesRequest( + workspace_ids=[workspace_id], + ), + ) + + if not status_ok(response.status_code) and response.sources_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) + assert response.sources_response is not None + return [source for source in response.sources_response.data if name_filter(source.name)] + + +def list_destinations( + workspace_id: str, + *, + api_root: str, + api_key: str, + name: str | None = None, + name_filter: Callable[[str], bool] | None = None, +) -> list[models.DestinationResponse]: + """Get a connection.""" + if name and name_filter: + raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.") + + name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True) + + _ = workspace_id # Not used (yet) + airbyte_instance = get_airbyte_server_instance( + api_key=api_key, + api_root=api_root, ) + response = airbyte_instance.destinations.list_destinations( + api.ListDestinationsRequest( + workspace_ids=[workspace_id], + ), + ) + + if not status_ok(response.status_code) and response.destinations_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) + assert response.destinations_response is not None + return [ + destination + for destination in response.destinations_response.data + if name_filter(destination.name) + ] + + +# Get and run connections def get_connection( @@ -122,7 +217,7 @@ def get_connection( *, api_root: str, api_key: str, -) -> api.ConnectionResponse: +) -> models.ConnectionResponse: """Get a connection.""" _ = workspace_id # Not used (yet) airbyte_instance = get_airbyte_server_instance( @@ -137,7 +232,11 @@ def get_connection( if status_ok(response.status_code) and response.connection_response: return response.connection_response - raise AirbyteMissingResourceError(connection_id, "connection", response.text) + raise AirbyteMissingResourceError( + resource_name_or_id=connection_id, + resource_type="connection", + log_text=response.raw_response.text, + ) def run_connection( @@ -146,7 +245,7 @@ def run_connection( *, api_root: str, api_key: str, -) -> api.ConnectionResponse: +) -> models.JobResponse: """Get a connection. If block is True, this will block until the connection is finished running. @@ -186,7 +285,7 @@ def get_job_logs( *, api_root: str, api_key: str, -) -> list[api.JobResponse]: +) -> list[models.JobResponse]: """Get a job's logs.""" airbyte_instance = get_airbyte_server_instance( api_key=api_key, @@ -213,11 +312,11 @@ def get_job_logs( def get_job_info( - job_id: str, + job_id: int, *, api_root: str, api_key: str, -) -> api.JobResponse: +) -> models.JobResponse: """Get a job.""" airbyte_instance = get_airbyte_server_instance( api_key=api_key, @@ -231,7 +330,11 @@ def get_job_info( if status_ok(response.status_code) and response.job_response: return response.job_response - raise AirbyteMissingResourceError(job_id, "job", response.text) + raise AirbyteMissingResourceError( + resource_name_or_id=str(job_id), + resource_type="job", + log_text=response.raw_response.text, + ) # Create, get, and delete sources @@ -244,7 +347,7 @@ def create_source( config: dict[str, Any], api_root: str, api_key: str, -) -> api.SourceResponse: +) -> models.SourceResponse: """Get a connection.""" airbyte_instance = get_airbyte_server_instance( api_key=api_key, @@ -273,7 +376,7 @@ def get_source( *, api_root: str, api_key: str, -) -> api.SourceResponse: +) -> models.SourceResponse: """Get a connection.""" airbyte_instance = get_airbyte_server_instance( api_key=api_key, @@ -284,10 +387,14 @@ def get_source( source_id=source_id, ), ) - if status_ok(response.status_code) and response.connection_response: - return response.connection_response + if status_ok(response.status_code) and response.source_response: + return response.source_response - raise AirbyteMissingResourceError(source_id, "source", response.text) + raise AirbyteMissingResourceError( + resource_name_or_id=source_id, + resource_type="source", + log_text=response.raw_response.text, + ) def delete_source( @@ -327,7 +434,7 @@ def create_destination( config: dict[str, Any], api_root: str, api_key: str, -) -> api.DestinationResponse: +) -> models.DestinationResponse: """Get a connection.""" airbyte_instance = get_airbyte_server_instance( api_key=api_key, @@ -354,7 +461,7 @@ def get_destination( *, api_root: str, api_key: str, -) -> api.DestinationResponse: +) -> models.DestinationResponse: """Get a connection.""" airbyte_instance = get_airbyte_server_instance( api_key=api_key, @@ -365,33 +472,38 @@ def get_destination( destination_id=destination_id, ), ) - if status_ok(response.status_code): + if status_ok(response.status_code) and response.destination_response: # TODO: This is a temporary workaround to resolve an issue where # the destination API response is of the wrong type. # https://github.com/airbytehq/pyairbyte/issues/320 raw_response: dict[str, Any] = json.loads(response.raw_response.text) raw_configuration: dict[str, Any] = raw_response["configuration"] + destination_type = raw_response.get("destinationType") if destination_type == "snowflake": - response.destination_response.configuration = models.DestinationSnowflake.from_dict( - raw_configuration, + response.destination_response.configuration = models.DestinationSnowflake( + **raw_configuration, ) if destination_type == "bigquery": - response.destination_response.configuration = models.DestinationBigquery.from_dict( - raw_configuration, + response.destination_response.configuration = models.DestinationBigquery( + **raw_configuration, ) if destination_type == "postgres": - response.destination_response.configuration = models.DestinationPostgres.from_dict( - raw_configuration, + response.destination_response.configuration = models.DestinationPostgres( + **raw_configuration, ) if destination_type == "duckdb": - response.destination_response.configuration = models.DestinationDuckdb.from_dict( - raw_configuration, + response.destination_response.configuration = models.DestinationDuckdb( + **raw_configuration, ) return response.destination_response - raise AirbyteMissingResourceError(destination_id, "destination", response.text) + raise AirbyteMissingResourceError( + resource_name_or_id=destination_id, + resource_type="destination", + log_text=response.raw_response.text, + ) def delete_destination( @@ -448,17 +560,17 @@ def create_connection( ) stream_configurations.append(stream_configuration) - stream_configurations = models.StreamConfigurations(stream_configurations) + stream_configurations_obj = models.StreamConfigurations(stream_configurations) response = airbyte_instance.connections.create_connection( models.ConnectionCreateRequest( name=name, source_id=source_id, destination_id=destination_id, - configurations=stream_configurations, + configurations=stream_configurations_obj, prefix=prefix, ), ) - if not status_ok(response.status_code): + if not status_ok(response.status_code) or response.connection_response is None: raise AirbyteError( context={ "source_id": source_id, diff --git a/airbyte/_util/text_util.py b/airbyte/_util/text_util.py new file mode 100644 index 00000000..ed3578d3 --- /dev/null +++ b/airbyte/_util/text_util.py @@ -0,0 +1,22 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Utility functions for working with text.""" + +from __future__ import annotations + +import ulid + + +def generate_ulid() -> str: + """Generate a new ULID.""" + return str(ulid.ULID()) + + +def generate_random_suffix() -> str: + """Generate a random suffix for use in temporary names. + + By default, this function generates a ULID and returns a 9-character string + which will be monotonically sortable. It is not guaranteed to be unique but + is sufficient for small-scale and medium-scale use cases. + """ + ulid_str = generate_ulid() + return ulid_str[:6] + ulid_str[-3:] diff --git a/airbyte/caches/base.py b/airbyte/caches/base.py index 775a64b3..dd1d740e 100644 --- a/airbyte/caches/base.py +++ b/airbyte/caches/base.py @@ -60,10 +60,6 @@ class CacheBase(SqlConfig, AirbyteWriterInterface): _name: str = PrivateAttr() - _deployed_api_root: str | None = PrivateAttr(default=None) - _deployed_workspace_id: str | None = PrivateAttr(default=None) - _deployed_destination_id: str | None = PrivateAttr(default=None) - _sql_processor_class: type[SqlProcessorBase] = PrivateAttr() _read_processor: SqlProcessorBase = PrivateAttr() diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index b4543a1f..0a725bb8 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, cast from airbyte._util import api_util +from airbyte.cloud.connectors import CloudDestination, CloudSource from airbyte.cloud.sync_results import SyncResult @@ -68,6 +69,14 @@ def source_id(self) -> str: return cast(str, self._source_id) + @property + def source(self) -> CloudSource: + """Get the source object.""" + return CloudSource( + workspace=self.workspace, + connector_id=self.source_id, + ) + @property def destination_id(self) -> str: """The ID of the destination.""" @@ -79,13 +88,21 @@ def destination_id(self) -> str: return cast(str, self._destination_id) + @property + def destination(self) -> CloudDestination: + """Get the source object.""" + return CloudDestination( + workspace=self.workspace, + connector_id=self.destination_id, + ) + @property def stream_names(self) -> list[str]: """The stream names.""" if not self._connection_info: self._connection_info = self._fetch_connection_info() - return [stream.name for stream in self._connection_info.configurations.streams] + return [stream.name for stream in self._connection_info.configurations.streams or []] @property def table_prefix(self) -> str: @@ -93,7 +110,7 @@ def table_prefix(self) -> str: if not self._connection_info: self._connection_info = self._fetch_connection_info() - return self._connection_info.prefix + return self._connection_info.prefix or "" @property def connection_url(self) -> str | None: @@ -154,7 +171,7 @@ def get_previous_sync_logs( SyncResult( workspace=self.workspace, connection=self, - job_id=sync_log.job_id, + job_id=str(sync_log.job_id), _latest_job_info=sync_log, ) for sync_log in sync_logs @@ -189,28 +206,22 @@ def get_sync_result( # Deletions - def _permanently_delete( + def permanently_delete( self, *, - delete_source: bool = False, - delete_destination: bool = False, + cascade_delete_source: bool = False, + cascade_delete_destination: bool = False, ) -> None: """Delete the connection. Args: - delete_source: Whether to also delete the source. - delete_destination: Whether to also delete the destination. + cascade_delete_source: Whether to also delete the source. + cascade_delete_destination: Whether to also delete the destination. """ - self.workspace._permanently_delete_connection( # noqa: SLF001 # Non-public API (for now) - connection=self - ) + self.workspace.permanently_delete_connection(self) - if delete_source: - self.workspace._permanently_delete_source( # noqa: SLF001 # Non-public API (for now) - source=self.source_id - ) + if cascade_delete_source: + self.workspace.permanently_delete_source(self.source_id) - if delete_destination: - self.workspace._permanently_delete_destination( # noqa: SLF001 # Non-public API - destination=self.destination_id, - ) + if cascade_delete_destination: + self.workspace.permanently_delete_destination(self.destination_id) diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py new file mode 100644 index 00000000..eb8a5e68 --- /dev/null +++ b/airbyte/cloud/connectors.py @@ -0,0 +1,81 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Cloud connectors module for working with Cloud sources and destinations.""" + +from __future__ import annotations + +import abc +from typing import TYPE_CHECKING, Literal + + +if TYPE_CHECKING: + from airbyte.cloud.workspaces import CloudWorkspace + + +class CloudConnector(abc.ABC): + """A cloud connector is a deployed source or destination on Airbyte Cloud. + + You can use a connector object to manage the connector. + """ + + @property + @abc.abstractmethod + def connector_type(self) -> Literal["source", "destination"]: + """Get the type of the connector.""" + ... + + def __init__( + self, + workspace: CloudWorkspace, + connector_id: str, + ) -> None: + """Initialize a cloud connector object.""" + self.workspace = workspace + """The workspace that the connector belongs to.""" + self.connector_id = connector_id + """The ID of the connector.""" + + @property + def connector_url(self) -> str: + """Get the URL of the source connector.""" + return f"{self.workspace.workspace_url}/{self.connector_type}s/{self.connector_id}" + + def permanently_delete(self) -> None: + """Permanently delete the connector.""" + if self.connector_type == "source": + self.workspace.permanently_delete_source(self.connector_id) + else: + self.workspace.permanently_delete_destination(self.connector_id) + + +class CloudSource(CloudConnector): + """A cloud source is a source that is deployed on Airbyte Cloud.""" + + @property + def source_id(self) -> str: + """Get the ID of the source. + + This is an alias for `connector_id`. + """ + return self.connector_id + + @property + def connector_type(self) -> Literal["source", "destination"]: + """Get the type of the connector.""" + return "source" + + +class CloudDestination(CloudConnector): + """A cloud destination is a destination that is deployed on Airbyte Cloud.""" + + @property + def destination_id(self) -> str: + """Get the ID of the destination. + + This is an alias for `connector_id`. + """ + return self.connector_id + + @property + def connector_type(self) -> Literal["source", "destination"]: + """Get the type of the connector.""" + return "destination" diff --git a/airbyte/cloud/experimental.py b/airbyte/cloud/experimental.py deleted file mode 100644 index c5ccf23a..00000000 --- a/airbyte/cloud/experimental.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright (c) 2024 Airbyte, Inc., all rights reserved. -"""Experimental features for interacting with the Airbyte Cloud API. - -You can use this module to access experimental features in Airbyte Cloud, OSS, and Enterprise. These -features are subject to change and may not be available in all environments. **Future versions of -PyAirbyte may remove or change these features without notice.** - -To use this module, replace an import like this: - -```python -from airbyte.cloud import CloudConnection, CloudWorkspace -``` - -with an import like this: - -```python -from airbyte.cloud.experimental import CloudConnection, CloudWorkspace -``` - -You can toggle between the stable and experimental versions of these classes by changing the import -path. This allows you to test new features without requiring substantial changes to your codebase. - -""" -# ruff: noqa: SLF001 # This file accesses private members of other classes. - -from __future__ import annotations - -import warnings - -from airbyte import exceptions as exc -from airbyte.cloud.connections import CloudConnection as Stable_CloudConnection -from airbyte.cloud.workspaces import CloudWorkspace as Stable_CloudWorkspace - - -# This module is not imported anywhere by default, so this warning should only print if the user -# explicitly imports it. -warnings.warn( - message="The `airbyte.cloud.experimental` module is experimental and may change in the future.", - category=exc.AirbyteExperimentalFeatureWarning, - stacklevel=2, -) - - -class CloudWorkspace(Stable_CloudWorkspace): # noqa: D101 # Docstring inherited from parent. - __doc__ = ( - f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudConnection.__doc__}" - ) - deploy_connection = Stable_CloudWorkspace._deploy_connection - deploy_source = Stable_CloudWorkspace._deploy_source - deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination - permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection - permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source - permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination - - -class CloudConnection(Stable_CloudConnection): # noqa: D101 # Docstring inherited from parent. - __doc__ = ( - f"Experimental implementation of `.CloudConnection`.\n\n{Stable_CloudConnection.__doc__}" - ) - permanently_delete = Stable_CloudConnection._permanently_delete diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index 6bd52a24..915935ec 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -11,25 +11,17 @@ from typing import TYPE_CHECKING from airbyte import exceptions as exc -from airbyte._util.api_util import ( - CLOUD_API_ROOT, - create_connection, - create_destination, - create_source, - delete_connection, - delete_destination, - delete_source, - get_workspace, -) -from airbyte.cloud._destination_util import get_destination_config_from_cache +from airbyte._util import api_util, text_util from airbyte.cloud.connections import CloudConnection +from airbyte.cloud.connectors import CloudDestination, CloudSource from airbyte.cloud.sync_results import SyncResult -from airbyte.sources.base import Source if TYPE_CHECKING: - from airbyte._util.api_imports import DestinationResponse - from airbyte.caches.base import CacheBase + from collections.abc import Callable + + from airbyte.destinations.base import Destination + from airbyte.sources.base import Source @dataclass @@ -42,7 +34,7 @@ class CloudWorkspace: workspace_id: str api_key: str - api_root: str = CLOUD_API_ROOT + api_root: str = api_util.CLOUD_API_ROOT @property def workspace_url(self) -> str | None: @@ -58,194 +50,193 @@ def connect(self) -> None: serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct. """ - _ = get_workspace( + _ = api_util.get_workspace( api_root=self.api_root, api_key=self.api_key, workspace_id=self.workspace_id, ) print(f"Successfully connected to workspace: {self.workspace_url}") - # Deploy and delete sources + # Deploy sources and destinations - # TODO: Make this a public API - # https://github.com/airbytehq/pyairbyte/issues/228 - def _deploy_source( + def deploy_source( self, + name: str, source: Source, - ) -> str: + *, + unique: bool = True, + random_name_suffix: bool = False, + ) -> CloudSource: """Deploy a source to the workspace. - Returns the newly deployed source ID. + Returns the newly deployed source. + + Args: + name: The name to use when deploying. + source: The source object to deploy. + unique: Whether to require a unique name. If `True`, duplicate names + are not allowed. Defaults to `True`. + random_name_suffix: Whether to append a random suffix to the name. """ source_configuration = source.get_config().copy() source_configuration["sourceType"] = source.name.replace("source-", "") - deployed_source = create_source( - name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)", + if random_name_suffix: + name += f" (ID: {text_util.generate_random_suffix()})" + + if unique: + existing = self.list_sources(name=name) + if existing: + raise exc.AirbyteDuplicateResourcesError( + resource_type="destination", + resource_name=name, + ) + + deployed_source = api_util.create_source( + name=name, api_root=self.api_root, api_key=self.api_key, workspace_id=self.workspace_id, config=source_configuration, ) + return CloudSource( + workspace=self, + connector_id=deployed_source.source_id, + ) - # Set the deployment Ids on the source object - source._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API - source._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API - source._deployed_source_id = deployed_source.source_id # noqa: SLF001 # Accessing nn-public API - - return deployed_source.source_id - - def _permanently_delete_source( + def deploy_destination( self, - source: str | Source, - ) -> None: - """Delete a source from the workspace. - - You can pass either the source ID `str` or a deployed `Source` object. - """ - if not isinstance(source, str | Source): - raise ValueError(f"Invalid source type: {type(source)}") # noqa: TRY004, TRY003 - - if isinstance(source, Source): - if not source._deployed_source_id: # noqa: SLF001 - raise ValueError("Source has not been deployed.") # noqa: TRY003 + name: str, + destination: Destination, + *, + unique: bool = True, + random_name_suffix: bool = False, + ) -> CloudDestination: + """Deploy a destination to the workspace. - source_id = source._deployed_source_id # noqa: SLF001 + Returns the newly deployed destination ID. - elif isinstance(source, str): - source_id = source + Args: + name: The name to use when deploying. + destination: The destination object to deploy. + unique: Whether to require a unique name. If `True`, duplicate names + are not allowed. Defaults to `True`. + random_name_suffix: Whether to append a random suffix to the name. + """ + destination_configuration = destination.get_config().copy() + destination_configuration["destinationType"] = destination.name.replace("destination-", "") + + if random_name_suffix: + name += f" (ID: {text_util.generate_random_suffix()})" + + if unique: + existing = self.list_destinations(name=name) + if existing: + raise exc.AirbyteDuplicateResourcesError( + resource_type="destination", + resource_name=name, + ) - delete_source( - source_id=source_id, + deployed_destination = api_util.create_destination( + name=name, api_root=self.api_root, api_key=self.api_key, + workspace_id=self.workspace_id, + config=destination_configuration, + ) + return CloudDestination( + workspace=self, + connector_id=deployed_destination.destination_id, ) - # Deploy and delete destinations - - # TODO: Make this a public API - # https://github.com/airbytehq/pyairbyte/issues/228 - def _deploy_cache_as_destination( + def permanently_delete_source( self, - cache: CacheBase, - ) -> str: - """Deploy a cache to the workspace as a new destination. + source: str | CloudSource, + ) -> None: + """Delete a source from the workspace. - Returns the newly deployed destination ID. + You can pass either the source ID `str` or a deployed `Source` object. """ - cache_type_name = cache.__class__.__name__.replace("Cache", "") + if not isinstance(source, str | CloudSource): + raise exc.PyAirbyteInputError( + message="Invalid source type.", + input_value=type(source).__name__, + ) - deployed_destination: DestinationResponse = create_destination( - name=f"Destination {cache_type_name} (Deployed by PyAirbyte)", + api_util.delete_source( + source_id=source.connector_id if isinstance(source, CloudSource) else source, api_root=self.api_root, api_key=self.api_key, - workspace_id=self.workspace_id, - config=get_destination_config_from_cache(cache), ) - # Set the deployment Ids on the source object - cache._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API - cache._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API - cache._deployed_destination_id = deployed_destination.destination_id # noqa: SLF001 # Accessing nn-public API - - return deployed_destination.destination_id + # Deploy and delete destinations - def _permanently_delete_destination( + def permanently_delete_destination( self, - *, - destination: str | None = None, - cache: CacheBase | None = None, + destination: str | CloudDestination, ) -> None: """Delete a deployed destination from the workspace. You can pass either the `Cache` class or the deployed destination ID as a `str`. """ - if destination is None and cache is None: - raise ValueError("You must provide either a destination ID or a cache object.") # noqa: TRY003 - if destination is not None and cache is not None: - raise ValueError( # noqa: TRY003 - "You must provide either a destination ID or a cache object, not both." + if not isinstance(destination, str | CloudDestination): + raise exc.PyAirbyteInputError( + message="Invalid destination type.", + input_value=type(destination).__name__, ) - if cache: - if not cache._deployed_destination_id: # noqa: SLF001 - raise ValueError("Cache has not been deployed.") # noqa: TRY003 - - destination = cache._deployed_destination_id # noqa: SLF001 - - if destination is None: - raise ValueError("No destination ID provided.") # noqa: TRY003 - - delete_destination( - destination_id=destination, + api_util.delete_destination( + destination_id=( + destination if isinstance(destination, str) else destination.destination_id + ), api_root=self.api_root, api_key=self.api_key, ) # Deploy and delete connections - # TODO: Make this a public API - # https://github.com/airbytehq/pyairbyte/issues/228 - def _deploy_connection( + def deploy_connection( self, - source: Source | str, - cache: CacheBase | None = None, - destination: str | None = None, + connection_name: str, + *, + source: CloudSource | str, + destination: CloudDestination | str, table_prefix: str | None = None, selected_streams: list[str] | None = None, ) -> CloudConnection: - """Deploy a source and cache to the workspace as a new connection. + """Create a new connection between an already deployed source and destination. - Returns the newly deployed connection ID as a `str`. + Returns the newly deployed connection object. Args: - source (Source | str): The source to deploy. You can pass either an already deployed - source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object, - it will be deployed automatically. - cache (CacheBase, optional): The cache to deploy as a new destination. You can provide - `cache` or `destination`, but not both. - destination (str, optional): The destination ID to use. You can provide - `cache` or `destination`, but not both. - table_prefix (str, optional): The table prefix to use for the cache. If not provided, - the cache's table prefix will be used. - selected_streams (list[str], optional): The selected stream names to use for the - connection. If not provided, the source's selected streams will be used. + connection_name: The name of the connection. + source: The deployed source. You can pass a source ID or a CloudSource object. + destination: The deployed destination. You can pass a destination ID or a + CloudDestination object. + table_prefix: Optional. The table prefix to use when syncing to the destination. + selected_streams: The selected stream names to sync within the connection. """ + if not selected_streams: + raise exc.PyAirbyteInputError( + guidance="You must provide `selected_streams` when creating a connection." + ) + # Resolve source ID source_id: str - if isinstance(source, Source): - selected_streams = selected_streams or source.get_selected_streams() - source_id = ( - source._deployed_source_id # noqa: SLF001 # Access to non-public API - or self._deploy_source(source) - ) - else: - source_id = source - if not selected_streams: - raise exc.PyAirbyteInputError( - guidance="You must provide `selected_streams` when deploying a source ID." - ) + source_id = source if isinstance(source, str) else source.connector_id - # Resolve destination ID - destination_id: str - if destination: - destination_id = destination - elif cache: - table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "") - if not cache._deployed_destination_id: # noqa: SLF001 - destination_id = self._deploy_cache_as_destination(cache) - else: - destination_id = cache._deployed_destination_id # noqa: SLF001 - else: + # destination is already deployed + destination_id = destination if isinstance(destination, str) else destination.connector_id + if not selected_streams: raise exc.PyAirbyteInputError( - guidance="You must provide either a destination ID or a cache object." + guidance=( + "You must provide `selected_streams` when creating a connection " + "from an existing destination." + ) ) - assert source_id is not None - assert destination_id is not None - - deployed_connection = create_connection( - name="Connection (Deployed by PyAirbyte)", + deployed_connection = api_util.create_connection( + name=connection_name, source_id=source_id, destination_id=destination_id, api_root=self.api_root, @@ -255,15 +246,6 @@ def _deploy_connection( prefix=table_prefix or "", ) - if isinstance(source, Source): - source._deployed_api_root = self.api_root # noqa: SLF001 - source._deployed_workspace_id = self.workspace_id # noqa: SLF001 - source._deployed_source_id = source_id # noqa: SLF001 - if cache: - cache._deployed_api_root = self.api_root # noqa: SLF001 - cache._deployed_workspace_id = self.workspace_id # noqa: SLF001 - cache._deployed_destination_id = deployed_connection.destination_id # noqa: SLF001 - return CloudConnection( workspace=self, connection_id=deployed_connection.connection_id, @@ -285,7 +267,7 @@ def get_connection( connection_id=connection_id, ) - def _permanently_delete_connection( + def permanently_delete_connection( self, connection: str | CloudConnection, *, @@ -302,17 +284,17 @@ def _permanently_delete_connection( connection_id=connection, ) - delete_connection( + api_util.delete_connection( connection_id=connection.connection_id, api_root=self.api_root, api_key=self.api_key, workspace_id=self.workspace_id, ) if delete_source: - self._permanently_delete_source(source=connection.source_id) + self.permanently_delete_source(source=connection.source_id) if delete_destination: - self._permanently_delete_destination(destination=connection.destination_id) + self.permanently_delete_destination(destination=connection.destination_id) # Run syncs @@ -380,3 +362,76 @@ def get_previous_sync_logs( return connection.get_previous_sync_logs( limit=limit, ) + + # List sources, destinations, and connections + + def list_connections( + self, + name: str | None = None, + *, + name_filter: Callable | None = None, + ) -> list[CloudConnection]: + """List connections by name in the workspace.""" + connections = api_util.list_connections( + api_root=self.api_root, + api_key=self.api_key, + workspace_id=self.workspace_id, + name=name, + name_filter=name_filter, + ) + return [ + CloudConnection( + workspace=self, + connection_id=connection.connection_id, + source=None, + destination=None, + ) + for connection in connections + if name is None or connection.name == name + ] + + def list_sources( + self, + name: str | None = None, + *, + name_filter: Callable | None = None, + ) -> list[CloudSource]: + """List all sources in the workspace.""" + sources = api_util.list_sources( + api_root=self.api_root, + api_key=self.api_key, + workspace_id=self.workspace_id, + name=name, + name_filter=name_filter, + ) + return [ + CloudSource( + workspace=self, + connector_id=source.source_id, + ) + for source in sources + if name is None or source.name == name + ] + + def list_destinations( + self, + name: str | None = None, + *, + name_filter: Callable | None = None, + ) -> list[CloudDestination]: + """List all destinations in the workspace.""" + destinations = api_util.list_destinations( + api_root=self.api_root, + api_key=self.api_key, + workspace_id=self.workspace_id, + name=name, + name_filter=name_filter, + ) + return [ + CloudDestination( + workspace=self, + connector_id=destination.destination_id, + ) + for destination in destinations + if name is None or destination.name == name + ] diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index 0713bb64..87159d59 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -486,6 +486,15 @@ class AirbyteMissingResourceError(AirbyteError): resource_name_or_id: str | None = None +@dataclass +class AirbyteDuplicateResourcesError(AirbyteError): + """Process failed because resource name was not unique.""" + + resource_type: str | None = None + resource_name: str | None = None + + +# Custom Warnings @dataclass class AirbyteMultipleResourcesError(AirbyteError): """Could not locate the resource because multiple matching resources were found.""" diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index aeffc8ed..564215f4 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -51,7 +51,7 @@ class Source(ConnectorBase): """A class representing a source that can be called.""" - connector_type: Literal["source"] = "source" + connector_type = "source" def __init__( self, @@ -84,10 +84,6 @@ def __init__( if streams is not None: self.select_streams(streams) - self._deployed_api_root: str | None = None - self._deployed_workspace_id: str | None = None - self._deployed_source_id: str | None = None - def set_streams(self, streams: list[str]) -> None: """Deprecated. See select_streams().""" warnings.warn( From 4777d2407e5329a968e7a7fd7feac80088e4f6af Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 15 Oct 2024 07:46:15 -0700 Subject: [PATCH 02/15] fix docs generation --- docs/generate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/generate.py b/docs/generate.py index 0cf39896..02281768 100755 --- a/docs/generate.py +++ b/docs/generate.py @@ -18,7 +18,7 @@ def run() -> None: """Generate docs for all public modules in PyAirbyte and save them to docs/generated.""" - public_modules = ["airbyte", "airbyte/cloud/experimental.py"] + public_modules = ["airbyte", "airbyte/cli.py"] # recursively delete the docs/generated folder if it exists if pathlib.Path("docs/generated").exists(): From a270393ce66ac90fca6e937ee799123332cdf375 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Wed, 16 Oct 2024 09:43:52 -0700 Subject: [PATCH 03/15] pseudocode wip --- airbyte/_util/api_util.py | 132 +++++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 1 deletion(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 403ff659..e5062fad 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -14,9 +14,11 @@ from __future__ import annotations import json -from typing import TYPE_CHECKING, Any +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Literal import airbyte_api +import requests from airbyte_api import api, models from airbyte.exceptions import ( @@ -26,6 +28,8 @@ AirbyteMultipleResourcesError, PyAirbyteInputError, ) +from airbyte.secrets.base import SecretString +from airbyte.sources.util import get_connector if TYPE_CHECKING: @@ -35,6 +39,19 @@ JOB_WAIT_INTERVAL_SECS = 2.0 JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour CLOUD_API_ROOT = "https://api.airbyte.com/v1" +"""The Airbyte Cloud API root URL. + +This is the root URL for the Airbyte Cloud API. It is used to interact with the Airbyte Cloud API +and is the default API root for the `CloudWorkspace` class. +- https://reference.airbyte.com/reference/getting-started +""" +CLOUD_CONFIG_API_ROOT = "https://cloud.airbyte.com/api/v1" +"""Internal-Use API Root, aka Airbyte "Config API". + +Documentation: +- https://docs.airbyte.com/api-documentation#configuration-api-deprecated +- https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml +""" # Helper functions @@ -641,6 +658,119 @@ def delete_connection( ) +# Functions for leveraging the Airbyte Config API (may not be supported or stable) + + +@dataclass +class DockerImageOverride: + """Defines a connector image override.""" + + docker_image_override: str + override_level: Literal["workspace", "actor"] = "actor" + + +def set_actor_override( + *, + workspace_id: str, + actor_id: str, + actor_type: Literal["source", "destination"], + override: DockerImageOverride, + config_api_root: str = CLOUD_CONFIG_API_ROOT, + api_key: str | SecretString, +) -> None: + """Override the docker image and tag for a specific connector. + + https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L7234 + + """ + path = config_api_root + "/v1/scoped_configuration/create" + headers: dict[str, Any] = { + "Content-Type": "application", + "Authorization": SecretString(f"Bearer {api_key}"), + } + request_body: dict[str, str] = { + "config_key": "docker_image", # TODO: Fix this. + "value": override.docker_image_override, + "scope_id": actor_id, + "scope_type": actor_type, + "resource_id": "", # TODO: Need to call something like get_actor_definition + "resource_type": "ACTOR_DEFINITION", + "origin": "", # TODO: Need to get user ID somehow or use another origin type + "origin_type": "USER", + } + response = requests.request( + method="POST", + url=path, + headers=headers, + json=request_body, + ) + if not status_ok(response.status_code): + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "actor_id": actor_id, + "actor_type": actor_type, + "response": response, + }, + ) + + +def get_connector_image_override( + *, + workspace_id: str, + actor_id: str, + actor_type: Literal["source", "destination"], + config_api_root: str = CLOUD_CONFIG_API_ROOT, + api_key: str, +) -> DockerImageOverride | None: + """Get the docker image and tag for a specific connector. + + Result is a tuple of two values: + - A boolean indicating if an override is set. + - The docker image and tag, either from the override if set, or from the . + """ + path = config_api_root + "/v1/scoped_configuration/list" + headers: dict[str, Any] = { + "Content-Type": "application", + "Authorization": SecretString(f"Bearer {api_key}"), + } + request_body: dict[str, str] = { + "config_key": "docker_image", # TODO: Fix this. + } + response = requests.request( + method="GET", + url=path, + headers=headers, + json=request_body, + ) + if not status_ok(response.status_code): + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "actor_id": actor_id, + "actor_type": actor_type, + "response": response, + }, + ) + if not response.json(): + return None + + overrides = [ + DockerImageOverride( + docker_image_override=entry["value"], + override_level=entry["scope_type"], + ) + for entry in response.json() + ] + if not overrides: + return None + if len(overrides) > 1: + raise NotImplementedError( + "Multiple overrides found. This is not yet supported.", + ) + return overrides[0] + + # Not yet implemented From c2b6bd7faba96aaa75e0a5b0c29224513167d883 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 9 Dec 2024 20:02:51 -0800 Subject: [PATCH 04/15] clean up --- airbyte/_util/api_util.py | 8 +++----- airbyte/cloud/connections.py | 18 +----------------- airbyte/cloud/workspaces.py | 6 ------ 3 files changed, 4 insertions(+), 28 deletions(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 9e58c6e6..d9435b28 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -13,13 +13,12 @@ from __future__ import annotations -from dataclasses import dataclass import json -from typing import TYPE_CHECKING, Any -import requests -from typing_extensions import Literal +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Literal import airbyte_api +import requests from airbyte_api import api, models from airbyte.exceptions import ( @@ -43,7 +42,6 @@ DestinationConfiguration, ) - from airbyte.secrets.base import SecretString JOB_WAIT_INTERVAL_SECS = 2.0 diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index 5206dd5d..10cb1d1b 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -89,14 +89,6 @@ def source(self) -> CloudSource: ) return self._cloud_source_object - @property - def source(self) -> CloudSource: - """Get the source object.""" - return CloudSource( - workspace=self.workspace, - connector_id=self.source_id, - ) - @property def destination_id(self) -> str: """The ID of the destination.""" @@ -120,14 +112,6 @@ def destination(self) -> CloudDestination: ) return self._cloud_destination_object - @property - def destination(self) -> CloudDestination: - """Get the source object.""" - return CloudDestination( - workspace=self.workspace, - connector_id=self.destination_id, - ) - @property def stream_names(self) -> list[str]: """The stream names.""" @@ -205,7 +189,7 @@ def get_previous_sync_logs( SyncResult( workspace=self.workspace, connection=self, - job_id=str(sync_log.job_id), + job_id=sync_log.job_id, _latest_job_info=sync_log, ) for sync_log in sync_logs diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index 35411c7a..e8fb1549 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -14,7 +14,6 @@ from airbyte._util import api_util, text_util from airbyte.cloud.connections import CloudConnection from airbyte.cloud.connectors import CloudDestination, CloudSource -from airbyte.cloud.sync_results import SyncResult from airbyte.destinations.base import Destination from airbyte.secrets.base import SecretString @@ -22,7 +21,6 @@ if TYPE_CHECKING: from collections.abc import Callable - from airbyte.destinations.base import Destination from airbyte.sources.base import Source @@ -113,10 +111,6 @@ def deploy_source( workspace=self, connector_id=deployed_source.source_id, ) - return CloudSource( - workspace=self, - connector_id=deployed_source.source_id, - ) def deploy_destination( self, From af96d63e3a4dd2143c58a82891ea7d391b929d34 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 9 Dec 2024 20:05:21 -0800 Subject: [PATCH 05/15] clean up --- airbyte/_util/api_util.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index d9435b28..fbf06edc 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -43,7 +43,6 @@ ) - JOB_WAIT_INTERVAL_SECS = 2.0 JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour CLOUD_API_ROOT = "https://api.airbyte.com/v1" @@ -165,9 +164,6 @@ def list_connections( ] -# Get and run connections - - def list_workspaces( workspace_id: str, *, @@ -177,17 +173,19 @@ def list_workspaces( name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.WorkspaceResponse]: + """List workspaces.""" if name and name_filter: raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.") name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True) _ = workspace_id # Not used (yet) - airbyte_instance = get_airbyte_server_instance( + airbyte_instance: airbyte_api.AirbyteAPI = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, api_root=api_root, ) + response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces( api.ListWorkspacesRequest( workspace_ids=[workspace_id], @@ -216,7 +214,7 @@ def list_sources( name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.SourceResponse]: - """Get a connection.""" + """List sources.""" if name and name_filter: raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.") From 4ebe5926f93c1ce5d80827393bbc7e2e14b5f460 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 9 Dec 2024 21:38:45 -0800 Subject: [PATCH 06/15] working get bearer auth token --- airbyte/_util/api_util.py | 97 ++++++++++++++----- .../cloud/test_cloud_api_util.py | 71 ++++++++++++++ 2 files changed, 144 insertions(+), 24 deletions(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index fbf06edc..286f565f 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -96,7 +96,7 @@ def get_workspace( client_id: SecretString, client_secret: SecretString, ) -> models.WorkspaceResponse: - """Get a connection.""" + """Get a workspace object.""" airbyte_instance = get_airbyte_server_instance( api_root=api_root, client_id=client_id, @@ -131,7 +131,7 @@ def list_connections( name: str | None = None, name_filter: Callable[[str], bool] | None = None, ) -> list[models.ConnectionResponse]: - """Get a connection.""" + """List connections.""" if name and name_filter: raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.") @@ -741,6 +741,38 @@ def delete_connection( # Functions for leveraging the Airbyte Config API (may not be supported or stable) +def get_bearer_token( + *, + client_id: SecretString, + client_secret: SecretString, + api_root: str = CLOUD_API_ROOT, +) -> SecretString: + """Get a bearer token. + + https://reference.airbyte.com/reference/createaccesstoken + """ + path = api_root + "/applications/token" + headers: dict[str, str] = { + "content-type": "application/json", + "accept": "application/json", + } + request_body: dict[str, str] = { + "grant-type": "client_credentials", + "client_id": client_id, + "client_secret": client_secret, + } + response = requests.request( + method="POST", + url=path, + headers=headers, + json=request_body, + ) + if not status_ok(response.status_code): + response.raise_for_status() + + return SecretString(response.json()["access_token"]) + + @dataclass class DockerImageOverride: """Defines a connector image override.""" @@ -760,17 +792,17 @@ def set_actor_override( ) -> None: """Override the docker image and tag for a specific connector. - https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L7234 - + https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L5111 """ path = config_api_root + "/v1/scoped_configuration/create" headers: dict[str, Any] = { - "Content-Type": "application", + "Content-Type": "application/json", "Authorization": SecretString(f"Bearer {api_key}"), } request_body: dict[str, str] = { - "config_key": "docker_image", # TODO: Fix this. + # https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L7376 "value": override.docker_image_override, + "config_key": "docker_image", # TODO: Fix this. "scope_id": actor_id, "scope_type": actor_type, "resource_id": "", # TODO: Need to call something like get_actor_definition @@ -795,30 +827,62 @@ def set_actor_override( ) +def check_connector( + *, + actor_id: str, + connector_type: Literal["source", "destination"], + client_id: SecretString, + client_secret: SecretString, + workspace_id: str | None = None, + api_root: str, +) -> tuple[bool, str | None]: + """Check a source. + + Returns a tuple of two values: + - A boolean indicating if the source passes the connection check. + - The error message if the source fails the check operation. + """ + bearer_token = get_bearer_token( + client_id=client_id, + client_secret=client_secret, + api_root=api_root, + ) + _ = source_id, workspace_id, api_root, api_key + raise NotImplementedError + + def get_connector_image_override( *, workspace_id: str, actor_id: str, actor_type: Literal["source", "destination"], config_api_root: str = CLOUD_CONFIG_API_ROOT, - api_key: str, + client_id: SecretString, + client_secret: SecretString, ) -> DockerImageOverride | None: """Get the docker image and tag for a specific connector. Result is a tuple of two values: - A boolean indicating if an override is set. - The docker image and tag, either from the override if set, or from the . + + https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L5066 """ path = config_api_root + "/v1/scoped_configuration/list" + bearer_token = get_bearer_token( + client_id=client_id, + client_secret=client_secret, + api_root=config_api_root, + ) headers: dict[str, Any] = { "Content-Type": "application", - "Authorization": SecretString(f"Bearer {api_key}"), + "Authorization": SecretString(f"Bearer {bearer_token}"), } request_body: dict[str, str] = { "config_key": "docker_image", # TODO: Fix this. } response = requests.request( - method="GET", + method="POST", url=path, headers=headers, json=request_body, @@ -849,18 +913,3 @@ def get_connector_image_override( "Multiple overrides found. This is not yet supported.", ) return overrides[0] - - -# Not yet implemented - - -# def check_source( -# source_id: str, -# *, -# api_root: str, -# api_key: str, -# workspace_id: str | None = None, -# ) -> api.SourceCheckResponse: -# """Check a source.""" -# _ = source_id, workspace_id, api_root, api_key -# raise NotImplementedError diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index 4c0c54ed..e0d46864 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -9,9 +9,13 @@ from airbyte_api.models import DestinationResponse, SourceResponse, WorkspaceResponse from airbyte._util import api_util, text_util +from airbyte._util.api_util import get_bearer_token, check_connector, AirbyteError from airbyte_api.models import DestinationDuckdb, SourceFaker from airbyte.secrets.base import SecretString +import pytest + +# from unittest.mock import patch def test_get_workspace( @@ -226,3 +230,70 @@ def test_create_and_delete_connection( client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, ) + + +def test_get_bearer_token( + airbyte_cloud_client_id, + airbyte_cloud_client_secret, +) -> None: + try: + token: SecretString = get_bearer_token( + client_id=airbyte_cloud_client_id, + client_secret=airbyte_cloud_client_secret, + ) + assert token is not None + except AirbyteError as e: + pytest.fail(f"API call failed: {e}") + + +def test_check_connector_integration( + api_root, + airbyte_cloud_client_id, + airbyte_cloud_client_secret, +) -> None: + actor_id = "test_actor_id" + connector_type = "source" + try: + result, error_message = check_connector( + actor_id=actor_id, + connector_type=connector_type, + client_id=airbyte_cloud_client_id, + client_secret=airbyte_cloud_client_secret, + api_root=api_root, + ) + assert result is not None + except NotImplementedError: + pytest.fail("check_connector function is not implemented") + except AirbyteError as e: + pytest.fail(f"API call failed: {e}") + + +# @pytest.fixture +# def mock_response(): +# class MockResponse: +# def __init__(self, json_data, status_code): +# self.json_data = json_data +# self.status_code = status_code + +# def json(self): +# return self.json_data + +# return MockResponse + + +# def test_get_bearer_token_success(api_root, client_id, client_secret, mock_response): +# with patch("requests.request") as mock_request: +# mock_request.return_value = mock_response({"access_token": "test_token"}, 200) +# token = get_bearer_token( +# client_id=client_id, client_secret=client_secret, api_root=api_root +# ) +# assert token.get_secret_value() == "test_token" + + +# def test_get_bearer_token_failure(api_root, client_id, client_secret, mock_response): +# with patch("requests.request") as mock_request: +# mock_request.return_value = mock_response({}, 400) +# with pytest.raises(AirbyteError): +# get_bearer_token( +# client_id=client_id, client_secret=client_secret, api_root=api_root +# ) From 591b09eea9af79435f2e8ad9fa7a560fdbb8164e Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 9 Dec 2024 21:40:36 -0800 Subject: [PATCH 07/15] remove commented code --- .../cloud/test_cloud_api_util.py | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index e0d46864..486c260a 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -266,34 +266,3 @@ def test_check_connector_integration( pytest.fail("check_connector function is not implemented") except AirbyteError as e: pytest.fail(f"API call failed: {e}") - - -# @pytest.fixture -# def mock_response(): -# class MockResponse: -# def __init__(self, json_data, status_code): -# self.json_data = json_data -# self.status_code = status_code - -# def json(self): -# return self.json_data - -# return MockResponse - - -# def test_get_bearer_token_success(api_root, client_id, client_secret, mock_response): -# with patch("requests.request") as mock_request: -# mock_request.return_value = mock_response({"access_token": "test_token"}, 200) -# token = get_bearer_token( -# client_id=client_id, client_secret=client_secret, api_root=api_root -# ) -# assert token.get_secret_value() == "test_token" - - -# def test_get_bearer_token_failure(api_root, client_id, client_secret, mock_response): -# with patch("requests.request") as mock_request: -# mock_request.return_value = mock_response({}, 400) -# with pytest.raises(AirbyteError): -# get_bearer_token( -# client_id=client_id, client_secret=client_secret, api_root=api_root -# ) From a21cf5795ec8a2eda645c53168530de6de8267e8 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 9 Dec 2024 23:16:26 -0800 Subject: [PATCH 08/15] still failing test_check_connector --- airbyte/_util/api_util.py | 93 +++++++++++++------ .../cloud/test_cloud_api_util.py | 29 ++++++ 2 files changed, 96 insertions(+), 26 deletions(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 286f565f..6a2d2e17 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -773,6 +773,65 @@ def get_bearer_token( return SecretString(response.json()["access_token"]) +def check_connector( + *, + actor_id: str, + connector_type: Literal["source", "destination"], + client_id: SecretString, + client_secret: SecretString, + workspace_id: str | None = None, + api_root: str = CLOUD_API_ROOT, + config_api_root: str = CLOUD_CONFIG_API_ROOT, +) -> tuple[bool, str | None]: + """Check a source. + + Raises an exception if the check fails. + + https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1409 + https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1995 + """ + _ = workspace_id # Not used (yet) + bearer_token: SecretString = get_bearer_token( + client_id=client_id, + client_secret=client_secret, + api_root=api_root, + ) + path = config_api_root + f"/{connector_type}s/check_connection" + headers: dict[str, Any] = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Bearer {bearer_token}", + } + request_body: dict[str, str] = { + f"{connector_type}Id": actor_id, + } + response = requests.request( + method="POST", + url=path, + headers=headers, + json=request_body, + ) + if not status_ok(response.status_code): + response.raise_for_status() + + response_json = response.json() + result, message = response_json.get("status"), response_json.get("message") + + if result == "succeeded": + return True, None + + if result == "failed": + return False, message + + raise AirbyteError( + context={ + "actor_id": actor_id, + "connector_type": connector_type, + "response": response, + }, + ) + + @dataclass class DockerImageOverride: """Defines a connector image override.""" @@ -788,16 +847,22 @@ def set_actor_override( actor_type: Literal["source", "destination"], override: DockerImageOverride, config_api_root: str = CLOUD_CONFIG_API_ROOT, - api_key: str | SecretString, + client_id: SecretString, + client_secret: SecretString, ) -> None: """Override the docker image and tag for a specific connector. https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L5111 """ path = config_api_root + "/v1/scoped_configuration/create" + bearer_token: SecretString = get_bearer_token( + client_id=client_id, + client_secret=client_secret, + api_root=config_api_root, + ) headers: dict[str, Any] = { "Content-Type": "application/json", - "Authorization": SecretString(f"Bearer {api_key}"), + "Authorization": SecretString(f"Bearer {bearer_token}"), } request_body: dict[str, str] = { # https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L7376 @@ -827,30 +892,6 @@ def set_actor_override( ) -def check_connector( - *, - actor_id: str, - connector_type: Literal["source", "destination"], - client_id: SecretString, - client_secret: SecretString, - workspace_id: str | None = None, - api_root: str, -) -> tuple[bool, str | None]: - """Check a source. - - Returns a tuple of two values: - - A boolean indicating if the source passes the connection check. - - The error message if the source fails the check operation. - """ - bearer_token = get_bearer_token( - client_id=client_id, - client_secret=client_secret, - api_root=api_root, - ) - _ = source_id, workspace_id, api_root, api_key - raise NotImplementedError - - def get_connector_image_override( *, workspace_id: str, diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index 486c260a..6974db2a 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -6,6 +6,7 @@ """ from __future__ import annotations +from typing import Literal from airbyte_api.models import DestinationResponse, SourceResponse, WorkspaceResponse from airbyte._util import api_util, text_util @@ -246,6 +247,34 @@ def test_get_bearer_token( pytest.fail(f"API call failed: {e}") +@pytest.mark.parametrize( + "connector_id, connector_type, expect_success", + [ + ("0f766e9e-636e-4687-8483-f2febc46d9ce", "source", True), + # ("test_connector_id", "destination", True), + ], +) +def test_check_connector( + airbyte_cloud_client_id: SecretString, + airbyte_cloud_client_secret: SecretString, + connector_id: str, + connector_type: Literal["source", "destination"], + expect_success: bool, +) -> None: + try: + result, error_message = check_connector( + actor_id=connector_id, + connector_type=connector_type, + client_id=airbyte_cloud_client_id, + client_secret=airbyte_cloud_client_secret, + ) + assert result is not None + except NotImplementedError: + pytest.fail("check_connector function is not implemented") + except AirbyteError as e: + pytest.fail(f"API call failed: {e}") + + def test_check_connector_integration( api_root, airbyte_cloud_client_id, From 544829c7876e0c0f5751e9741fe2bfa66e8005f8 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 9 Dec 2024 23:58:08 -0800 Subject: [PATCH 09/15] config api auth still not working, added tests --- .../cloud/test_cloud_api_util.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index 6974db2a..e67ab616 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -10,14 +10,18 @@ from airbyte_api.models import DestinationResponse, SourceResponse, WorkspaceResponse from airbyte._util import api_util, text_util -from airbyte._util.api_util import get_bearer_token, check_connector, AirbyteError +from airbyte._util.api_util import ( + get_bearer_token, + check_connector, + AirbyteError, + CLOUD_API_ROOT, + CLOUD_CONFIG_API_ROOT, +) from airbyte_api.models import DestinationDuckdb, SourceFaker from airbyte.secrets.base import SecretString import pytest -# from unittest.mock import patch - def test_get_workspace( workspace_id: str, @@ -233,14 +237,23 @@ def test_create_and_delete_connection( ) +@pytest.mark.parametrize( + "api_root", + [ + CLOUD_API_ROOT, + CLOUD_CONFIG_API_ROOT, + ], +) def test_get_bearer_token( airbyte_cloud_client_id, airbyte_cloud_client_secret, + api_root: str, ) -> None: try: token: SecretString = get_bearer_token( client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, + api_root=api_root, ) assert token is not None except AirbyteError as e: From 6cebe55632f5bf26cb47677df3886fa638dd2547 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 10 Dec 2024 19:04:10 -0800 Subject: [PATCH 10/15] updates/tweaks for check_connection --- airbyte/_util/api_util.py | 12 ++++++--- .../cloud/test_cloud_api_util.py | 26 ++----------------- 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 6a2d2e17..c4986c86 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -750,6 +750,7 @@ def get_bearer_token( """Get a bearer token. https://reference.airbyte.com/reference/createaccesstoken + """ path = api_root + "/applications/token" headers: dict[str, str] = { @@ -757,10 +758,13 @@ def get_bearer_token( "accept": "application/json", } request_body: dict[str, str] = { - "grant-type": "client_credentials", + "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, } + # if "api.airbyte" not in api_root: + # request_body["grant_type"] = "client_credentials" + response = requests.request( method="POST", url=path, @@ -785,10 +789,10 @@ def check_connector( ) -> tuple[bool, str | None]: """Check a source. - Raises an exception if the check fails. + Raises an exception if the check fails. Uses one of these endpoints: - https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1409 - https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1995 + - /v1/sources/check_connection: https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1409 + - /v1/destinations/check_connection: https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1995 """ _ = workspace_id # Not used (yet) bearer_token: SecretString = get_bearer_token( diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index e67ab616..301a068f 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -263,8 +263,8 @@ def test_get_bearer_token( @pytest.mark.parametrize( "connector_id, connector_type, expect_success", [ - ("0f766e9e-636e-4687-8483-f2febc46d9ce", "source", True), - # ("test_connector_id", "destination", True), + ("f45dd701-d1f0-4e8e-97c4-2b89c40ac928", "source", True), + # ("......-....-....-....-............", "destination", True), ], ) def test_check_connector( @@ -286,25 +286,3 @@ def test_check_connector( pytest.fail("check_connector function is not implemented") except AirbyteError as e: pytest.fail(f"API call failed: {e}") - - -def test_check_connector_integration( - api_root, - airbyte_cloud_client_id, - airbyte_cloud_client_secret, -) -> None: - actor_id = "test_actor_id" - connector_type = "source" - try: - result, error_message = check_connector( - actor_id=actor_id, - connector_type=connector_type, - client_id=airbyte_cloud_client_id, - client_secret=airbyte_cloud_client_secret, - api_root=api_root, - ) - assert result is not None - except NotImplementedError: - pytest.fail("check_connector function is not implemented") - except AirbyteError as e: - pytest.fail(f"API call failed: {e}") From 0d600f8549c23bf55c9dd7b92b2458bfd7cd2786 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Wed, 11 Dec 2024 11:49:57 -0800 Subject: [PATCH 11/15] push wip debug updates --- airbyte/_util/api_util.py | 58 ++++++++++++++----- .../cloud/test_cloud_api_util.py | 31 +++++++++- 2 files changed, 75 insertions(+), 14 deletions(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index c4986c86..b40fc8ea 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -59,6 +59,8 @@ - https://docs.airbyte.com/api-documentation#configuration-api-deprecated - https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml """ +CLOUD_CONFIG_API_TEST_ROOT = "https://dev-cloud.airbyte.com/api/v1" +"""Test API root".""" def status_ok(status_code: int) -> bool: @@ -758,15 +760,15 @@ def get_bearer_token( "accept": "application/json", } request_body: dict[str, str] = { - "grant_type": "client_credentials", + # "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, } # if "api.airbyte" not in api_root: # request_body["grant_type"] = "client_credentials" - response = requests.request( - method="POST", + response = requests.post( + # method="POST", url=path, headers=headers, json=request_body, @@ -806,17 +808,41 @@ def check_connector( "Accept": "application/json", "Authorization": f"Bearer {bearer_token}", } - request_body: dict[str, str] = { - f"{connector_type}Id": actor_id, - } - response = requests.request( + # Create a Request object + req = requests.Request( method="POST", url=path, headers=headers, - json=request_body, + json={ + f"{connector_type}Id": actor_id, + }, ) + # Prepare the request + prepared = req.prepare() + + # Inspect the prepared request (TODO: Remove extra debug prints before merging) + print(f"URL: {prepared.url}") + print(f"Headers: {prepared.headers}") + print(f"Body: {prepared.body}") + + # Send the prepared request using a Session + with requests.Session() as session: + response = session.send(prepared) + if not status_ok(response.status_code): - response.raise_for_status() + try: + response.raise_for_status() + except requests.HTTPError as ex: + raise AirbyteError( + # TODO: Remove extra debug prints before merging + context={ + "path": path, + "url": response.request.url, + "body": response.request.body, + "headers": response.request.headers, + "response": response.__dict__, + }, + ) from ex response_json = response.json() result, message = response_json.get("status"), response_json.get("message") @@ -901,6 +927,7 @@ def get_connector_image_override( workspace_id: str, actor_id: str, actor_type: Literal["source", "destination"], + api_root: str = CLOUD_API_ROOT, config_api_root: str = CLOUD_CONFIG_API_ROOT, client_id: SecretString, client_secret: SecretString, @@ -913,15 +940,17 @@ def get_connector_image_override( https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L5066 """ - path = config_api_root + "/v1/scoped_configuration/list" + path = config_api_root + "/scoped_configuration/list" bearer_token = get_bearer_token( client_id=client_id, client_secret=client_secret, - api_root=config_api_root, + api_root=api_root, ) headers: dict[str, Any] = { "Content-Type": "application", - "Authorization": SecretString(f"Bearer {bearer_token}"), + "Authorization": str(f"Bearer {bearer_token}"), + "Connection": None, + "User-Agent": None, } request_body: dict[str, str] = { "config_key": "docker_image", # TODO: Fix this. @@ -936,9 +965,12 @@ def get_connector_image_override( raise AirbyteError( context={ "workspace_id": workspace_id, + "url": response.request.url, "actor_id": actor_id, "actor_type": actor_type, - "response": response, + "status_code": response.status_code, + "request": response.request.__dict__, + "response": response.__dict__, }, ) if not response.json(): diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index 301a068f..110d9eca 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -16,6 +16,7 @@ AirbyteError, CLOUD_API_ROOT, CLOUD_CONFIG_API_ROOT, + get_connector_image_override, ) from airbyte_api.models import DestinationDuckdb, SourceFaker @@ -241,7 +242,6 @@ def test_create_and_delete_connection( "api_root", [ CLOUD_API_ROOT, - CLOUD_CONFIG_API_ROOT, ], ) def test_get_bearer_token( @@ -286,3 +286,32 @@ def test_check_connector( pytest.fail("check_connector function is not implemented") except AirbyteError as e: pytest.fail(f"API call failed: {e}") + + +@pytest.mark.parametrize( + "connector_id, connector_type, expect_success", + [ + ("f45dd701-d1f0-4e8e-97c4-2b89c40ac928", "source", True), + # ("......-....-....-....-............", "destination", True), + ], +) +def test_get_connector_overrides( + airbyte_cloud_client_id: SecretString, + airbyte_cloud_client_secret: SecretString, + connector_id: str, + connector_type: Literal["source", "destination"], + expect_success: bool, +) -> None: + try: + result = get_connector_image_override( + workspace_id=None, # type: ignore # Unused anyway + actor_id=connector_id, + actor_type=connector_type, + client_id=airbyte_cloud_client_id, + client_secret=airbyte_cloud_client_secret, + ) + assert result is not None + except NotImplementedError: + pytest.fail("check_connector function is not implemented") + except AirbyteError as e: + pytest.fail(f"API call failed: {e}") From 4eef47f9084e55805e8ad9f1e07dd1843565f78c Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 16 Dec 2024 13:52:12 -0800 Subject: [PATCH 12/15] update, refactor config api util methods --- airbyte/_util/api_util.py | 252 +++++++++++++++++--------------------- 1 file changed, 111 insertions(+), 141 deletions(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index b40fc8ea..6a4ee9a3 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -68,6 +68,14 @@ def status_ok(status_code: int) -> bool: return status_code >= 200 and status_code < 300 # noqa: PLR2004 # allow inline magic numbers +def get_config_api_root(api_root: str) -> str: + """Get the configuration API root from the main API root.""" + if api_root == CLOUD_API_ROOT: + return CLOUD_CONFIG_API_ROOT + + raise NotImplementedError("Configuration API root not implemented for this API root.") + + def get_airbyte_server_instance( *, api_root: str, @@ -754,24 +762,16 @@ def get_bearer_token( https://reference.airbyte.com/reference/createaccesstoken """ - path = api_root + "/applications/token" - headers: dict[str, str] = { - "content-type": "application/json", - "accept": "application/json", - } - request_body: dict[str, str] = { - # "grant_type": "client_credentials", - "client_id": client_id, - "client_secret": client_secret, - } - # if "api.airbyte" not in api_root: - # request_body["grant_type"] = "client_credentials" - response = requests.post( - # method="POST", - url=path, - headers=headers, - json=request_body, + url=api_root + "/applications/token", + headers={ + "content-type": "application/json", + "accept": "application/json", + }, + json={ + "client_id": client_id, + "client_secret": client_secret, + }, ) if not status_ok(response.status_code): response.raise_for_status() @@ -779,73 +779,74 @@ def get_bearer_token( return SecretString(response.json()["access_token"]) -def check_connector( +def _make_config_api_request( *, - actor_id: str, - connector_type: Literal["source", "destination"], + api_root: str, + path: str, + json: dict[str, Any], client_id: SecretString, client_secret: SecretString, - workspace_id: str | None = None, - api_root: str = CLOUD_API_ROOT, - config_api_root: str = CLOUD_CONFIG_API_ROOT, -) -> tuple[bool, str | None]: - """Check a source. - - Raises an exception if the check fails. Uses one of these endpoints: - - - /v1/sources/check_connection: https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1409 - - /v1/destinations/check_connection: https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1995 - """ - _ = workspace_id # Not used (yet) - bearer_token: SecretString = get_bearer_token( +) -> dict[str, Any]: + config_api_root = get_config_api_root(api_root) + bearer_token = get_bearer_token( client_id=client_id, client_secret=client_secret, api_root=api_root, ) - path = config_api_root + f"/{connector_type}s/check_connection" headers: dict[str, Any] = { "Content-Type": "application/json", - "Accept": "application/json", "Authorization": f"Bearer {bearer_token}", + "User-Agent": "PyAirbyte Client", } - # Create a Request object - req = requests.Request( + response = requests.request( method="POST", - url=path, + url=config_api_root + path, headers=headers, - json={ - f"{connector_type}Id": actor_id, - }, + json=json, ) - # Prepare the request - prepared = req.prepare() - - # Inspect the prepared request (TODO: Remove extra debug prints before merging) - print(f"URL: {prepared.url}") - print(f"Headers: {prepared.headers}") - print(f"Body: {prepared.body}") - - # Send the prepared request using a Session - with requests.Session() as session: - response = session.send(prepared) - if not status_ok(response.status_code): try: response.raise_for_status() except requests.HTTPError as ex: raise AirbyteError( - # TODO: Remove extra debug prints before merging context={ - "path": path, "url": response.request.url, "body": response.request.body, - "headers": response.request.headers, "response": response.__dict__, }, ) from ex - response_json = response.json() - result, message = response_json.get("status"), response_json.get("message") + return response.json() + + +def check_connector( + *, + actor_id: str, + connector_type: Literal["source", "destination"], + client_id: SecretString, + client_secret: SecretString, + workspace_id: str | None = None, + api_root: str = CLOUD_API_ROOT, +) -> tuple[bool, str | None]: + """Check a source. + + Raises an exception if the check fails. Uses one of these endpoints: + + - /v1/sources/check_connection: https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1409 + - /v1/destinations/check_connection: https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1995 + """ + _ = workspace_id # Not used (yet) + + json_result = _make_config_api_request( + path=f"/{connector_type}s/check_connection", + json={ + f"{connector_type}Id": actor_id, + }, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + result, message = json_result.get("status"), json_result.get("message") if result == "succeeded": return True, None @@ -857,7 +858,7 @@ def check_connector( context={ "actor_id": actor_id, "connector_type": connector_type, - "response": response, + "response": json_result, }, ) @@ -870,65 +871,12 @@ class DockerImageOverride: override_level: Literal["workspace", "actor"] = "actor" -def set_actor_override( - *, - workspace_id: str, - actor_id: str, - actor_type: Literal["source", "destination"], - override: DockerImageOverride, - config_api_root: str = CLOUD_CONFIG_API_ROOT, - client_id: SecretString, - client_secret: SecretString, -) -> None: - """Override the docker image and tag for a specific connector. - - https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L5111 - """ - path = config_api_root + "/v1/scoped_configuration/create" - bearer_token: SecretString = get_bearer_token( - client_id=client_id, - client_secret=client_secret, - api_root=config_api_root, - ) - headers: dict[str, Any] = { - "Content-Type": "application/json", - "Authorization": SecretString(f"Bearer {bearer_token}"), - } - request_body: dict[str, str] = { - # https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L7376 - "value": override.docker_image_override, - "config_key": "docker_image", # TODO: Fix this. - "scope_id": actor_id, - "scope_type": actor_type, - "resource_id": "", # TODO: Need to call something like get_actor_definition - "resource_type": "ACTOR_DEFINITION", - "origin": "", # TODO: Need to get user ID somehow or use another origin type - "origin_type": "USER", - } - response = requests.request( - method="POST", - url=path, - headers=headers, - json=request_body, - ) - if not status_ok(response.status_code): - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "actor_id": actor_id, - "actor_type": actor_type, - "response": response, - }, - ) - - def get_connector_image_override( *, workspace_id: str, actor_id: str, actor_type: Literal["source", "destination"], api_root: str = CLOUD_API_ROOT, - config_api_root: str = CLOUD_CONFIG_API_ROOT, client_id: SecretString, client_secret: SecretString, ) -> DockerImageOverride | None: @@ -940,53 +888,75 @@ def get_connector_image_override( https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L5066 """ - path = config_api_root + "/scoped_configuration/list" - bearer_token = get_bearer_token( + json_result = _make_config_api_request( + path="/scoped_configuration/list", + json={ + "config_key": "docker_image", # TODO: Fix this. + }, + api_root=api_root, client_id=client_id, client_secret=client_secret, - api_root=api_root, - ) - headers: dict[str, Any] = { - "Content-Type": "application", - "Authorization": str(f"Bearer {bearer_token}"), - "Connection": None, - "User-Agent": None, - } - request_body: dict[str, str] = { - "config_key": "docker_image", # TODO: Fix this. - } - response = requests.request( - method="POST", - url=path, - headers=headers, - json=request_body, ) - if not status_ok(response.status_code): + try: + scoped_configurations = json_result["scopedConfigurations"] + except KeyError as ex: raise AirbyteError( + message="Could not find 'scoped_configurations' in response.", context={ "workspace_id": workspace_id, - "url": response.request.url, "actor_id": actor_id, "actor_type": actor_type, - "status_code": response.status_code, - "request": response.request.__dict__, - "response": response.__dict__, + "response_keys": list(json_result.keys()), }, - ) - if not response.json(): - return None - + ) from ex overrides = [ DockerImageOverride( docker_image_override=entry["value"], override_level=entry["scope_type"], ) - for entry in response.json() + for entry in scoped_configurations ] - if not overrides: + if len(overrides) == 0: return None + if len(overrides) > 1: raise NotImplementedError( "Multiple overrides found. This is not yet supported.", ) + return overrides[0] + + +def set_actor_override( + *, + workspace_id: str, + actor_id: str, + actor_type: Literal["source", "destination"], + override: DockerImageOverride, + api_root: str = CLOUD_API_ROOT, + client_id: SecretString, + client_secret: SecretString, +) -> None: + """Override the docker image and tag for a specific connector. + + https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L5111 + """ + _ = workspace_id + json_result = _make_config_api_request( + path="/v1/scoped_configuration/create", + json={ + # https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L7376 + "value": override.docker_image_override, + "config_key": "docker_image", # TODO: Fix this. + "scope_id": actor_id, + "scope_type": actor_type, + "resource_id": "", # TODO: Need to call something like get_actor_definition + "resource_type": "ACTOR_DEFINITION", + "origin": "", # TODO: Need to get user ID somehow or use another origin type + "origin_type": "USER", + }, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + _ = json_result # Not used (yet) From 458e9fe7361723cfd0e0f7766b2b06c71d96e558 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 16 Dec 2024 13:55:48 -0800 Subject: [PATCH 13/15] add check action to cloud connectors --- airbyte/cloud/connectors.py | 62 +++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py index ab3b1ecd..8236ebcf 100644 --- a/airbyte/cloud/connectors.py +++ b/airbyte/cloud/connectors.py @@ -6,11 +6,44 @@ import abc from typing import TYPE_CHECKING, ClassVar, Literal +from attr import dataclass + +from airbyte._util import api_util + if TYPE_CHECKING: from airbyte.cloud.workspaces import CloudWorkspace +@dataclass +class CheckResult: + """A cloud check result object.""" + + success: bool + """Whether the check result is valid.""" + + error_message: str | None = None + """None if the check was successful. Otherwise the failure message from the check result.""" + + internal_error: str | None = None + """None if the check was able to be run. Otherwise, this will describe the internal failure.""" + + def __bool__(self) -> bool: + """Truthy when check was successful.""" + return self.success + + def __str__(self) -> str: + """Get a string representation of the check result.""" + return "Success" if self.success else f"Failed: {self.error_message}" + + def __repr__(self) -> str: + """Get a string representation of the check result.""" + return ( + f"CloudCheckResult(success={self.success}, " + f"error_message={self.error_message or self.internal_error})" + ) + + class CloudConnector(abc.ABC): """A cloud connector is a deployed source or destination on Airbyte Cloud. @@ -43,6 +76,35 @@ def permanently_delete(self) -> None: else: self.workspace.permanently_delete_destination(self.connector_id) + def check( + self, + *, + raise_on_error: bool = True, + ) -> CheckResult: + """Check the connector. + + Returns: + A `CheckResult` object containing the result. The object is truthy if the check was + successful and falsy otherwise. The error message is available in the `error_message` + or by converting the object to a string. + """ + result = api_util.check_connector( + workspace_id=self.workspace.workspace_id, + connector_type=self.connector_type, + actor_id=self.connector_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + check_result = CheckResult( + success=result[0], + error_message=result[1], + ) + if raise_on_error and not check_result: + raise ValueError(f"Check failed: {check_result}") + + return check_result + class CloudSource(CloudConnector): """A cloud source is a source that is deployed on Airbyte Cloud.""" From 4002a7337227839dd92649033cfea69c2bb2493f Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 16 Dec 2024 13:55:52 -0800 Subject: [PATCH 14/15] update tests --- tests/integration_tests/cloud/test_cloud_api_util.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index 110d9eca..7dec08e0 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -15,7 +15,6 @@ check_connector, AirbyteError, CLOUD_API_ROOT, - CLOUD_CONFIG_API_ROOT, get_connector_image_override, ) from airbyte_api.models import DestinationDuckdb, SourceFaker @@ -264,7 +263,7 @@ def test_get_bearer_token( "connector_id, connector_type, expect_success", [ ("f45dd701-d1f0-4e8e-97c4-2b89c40ac928", "source", True), - # ("......-....-....-....-............", "destination", True), + # ("......-....-....-............", "destination", True), ], ) def test_check_connector( @@ -281,7 +280,7 @@ def test_check_connector( client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, ) - assert result is not None + assert result == expect_success except NotImplementedError: pytest.fail("check_connector function is not implemented") except AirbyteError as e: @@ -310,7 +309,7 @@ def test_get_connector_overrides( client_id=airbyte_cloud_client_id, client_secret=airbyte_cloud_client_secret, ) - assert result is not None + # assert result is not None # No overrides on this item except NotImplementedError: pytest.fail("check_connector function is not implemented") except AirbyteError as e: From a59d13f2b8e6994d8ebd1c9ccadd7f310538de7b Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Mon, 16 Dec 2024 16:55:16 -0800 Subject: [PATCH 15/15] Update airbyte/_util/api_util.py --- airbyte/_util/api_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 1380eb1e..71d47bbd 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -941,7 +941,7 @@ def set_actor_override( """ _ = workspace_id json_result = _make_config_api_request( - path="/v1/scoped_configuration/create", + path="/scoped_configuration/create", json={ # https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L7376 "value": override.docker_image_override,