Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⏱️ [Do not merge]: Release experimental interop methods to stable #192

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def get_sync_result(

# Deletions

def _permanently_delete(
def permanently_delete(
self,
*,
delete_source: bool = False,
Expand All @@ -199,16 +199,16 @@ def _permanently_delete(
delete_source: Whether to also delete the source.
delete_destination: Whether to also delete the destination.
"""
self.workspace._permanently_delete_connection( # noqa: SLF001 # Non-public API (for now)
self.workspace.permanently_delete_connection( # Non-public API (for now)
connection=self
)

if delete_source:
self.workspace._permanently_delete_source( # noqa: SLF001 # Non-public API (for now)
self.workspace.permanently_delete_source( # Non-public API (for now)
source=self.source_id
)

if delete_destination:
self.workspace._permanently_delete_destination( # noqa: SLF001 # Non-public API
destination=self.destination_id,
self.workspace.permanently_delete_destination( # Non-public API
destination=self.destination_id
)
21 changes: 3 additions & 18 deletions airbyte/cloud/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,6 @@
stacklevel=2,
)


class CloudWorkspace(Stable_CloudWorkspace):
__doc__ = (
f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudConnection.__doc__}"
)
deploy_connection = Stable_CloudWorkspace._deploy_connection
deploy_source = Stable_CloudWorkspace._deploy_source
deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination
permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection
permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source
permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination


class CloudConnection(Stable_CloudConnection):
__doc__ = (
f"Experimental implementation of `.CloudConnection`.\n\n{Stable_CloudConnection.__doc__}"
)
permanently_delete = Stable_CloudConnection._permanently_delete
# All experimental methods are now stable.
CloudConnection = Stable_CloudConnection
CloudWorkspace = Stable_CloudWorkspace
32 changes: 25 additions & 7 deletions airbyte/cloud/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,31 @@ def connect(self) -> None:

# Deploy and delete sources

# TODO: Make this a public API
def _deploy_source(
def deploy_source(
self,
name_key: str | None = None,
source: Source,
*,
source_id: str | None = None,
update_existing: bool = True,
) -> str:
"""Deploy a source to the workspace.

This method will deploy a source to the workspace and return the source ID. It can also
be used to update existing sources, replacing their definitions with the provided source
configuration.

Args:
name_key (str): The key to use for the source name. This is used to provide
idempotency when deploying the same source multiple times. If `None`, then
`source_id` is required. If a matching source source is found and `update_existing`
is `False`, then a `AirbyteResourceAlreadyExists` exception will be raised.
source (Source): The source to deploy.
source_id (str, optional): The ID of an existing source to replace/update. If provided,
then `name_key` and `replace` will be ignored.
update_existing (bool, optional): If `True`, the source will be updated if it already
exists. Ignored if `source_id` is provided.

Returns the newly deployed source ID.
"""
source_configuration = source.get_config().copy()
Expand All @@ -93,7 +111,7 @@ def _deploy_source(

return deployed_source.source_id

def _permanently_delete_source(
def permanently_delete_source(
self,
source: str | Source,
) -> None:
Expand Down Expand Up @@ -147,7 +165,7 @@ def _deploy_cache_as_destination(

return deployed_destination.destination_id

def _permanently_delete_destination(
def permanently_delete_destination(
self,
*,
destination: str | None = None,
Expand Down Expand Up @@ -277,7 +295,7 @@ def get_connection(
connection_id=connection_id,
)

def _permanently_delete_connection(
def permanently_delete_connection(
self,
connection: str | CloudConnection,
*,
Expand All @@ -301,10 +319,10 @@ def _permanently_delete_connection(
workspace_id=self.workspace_id,
)
if delete_source:
self._permanently_delete_source(source=connection.source_id)
self.permanently_delete_source(source=connection.source_id)

if delete_destination:
self._permanently_delete_destination(destination=connection.destination_id)
self.permanently_delete_destination(destination=connection.destination_id)

# Run syncs

Expand Down
6 changes: 3 additions & 3 deletions tests/integration_tests/cloud/test_cloud_sql_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ def test_deploy_and_run_and_read(

# Cleanup
with suppress(Exception):
cloud_workspace._permanently_delete_connection(
cloud_workspace.permanently_delete_connection(
connection_id=connection_id,
delete_source=True,
delete_destination=True,
)
with suppress(Exception):
cloud_workspace._permanently_delete_source(source_id=source_id)
cloud_workspace.permanently_delete_source(source_id=source_id)
with suppress(Exception):
cloud_workspace._permanently_delete_destination(destination_id=destination_id)
cloud_workspace.permanently_delete_destination(destination_id=destination_id)


@pytest.mark.parametrize(
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/cloud/test_cloud_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@ def test_deploy_and_run_connection(
assert cache.stream_names
assert cache.streams["users"].to_pandas()

cloud_workspace._permanently_delete_connection(connection_id=connection_id)
cloud_workspace.permanently_delete_connection(connection_id=connection_id)
58 changes: 51 additions & 7 deletions tests/integration_tests/cloud/test_cloud_workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
"""
from __future__ import annotations

from pytest import raises

import airbyte as ab
from airbyte import cloud
from airbyte.caches import MotherDuckCache
from airbyte.cloud import CloudWorkspace
from airbyte.cloud.connections import CloudConnection
Expand All @@ -15,16 +18,57 @@ def test_deploy_source(
cloud_workspace: CloudWorkspace,
) -> None:
"""Test deploying a source to a workspace."""
source = ab.get_source(
local_source: ab.Source = ab.get_source(
"source-faker",
local_executable="source-faker",
config={"count": 100},
config={"count": 100, "seed": 123},
install_if_missing=False,
)
source.check()
source_id: str = cloud_workspace._deploy_source(source)
local_source.check()

# Deploy source:
source_connector: cloud.CloudConnector = cloud_workspace.deploy_source(
local_source,
name_key="My Faker Source", # Used in deduplication and idempotency
update_existing=False, # Fail if source already exists
)
assert source_connector.source_name == "My Faker Source"
assert source_connector.configuration["count"] == 100

with raises(PyAirbyteResourceConflictError):
# Deploy source again (should fail):
cloud_workspace.deploy_source(
source=local_source,
name_key="My Faker Source", # Used in deduplication and idempotency
update_existing=False, # Fail since source already exists
)

# Change config and deploy source again (should succeed):
local_source.set_config({"count": 200})
source_connector: cloud.CloudConnector = cloud_workspace.deploy_source(
source=local_source,
name_key="My Faker Source", # Used in deduplication and idempotency
update_existing=True, # Update existing source
)

# Partially update the configuration:
source_connector.update_configuration(
{"count": 300},
merge=True,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aaronsteers How do you envision merge being different from update_existing?
In my mind, those sound like the same thing. Can we use update_existing in both places?

Copy link
Contributor Author

@aaronsteers aaronsteers Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @alex-gron. The "update_existing" was/is planned for a "deploy*()" verb - where a False value would fail if the resource already exists - rather than update it.

In this context "merge" is in the context of changes to settings - where 'merge=true' means we want to merge the provided setting with existing settign, and 'merge=false' would mean we replace all non-provided keys with the provided ones. This is useful when you want to change only one setting without modifying others. Especially useful when we don't want to re-supply creds each time we update non-secret config values.

)
assert source_connector.configuration["count"] == 300
assert source_connector.configuration["seed"] == 123

# Fully replace the configuration:
source_connector.update_configuration(
{"count": 300},
merge=False,
)
assert source_connector.configuration["count"] == 300
assert "seed" not in source_connector.configuration

cloud_workspace._permanently_delete_source(source=source_id)
# Delete the deployed source connector:
source_connector.permanently_delete_connector()


def test_deploy_cache_as_destination(
Expand All @@ -38,7 +82,7 @@ def test_deploy_cache_as_destination(
schema_name="public",
)
destination_id: str = cloud_workspace._deploy_cache_as_destination(cache=cache)
cloud_workspace._permanently_delete_destination(destination=destination_id)
cloud_workspace.permanently_delete_destination(destination=destination_id)


def test_deploy_connection(
Expand Down Expand Up @@ -69,7 +113,7 @@ def test_deploy_connection(
assert set(connection.stream_names) == set(["users", "products", "purchases"])
assert connection.table_prefix == "abc_deleteme_"
# assert connection.table_suffix == "" # Suffix not supported in CloudConnection
cloud_workspace._permanently_delete_connection(
cloud_workspace.permanently_delete_connection(
connection=connection,
delete_source=True,
delete_destination=True,
Expand Down
Loading