Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support additional connection methods #323

Open
wants to merge 44 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
ab4b269
Change RedshiftConnectionManager to extend from SQLConnectionManager,…
sathiish-kumar Dec 19, 2022
ff9fdfd
Add/fix unit tests, create RedshiftConnectMethodFactory to vend conne…
sathiish-kumar Dec 30, 2022
fbd5731
Fix _connection_keys to mimic PostgresConnectionManager
sathiish-kumar Jan 3, 2023
4f98546
Remove unneeded functions for tmp_cluster_creds and env_var creds aut…
sathiish-kumar Jan 9, 2023
f724708
Merge branch 'main' into migrate_psycopg2_to_rshift_connector
sathiish-kumar Jan 18, 2023
5319b90
Resolve some TODOs
sathiish-kumar Jan 18, 2023
16666db
Fix references to old exceptions, add changelog
sathiish-kumar Jan 18, 2023
30ae0b5
Fix errors with functional tests by overriding add_query & execute an…
sathiish-kumar Jan 23, 2023
de7c411
Merge branch 'dbt-labs:main' into migrate_psycopg2_to_rshift_connector
sathiish-kumar Jan 24, 2023
bfe8678
Attempt to fix integration tests by adding `valid_incremental_strateg…
sathiish-kumar Jan 24, 2023
c8a18d8
Fix unit tests
sathiish-kumar Jan 24, 2023
40e0fe5
Attempt to fix integration tests
sathiish-kumar Jan 25, 2023
c74d3ee
Merge branch 'main' into migrate_psycopg2_to_rshift_connector
sathiish-kumar Jan 26, 2023
66c1594
add unit tests for execute
jiezhen-chen Jan 26, 2023
54bc39f
Merge branch 'main' into migrate_psycopg2_to_rshift_connector
sathiish-kumar Jan 27, 2023
3ed9876
Merge branch 'main' into migrate_psycopg2_to_rshift_connector
sathiish-kumar Jan 27, 2023
4bb97ab
Merge branch 'main' into migrate_psycopg2_to_rshift_connector
sathiish-kumar Jan 30, 2023
cfad7ff
add unit tests for add_query
jiezhen-chen Jan 30, 2023
b1c8e00
Merge branch 'main' into migrate_psycopg2_to_rshift_connector
colin-rogers-dbt Jan 30, 2023
12eb89b
make get_connection_method work with serverless
jiezhen-chen Jan 31, 2023
9a319ac
Merge branch 'main' into migrate_psycopg2_to_rshift_connector
colin-rogers-dbt Jan 31, 2023
d3113ca
add unit tests for serverless iam connections
jiezhen-chen Jan 31, 2023
ccfebc8
support auth_profile
jiezhen-chen Feb 2, 2023
1dc3ebe
add unit tests for auth_profile connection method
jiezhen-chen Feb 2, 2023
b43d20c
add support for auth_profile
jiezhen-chen Feb 3, 2023
93947e1
add support for idp credentials
jiezhen-chen Feb 6, 2023
fb60268
add CHANGELOG
jiezhen-chen Feb 6, 2023
c710803
change host default to None
jiezhen-chen Feb 6, 2023
17fcb03
add unit tests and improve error messages
jiezhen-chen Feb 7, 2023
daaf05b
Merge branch 'main' into support_additional_connection_methods
jiezhen-chen Feb 16, 2023
0947784
Merge branch 'main' into support_additional_connection_methods
sathiish-kumar Feb 16, 2023
b1b09ce
remove application_name and add okta prefix
jiezhen-chen Feb 22, 2023
971c957
Merge branch 'main' into support_additional_connection_methods
jiezhen-chen Feb 22, 2023
674d754
add azure_ and okta_ prefix, make application_name unconfigurable to …
jiezhen-chen Feb 22, 2023
5856fe2
Merge branch 'main' into support_additional_connection_methods
sathiish-kumar Feb 23, 2023
2047bbf
Merge branch 'main' into support_additional_connection_methods
colin-rogers-dbt Feb 27, 2023
4622210
Merge branch 'main' into support_additional_connection_methods
colin-rogers-dbt Mar 8, 2023
857294a
Merge branch 'main' into support_additional_connection_methods
jiezhen-chen Mar 21, 2023
c901bb7
error message enhancement
jiezhen-chen Mar 21, 2023
0d89af6
error handling for region mismatch, make credentials providers case i…
jiezhen-chen Mar 21, 2023
b66b58d
Merge branch 'main' into support_additional_connection_methods
jiezhen-chen Apr 17, 2023
6eb89e4
remove unused code
jiezhen-chen Apr 17, 2023
de7e0e7
add pre-commit hook changes
jiezhen-chen Apr 17, 2023
0b60812
Merge branch 'main' into support_additional_connection_methods
jiezhen-chen May 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/unreleased/Features-20230206-112433.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kind: Features
time: 2023-02-06T11:24:33.926088-08:00
custom:
Author: jiezhen-chen
Issue: "6232"
145 changes: 138 additions & 7 deletions dbt/adapters/redshift/connections.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import re
from multiprocessing import Lock
from contextlib import contextmanager
Expand Down Expand Up @@ -39,13 +40,16 @@ def json_schema(self):
class RedshiftConnectionMethod(StrEnum):
DATABASE = "database"
IAM = "iam"
AUTH_PROFILE = "auth_profile"
IDP = "IdP"


@dataclass
class RedshiftCredentials(Credentials):
host: str
user: str
port: Port
region: Optional[str] = None
colin-rogers-dbt marked this conversation as resolved.
Show resolved Hide resolved
host: Optional[str] = None
user: Optional[str] = None
method: str = RedshiftConnectionMethod.DATABASE # type: ignore
password: Optional[str] = None # type: ignore
cluster_id: Optional[str] = field(
Expand All @@ -59,7 +63,19 @@ class RedshiftCredentials(Credentials):
connect_timeout: int = 30
role: Optional[str] = None
sslmode: Optional[str] = None
application_name: Optional[str] = "dbt"
colin-rogers-dbt marked this conversation as resolved.
Show resolved Hide resolved
retries: int = 1
auth_profile: Optional[str] = None
# Azure identity provider plugin
credentials_provider: Optional[str] = None
idp_tenant: Optional[str] = None
client_id: Optional[str] = None
client_secret: Optional[str] = None
preferred_role: Optional[str] = None
# Okta identity provider plugin
idp_host: Optional[str] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if these are okta specific can we add a prefix like okta_* to indicate that?

app_id: Optional[str] = None
app_name: Optional[str] = None

_ALIASES = {"dbname": "database", "pass": "password"}

Expand All @@ -68,11 +84,20 @@ def type(self):
return "redshift"

def _connection_keys(self):
return "host", "port", "user", "database", "schema", "method", "cluster_id", "iam_profile"
return (
"host",
"port",
"user",
"database",
"schema",
"method",
"cluster_id",
"iam_profile",
)

@property
def unique_field(self) -> str:
return self.host
return self.host if self.host else self.database


class RedshiftConnectMethodFactory:
Expand All @@ -84,14 +109,18 @@ def __init__(self, credentials):
def get_connect_method(self):
method = self.credentials.method
kwargs = {
"host": self.credentials.host,
"host": "",
"region": self.credentials.region,
"database": self.credentials.database,
"port": self.credentials.port if self.credentials.port else 5439,
"auto_create": self.credentials.autocreate,
"db_groups": self.credentials.db_groups,
"region": self.credentials.host.split(".")[2],
"timeout": self.credentials.connect_timeout,
"application_name": self.credentials.application_name,
}
if method == RedshiftConnectionMethod.IAM or method == RedshiftConnectionMethod.DATABASE:
kwargs["host"] = self.credentials.host
kwargs["region"] = self.credentials.host.split(".")[2]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the user passes a region that's different from the region in the host should we raise an exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, we could also default to using region from host, in the scenario that host is provided. So if provided region and region in host are different we use the region from host. But we could also throw an error. What are your thoughts?

if self.credentials.sslmode:
kwargs["sslmode"] = self.credentials.sslmode

Expand All @@ -107,14 +136,115 @@ def get_connect_method(self):
def connect():
logger.debug("Connecting to redshift with username/password based auth...")
c = redshift_connector.connect(
user=self.credentials.user, password=self.credentials.password, **kwargs
user=self.credentials.user,
password=self.credentials.password,
**kwargs,
)
if self.credentials.role:
c.cursor().execute("set role {}".format(self.credentials.role))
return c

return connect

elif method == RedshiftConnectionMethod.AUTH_PROFILE:
if not self.credentials.auth_profile:
raise dbt.exceptions.FailedToConnectError(
"Failed to use auth profile method. 'auth_profile' must be provided."
)
if not self.credentials.region:
raise dbt.exceptions.FailedToConnectError(
"Failed to use auth profile method. 'region' must be provided."
)

def connect():
logger.debug("Connecting to redshift with authentication profile...")
c = redshift_connector.connect(
iam=True,
access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
session_token=os.environ["AWS_SESSION_TOKEN"],
db_user=self.credentials.user,
auth_profile=self.credentials.auth_profile,
**kwargs,
)
if self.credentials.role:
c.cursor().execute("set role {}".format(self.credentials.role))
return c

return connect
elif method == RedshiftConnectionMethod.IDP:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have we done any local testing of these new connection methods? Ideally we would be able to cover these with integration tests but that may be unrealistic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've done local testing for these new connection methods. What I've done for local testing - Got some credentials from someone who manages redshift_connector and edited the profiles.yml to connect to Redshift.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how feasible integration tests are - since we got these credentials from another team and we don't own these credentials. Ideally we should spend some time to create these credentials ourselves so that the integration tests don't depend on another team - but that may be out of scope for right now.

if not self.credentials.credentials_provider:
raise dbt.exceptions.FailedToConnectError(
"Failed to use IdP credentials. 'credentials_provider' must be provided."
)

if not self.credentials.region:
raise dbt.exceptions.FailedToConnectError(
"Failed to use IdP credentials. 'region' must be provided."
)

if not self.credentials.password or not self.credentials.user:
raise dbt.exceptions.FailedToConnectError(
"Failed to use IdP credentials. 'password' and 'user' must be provided."
)

if self.credentials.credentials_provider == "AzureCredentialsProvider":
if (
not self.credentials.idp_tenant
or not self.credentials.client_id
or not self.credentials.client_secret
or not self.credentials.preferred_role
):
raise dbt.exceptions.FailedToConnectError(
"Failed to use Azure credential. 'idp_tenant', 'client_id', 'client_secret', "
"and 'preferred_role' must be provided"
)

def connect():
logger.debug("Connecting to redshift with Azure Credentials Provider...")
c = redshift_connector.connect(
iam=True,
region=self.credentials.region,
database=self.credentials.database,
cluster_identifier=self.credentials.cluster_id,
credentials_provider="AzureCredentialsProvider",
user=self.credentials.user,
password=self.credentials.password,
idp_tenant=self.credentials.idp_tenant,
client_id=self.credentials.client_id,
client_secret=self.credentials.client_secret,
preferred_role=self.credentials.preferred_role,
)
return c

return connect
elif self.credentials.credentials_provider == "OktaCredentialsProvider":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above about testing for IDP

if (
not self.credentials.idp_host
or not self.credentials.app_id
or not self.credentials.app_name
):
raise dbt.exceptions.FailedToConnectError(
"Failed to use Okta credential. 'idp_host', 'app_id', 'app_name' must be provided."
)

def connect():
logger.debug("Connecting to redshift with Okta Credentials Provider...")
c = redshift_connector.connect(
iam=True,
region=self.credentials.region,
database=self.credentials.database,
cluster_identifier=self.credentials.cluster_id,
credentials_provider="OktaCredentialsProvider",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: does this need to be hardcoded? Looks like we already check that self.credentials.credentials_provider == "OktaCredentialsProvider" above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Might also be a good idea to allow more wiggle room for the customer to use these credentials. I think we can hardcode credentials_provider="OktaCredentialsProvider" in the connection call, and allow for multiple variations of Okta from the customer(both "okta" and "oktacredentialprovider" should be accepted with case insensitivity), instead of requiring customers to input excatly "OktaCredentialsProvider"

user=self.credentials.user,
password=self.credentials.password,
idp_host=self.credentials.idp_host,
app_id=self.credentials.app_id,
app_name=self.credentials.app_name,
)
return c

return connect
elif method == RedshiftConnectionMethod.IAM:
if not self.credentials.cluster_id and "serverless" not in self.credentials.host:
raise dbt.exceptions.FailedToConnectError(
Expand All @@ -138,6 +268,7 @@ def connect():
return c

return connect

else:
raise dbt.exceptions.FailedToConnectError(
"Invalid 'method' in profile: '{}'".format(method)
Expand Down
Loading