From 2984be71efc3e0f74efc74fbd53d1ba5bd6d95cd Mon Sep 17 00:00:00 2001 From: Namkyu Kim Date: Sun, 24 Nov 2024 13:08:46 +0900 Subject: [PATCH 1/3] Add fail_on_file_not_exist option to SFTPToS3Operator --- .../amazon/aws/transfers/sftp_to_s3.py | 13 +++++++ .../amazon/aws/transfers/test_sftp_to_s3.py | 36 +++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py b/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py index 322110dadba5d..6e3ac225d6c74 100644 --- a/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py +++ b/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py @@ -49,6 +49,8 @@ class SFTPToS3Operator(BaseOperator): uploading the file to S3. :param use_temp_file: If True, copies file first to local, if False streams file from SFTP to S3. + :param fail_on_file_not_exist: If True, operator fails when file does not exist, + if False, operator will not fail and skips transfer. Default is True. """ template_fields: Sequence[str] = ("s3_key", "sftp_path", "s3_bucket") @@ -62,6 +64,7 @@ def __init__( sftp_conn_id: str = "ssh_default", s3_conn_id: str = "aws_default", use_temp_file: bool = True, + fail_on_file_not_exist: bool = True, **kwargs, ) -> None: super().__init__(**kwargs) @@ -71,6 +74,7 @@ def __init__( self.s3_key = s3_key self.s3_conn_id = s3_conn_id self.use_temp_file = use_temp_file + self.fail_on_file_not_exist = fail_on_file_not_exist @staticmethod def get_s3_key(s3_key: str) -> str: @@ -85,6 +89,15 @@ def execute(self, context: Context) -> None: sftp_client = ssh_hook.get_conn().open_sftp() + try: + sftp_client.stat(self.sftp_path) + except FileNotFoundError: + if self.fail_on_file_not_exist: + raise + else: + self.log.info("File %s not found on SFTP server. 
Skipping transfer.", self.sftp_path) + return + if self.use_temp_file: with NamedTemporaryFile("w") as f: sftp_client.get(self.sftp_path, f.name) diff --git a/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py b/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py index 7b812712d7ef2..4ec1a117ac16a 100644 --- a/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py +++ b/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py @@ -120,3 +120,39 @@ def test_sftp_to_s3_operation(self, use_temp_file): conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key) conn.delete_bucket(Bucket=self.s3_bucket) assert not s3_hook.check_for_bucket(self.s3_bucket) + + @pytest.mark.parametrize("fail_on_file_not_exist", [True, False]) + @mock_aws + @conf_vars({("core", "enable_xcom_pickling"): "True"}) + def test_sftp_to_s3_fail_on_file_not_exist(self, fail_on_file_not_exist): + s3_hook = S3Hook(aws_conn_id=None) + conn = boto3.client("s3") + conn.create_bucket(Bucket=self.s3_bucket) + assert s3_hook.check_for_bucket(self.s3_bucket) + + if fail_on_file_not_exist: + with pytest.raises(FileNotFoundError): + SFTPToS3Operator( + s3_bucket=self.s3_bucket, + s3_key=self.s3_key, + sftp_path=self.sftp_path, + sftp_conn_id=SFTP_CONN_ID, + s3_conn_id=S3_CONN_ID, + fail_on_file_not_exist=fail_on_file_not_exist, + task_id="test_sftp_to_s3", + dag=self.dag, + ).execute(None) + else: + SFTPToS3Operator( + s3_bucket=self.s3_bucket, + s3_key=self.s3_key, + sftp_path=self.sftp_path, + sftp_conn_id=SFTP_CONN_ID, + s3_conn_id=S3_CONN_ID, + fail_on_file_not_exist=fail_on_file_not_exist, + task_id="test_sftp_to_s3", + dag=self.dag, + ).execute(None) + + conn.delete_bucket(Bucket=self.s3_bucket) + assert not s3_hook.check_for_bucket(self.s3_bucket) From 46991e4e4c43418dd861138d313adc873737f20e Mon Sep 17 00:00:00 2001 From: Namkyu Kim Date: Tue, 26 Nov 2024 00:37:41 +0900 Subject: [PATCH 2/3] Fixed test for fail_on_file_not_exist option. 
--- providers/tests/amazon/aws/transfers/test_sftp_to_s3.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py b/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py index 4ec1a117ac16a..e8fd3be4905da 100644 --- a/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py +++ b/providers/tests/amazon/aws/transfers/test_sftp_to_s3.py @@ -135,7 +135,7 @@ def test_sftp_to_s3_fail_on_file_not_exist(self, fail_on_file_not_exist): SFTPToS3Operator( s3_bucket=self.s3_bucket, s3_key=self.s3_key, - sftp_path=self.sftp_path, + sftp_path="/tmp/wrong_path.txt", sftp_conn_id=SFTP_CONN_ID, s3_conn_id=S3_CONN_ID, fail_on_file_not_exist=fail_on_file_not_exist, @@ -154,5 +154,6 @@ def test_sftp_to_s3_fail_on_file_not_exist(self, fail_on_file_not_exist): dag=self.dag, ).execute(None) + conn.delete_object(Bucket=self.s3_bucket, Key=self.s3_key) conn.delete_bucket(Bucket=self.s3_bucket) assert not s3_hook.check_for_bucket(self.s3_bucket) From aa260e90bfe346e0610036e60444f08f9481b54e Mon Sep 17 00:00:00 2001 From: Namkyu Kim <146211462+Guaqamole@users.noreply.github.com> Date: Tue, 26 Nov 2024 08:46:07 +0900 Subject: [PATCH 3/3] Remove else condition with fail_on_file_not_exist option Co-authored-by: Ephraim Anierobi --- .../src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py b/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py index 6e3ac225d6c74..3145b50bfbfea 100644 --- a/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py +++ b/providers/src/airflow/providers/amazon/aws/transfers/sftp_to_s3.py @@ -94,9 +94,8 @@ def execute(self, context: Context) -> None: except FileNotFoundError: if self.fail_on_file_not_exist: raise - else: - self.log.info("File %s not found on SFTP server. 
Skipping transfer.", self.sftp_path) - return + self.log.info("File %s not found on SFTP server. Skipping transfer.", self.sftp_path) + return if self.use_temp_file: with NamedTemporaryFile("w") as f: