Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bci-flolas committed Jul 13, 2021
1 parent eb03af0 commit 272b5aa
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 9 deletions.
17 changes: 11 additions & 6 deletions airflow/providers/teradata/hooks/ttu.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def execute_bteq(self, bteq, xcom_push_flag=False):
if xcom_push_flag:
return line

def execute_tdload(self, input_file, table, delimiter=';', working_database=None, encoding='UTF8', xcom_push_flag=False, raise_on_rows_error=False, raise_on_rows_duplicated=False):
def execute_tdload(self, input_file, table, delimiter=';', working_database=None, encoding='UTF8', xcom_push_flag=False, raise_on_rows_error=False, raise_on_rows_duplicated=False, debug=False, restart_limit=0):
"""
Load a CSV file to Teradata Table (previously created) using tdload binary.
Note: You need to strip header of the CSV. tdload only accepts rows, not header.
Expand Down Expand Up @@ -161,7 +161,9 @@ def execute_tdload(self, input_file, table, delimiter=';', working_database=None
fload_out_path,
fload_checkpoint_path,
conn['ttu_max_sessions'],
working_database
working_database,
debug,
restart_limit
),
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,preexec_fn=os.setsid)
line = ''
Expand Down Expand Up @@ -229,7 +231,7 @@ def execute_tptexport(self, sql, output_file, delimiter = ';', encoding='UTF8',
self.log.info("""Exporting SQL '""" + sql + """' to file """ + output_file + """ using TPT Export""")

with TemporaryDirectory(prefix='airflowtmp_ttu_tpt') as tmp_dir:
with NamedTemporaryFile(dir=tmp_dir, prefix=str(uuid.uuid4()), mode='wb') as f:
with NamedTemporaryFile(dir=tmp_dir, prefix=uuid.uuid4().hex, mode='wb') as f:
f.write(bytes(self._prepare_tpt_export_script(
sql,
output_file,
Expand All @@ -247,7 +249,7 @@ def execute_tptexport(self, sql, output_file, delimiter = ';', encoding='UTF8',
"location :{0}".format(fname))
f.seek(0)
conn['sp'] = subprocess.Popen(
['tbuild', '-f', fname, 'airflow' + '_tpt_' + str(uuid.uuid4())],
['tbuild', '-f', fname, 'airflow' + '_tpt_' + uuid.uuid4().hex],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
preexec_fn=os.setsid)

Expand Down Expand Up @@ -297,7 +299,7 @@ def _prepare_bteq_script(bteq_string, host, login, password, bteq_output_width,
return "\n".join(bteq_list)

@staticmethod
def _prepare_tdload_command(input_file, host, login, password, encoding, table, delimiter, log_path, checkpoint_path, max_sessions, working_database, job_name= 'airflow_tdload') -> str:
def _prepare_tdload_command(input_file, host, login, password, encoding, table, delimiter, log_path, checkpoint_path, max_sessions, working_database, debug, restart_limit, job_name= 'airflow_tdload') -> str:
"""
Prepare a tdload file with connection parameters for loading data from file
:param input_file : bteq sentences to execute
Expand All @@ -323,10 +325,13 @@ def _prepare_tdload_command(input_file, host, login, password, encoding, table,
tdload_command += ['-d'] + [delimiter]
tdload_command += ['-L'] + [log_path]
tdload_command += ['-r'] + [checkpoint_path]
tdload_command += ['-R'] + [str(restart_limit)]
tdload_command += ['--TargetMaxSessions'] + [str(max_sessions)]
if working_database:
tdload_command += ['--TargetWorkingDatabase'] + [working_database]
tdload_command += [ "%s_%s" % (job_name, uuid.uuid1()) ] #Job Name
if debug:
tdload_command += ['-x']
tdload_command += [ "%s_%s" % (job_name, uuid.uuid4().hex) ] #Job Name
return tdload_command

@staticmethod
Expand Down
4 changes: 3 additions & 1 deletion airflow/providers/teradata/operators/fastload.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ def execute(self, context):
encoding=self.encoding,
xcom_push_flag=self.xcom_push,
raise_on_rows_error=self.raise_on_rows_error,
raise_on_rows_duplicated=self.raise_on_rows_duplicated)
raise_on_rows_duplicated=self.raise_on_rows_duplicated,
debug=self.debug
)
def on_kill(self):
self._hook.on_kill()
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.0.6
current_version = 1.0.7
commit = True
tag = True

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@
packages=find_namespace_packages(include=['airflow.providers.teradata', 'airflow.providers.teradata.*']),
setup_requires=['setuptools', 'wheel'],
url='https://github.com/flolas/apache_airflow_providers_teradata',
version='1.0.6',
version='1.0.7',
zip_safe=False,
)

0 comments on commit 272b5aa

Please sign in to comment.