Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Consider flush_all_streams for schema updates #247

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion target_snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,19 @@ def persist_lines(config, lines, table_cache=None, file_format_type: FileFormatT
# if same stream has been encountered again, it means the schema might have been altered
# so previous records need to be flushed
if row_count.get(stream, 0) > 0:
# flush all streams, delete records if needed, reset counts and then emit current state
if config.get('flush_all_streams'):
filter_streams = None
else:
filter_streams = [stream]
flushed_state = flush_streams(records_to_load,
row_count,
stream_to_sync,
config,
state,
flushed_state,
archive_load_files_data)
archive_load_files_data,
filter_streams=filter_streams)

# emit latest encountered state
emit_state(flushed_state)
Expand Down