Skip to content

Commit

Permalink
Update parameter binding
Browse files Browse the repository at this point in the history
  • Loading branch information
SpaceCondor committed Sep 30, 2024
1 parent 67d1197 commit 14c4581
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,20 +165,21 @@ def bulk_insert_records( # type: ignore[override]
"""
columns = self.column_representation(schema)
copy_statement: str = self.generate_copy_statement(table.name, columns)
self.logger.error("Inserting with SQL: %s", copy_statement)

self.logger.info("Inserting with SQL: %s", copy_statement)

data_to_copy: list[dict[str, t.Any]] = []

# If append only is False, we only take the latest record one per primary key
if self.append_only is False:
unique_copy_records: dict[tuple, tuple] = {} # pk tuple: values
unique_copy_records: dict[tuple, dict] = {} # pk tuple: values
for record in records:
values = tuple(record.get(column.name) for column in columns)
insert_record = {
column.name: record.get(column.name) for column in columns
}
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_tuple = tuple(record[key] for key in primary_keys)
unique_copy_records[primary_key_tuple] = values
unique_copy_records[primary_key_tuple] = insert_record
data_to_copy = list(unique_copy_records.values())
else:
for record in records:
Expand All @@ -187,11 +188,26 @@ def bulk_insert_records( # type: ignore[override]
}
data_to_copy.append(insert_record)

# Prepare to process the rows into csv. Use each column's bind_processor to do
# most of the work, then do the final construction of the csv rows ourselves
# to control exactly how values are converted and which ones are quoted.
column_bind_processors = {
column.name: column.type.bind_processor(connection.dialect) for column in columns
}


# Use copy_expert to run the copy statement.
# https://www.psycopg.org/psycopg3/docs/basic/copy.html
with connection.connection.cursor().copy(copy_statement) as copy: # type: ignore[attr-defined]
for row in data_to_copy:
copy.write_row(row)
processed_row = []
for row_column_name in row:
if column_bind_processors[row_column_name] is not None:
processed_row.append(column_bind_processors[row_column_name](row[row_column_name]))
else:
processed_row.append(row[row_column_name])

copy.write_row(processed_row)

return True

Expand Down

0 comments on commit 14c4581

Please sign in to comment.