Skip to content

Commit

Permalink
Merge pull request #94 from shimpeko/add_skip_work_option
Browse files Browse the repository at this point in the history
Add skip-work option to streaming_load job class
  • Loading branch information
aamine authored Feb 14, 2017
2 parents 8dc1dea + e3eed28 commit e0894d0
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions jobclass/streaming_load.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def StreamingLoadJobClass.parameters(params)
params.add Bricolage::OptionalBoolParam.new('noop', 'Does not change any data.')
params.add Bricolage::OptionalBoolParam.new('load-only', 'Just issues COPY statement to work table and quit. No INSERT, no dequeue, no load log.')
params.add Bricolage::OptionalBoolParam.new('dequeue-only', 'Dequeues already loaded files.')
params.add Bricolage::OptionalBoolParam.new('skip-work', 'Skip work table and load directly into dest-table')
end

def StreamingLoadJobClass.declarations(params)
Expand Down Expand Up @@ -71,7 +72,8 @@ def make_loader(params)
sql: params['sql-file'],
logger: ds.logger,
noop: params['noop'],
load_only: params['load-only']
load_only: params['load-only'],
skip_work: params['skip-work']
)
end

Expand All @@ -95,7 +97,7 @@ class RedshiftStreamingLoader
def initialize(data_source:, queue:, keep_ctl:,
table:, work_table: nil, log_table: nil, load_options: nil,
sql: nil,
logger:, noop: false, load_only: false)
logger:, noop: false, load_only: false, skip_work: false)
@ds = data_source
@src = queue
@keep_ctl = keep_ctl
Expand All @@ -107,6 +109,7 @@ def initialize(data_source:, queue:, keep_ctl:,
@logger = logger
@noop = noop
@load_only = load_only
@skip_work = skip_work

@start_time = Time.now
@end_time = nil
Expand Down Expand Up @@ -159,7 +162,7 @@ def load
end
create_load_log_file(objects) {|log_url|
@ds.open {|conn|
execute_update conn, "truncate #{work_table}"
execute_update conn, "truncate #{work_table}" unless @skip_work
conn.transaction {|txn|
execute_update conn, "delete #{log_table_wk}"
execute_update conn, load_log_copy_stmt(log_table_wk, log_url, @src.credential_string)
Expand All @@ -174,10 +177,11 @@ def load
txn.truncate_and_commit log_table_wk
else
create_manifest_file(not_loaded) {|manifest_url|
execute_update conn, manifest_copy_stmt(work_table, manifest_url)
dest_table = @skip_work ? @table : work_table
execute_update conn, manifest_copy_stmt(dest_table, manifest_url)
@logger.info "load succeeded: #{manifest_url}" unless @noop
unless @load_only
commit_work_table conn, work_table
commit_work_table conn, work_table unless @skip_work
commit_load_log conn, log_table_wk
end
txn.truncate_and_commit log_table_wk
Expand Down

0 comments on commit e0894d0

Please sign in to comment.