Skip to content

Commit

Permalink
Try using copy command from psycopg3
Browse files Browse the repository at this point in the history
  • Loading branch information
SpaceCondor committed Sep 27, 2024
1 parent 1a60b45 commit 4eb7987
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 322 deletions.
622 changes: 326 additions & 296 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ packages = [
[tool.poetry.dependencies]
python = ">=3.8"
faker = {version = "~=29.0", optional = true}
psycopg2-binary = "2.9.9"
psycopg-binary = "3.2.2"
sqlalchemy = "~=2.0"
sshtunnel = "0.4.0"

Expand Down
55 changes: 37 additions & 18 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,25 @@ def generate_temp_table_name(self):
# in postgres, used a guid just in case we are using the same session
return f"{str(uuid.uuid4()).replace('-', '_')}"

def generate_copy_statement(
self,
full_table_name: str | FullyQualifiedName,
columns: list[sa.Column], # type: ignore[override]
) -> str:
"""Generate a copy statement for bulk copy.
Args:
full_table_name: the target table name.
columns: the target table columns.
Returns:
A copy statement.
"""
columns_list = ", ".join(f'"{column.name}"' for column in columns)
sql: str = f'COPY "{full_table_name}" ({columns_list}) FROM STDIN'

return sql

def bulk_insert_records( # type: ignore[override]
self,
table: sa.Table,
Expand All @@ -145,35 +164,35 @@ def bulk_insert_records( # type: ignore[override]
True if table exists, False if not, None if unsure or undetectable.
"""
columns = self.column_representation(schema)
insert: str = t.cast(
str,
self.generate_insert_statement(
table.name,
columns,
),
)
self.logger.info("Inserting with SQL: %s", insert)
# Only one record per PK, we want to take the last one
data_to_insert: list[dict[str, t.Any]] = []
copy_statement: str = self.generate_copy_statement(table.name, columns)
self.logger.info("Inserting with SQL: %s", copy_statement)


data_to_copy: list[dict[str, t.Any]] = []

# If append only is False, we only take the latest record one per primary key
if self.append_only is False:
insert_records: dict[tuple, dict] = {} # pk tuple: record
unique_copy_records: dict[tuple, tuple] = {} # pk tuple: values
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
values = tuple(record.get(column.name) for column in columns)
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_tuple = tuple(record[key] for key in primary_keys)
insert_records[primary_key_tuple] = insert_record
data_to_insert = list(insert_records.values())
unique_copy_records[primary_key_tuple] = values
data_to_copy = list(unique_copy_records.values())
else:
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
data_to_insert.append(insert_record)
connection.execute(insert, data_to_insert)
data_to_copy.append(insert_record)

# Use copy_expert to run the copy statement.
# https://www.psycopg.org/psycopg3/docs/basic/copy.html
with connection.connection.cursor().copy(copy_statement) as copy: # type: ignore[attr-defined]
for row in data_to_copy:
copy.write_row(data_to_copy)

return True

def upsert(
Expand Down
2 changes: 1 addition & 1 deletion target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def __init__(
th.Property(
"dialect+driver",
th.StringType,
default="postgresql+psycopg2",
default="postgresql+psycopg",
description=(
"Dialect+driver see "
+ "https://docs.sqlalchemy.org/en/20/core/engines.html. "
Expand Down
6 changes: 3 additions & 3 deletions target_postgres/tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def postgres_config():
return {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -29,7 +29,7 @@ def postgres_config():

def postgres_config_no_ssl():
return {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -43,7 +43,7 @@ def postgres_config_no_ssl():

def postgres_config_ssh_tunnel():
return {
"sqlalchemy_url": "postgresql://postgres:[email protected]:5432/main",
"sqlalchemy_url": "postgresql+psycopg://postgres:[email protected]:5432/main",
"ssh_tunnel": {
"enable": True,
"host": "127.0.0.1",
Expand Down
4 changes: 2 additions & 2 deletions target_postgres/tests/test_target_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def test_sqlalchemy_url_config(postgres_config_no_ssl):
def test_port_default_config():
"""Test that the default config is passed into the engine when the config doesn't provide it"""
config = {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand All @@ -191,7 +191,7 @@ def test_port_default_config():
def test_port_config():
"""Test that the port config works"""
config = {
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"user": "postgres",
"password": "postgres",
Expand Down
2 changes: 1 addition & 1 deletion target_postgres/tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def connector():
"""Create a PostgresConnector instance."""
return PostgresConnector(
config={
"dialect+driver": "postgresql+psycopg2",
"dialect+driver": "postgresql+psycopg",
"host": "localhost",
"port": "5432",
"user": "postgres",
Expand Down

0 comments on commit 4eb7987

Please sign in to comment.