Skip to content

Commit

Permalink
Implement FTP and SFTP configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Sep 27, 2024
1 parent 9fd33fe commit 3b11ac4
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 4 deletions.
90 changes: 89 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ faker = {version = ">=22.5", optional = true}
cryptography = { version = ">=3.4.6", optional = true }
PyJWT = { version = "~=2.4", optional = true }

# SSH extras
paramiko = ">=3.3.0"

[tool.poetry.extras]
jwt = [
"cryptography",
Expand All @@ -117,6 +120,7 @@ docs = [
"sphinx-reredirects",
]
s3 = ["fs-s3fs", "s3fs"]
ssh = ["paramiko"]
testing = [
"pytest",
]
Expand Down
97 changes: 97 additions & 0 deletions singer_sdk/contrib/filesystem/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""JSON Schema for each filesystem configuration."""

from __future__ import annotations

from singer_sdk import typing as th # JSON schema typing helpers

FTP = th.Property(
"ftp",
th.ObjectType(
th.Property(
"host",
th.StringType,
required=True,
description="FTP server host",
),
th.Property(
"port",
th.IntegerType,
default=21,
description="FTP server port",
),
th.Property(
"username",
th.StringType,
description="FTP username",
),
th.Property(
"password",
th.StringType,
secret=True,
description="FTP password",
),
th.Property(
"timeout",
th.IntegerType,
default=60,
description="Timeout of the FTP connection in seconds",
),
th.Property(
"encoding",
th.StringType,
default="utf-8",
description="FTP server encoding",
),
),
description="FTP connection settings",
)


SFTP = th.Property(
"sftp",
th.ObjectType(
th.Property(
"host",
th.StringType,
required=True,
description="SFTP server host",
),
th.Property(
"ssh_kwargs",
th.ObjectType(
th.Property(
"port",
th.IntegerType,
default=22,
description="SFTP server port",
),
th.Property(
"username",
th.StringType,
required=True,
description="SFTP username",
),
th.Property(
"password",
th.StringType,
secret=True,
description="SFTP password",
),
th.Property(
"pkey",
th.StringType,
secret=True,
description="Private key",
),
th.Property(
"timeout",
th.IntegerType,
default=60,
description="Timeout of the SFTP connection in seconds",
),
),
description="SSH connection settings",
),
),
description="SFTP connection settings",
)
25 changes: 22 additions & 3 deletions singer_sdk/contrib/filesystem/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import enum
import functools
import logging
import os
import typing as t
from pathlib import Path
Expand All @@ -12,7 +13,11 @@

import singer_sdk.typing as th
from singer_sdk import Tap
from singer_sdk.contrib.filesystem import config as filesystem_config
from singer_sdk.contrib.filesystem.stream import FileStream
from singer_sdk.exceptions import ConfigValidationError

logger = logging.getLogger(__name__)

DEFAULT_MERGE_STREAM_NAME = "files"

Expand All @@ -30,7 +35,7 @@ class ReadMode(str, enum.Enum):
th.StringType,
required=True,
default="local",
allowed_values=["local"],
allowed_values=["local", "ftp", "sftp"],
description="The filesystem to use.",
),
th.Property(
Expand All @@ -56,6 +61,8 @@ class ReadMode(str, enum.Enum):
default=DEFAULT_MERGE_STREAM_NAME,
description="Name of the stream to use when `read_mode` is `merge`.",
),
filesystem_config.FTP,
filesystem_config.SFTP,
).to_dict()


Expand Down Expand Up @@ -121,8 +128,20 @@ def read_mode(self) -> ReadMode:

@functools.cached_property
def fs(self) -> fsspec.AbstractFileSystem:
"""Return the filesystem object."""
return fsspec.filesystem(self.config["filesystem"])
"""Return the filesystem object.
Raises:
ConfigValidationError: If the filesystem configuration is missing.
"""
protocol = self.config["filesystem"]
if protocol != "local" and protocol not in self.config: # pragma: no cover
msg = "Filesytem configuration is missing"
raise ConfigValidationError(
msg,
errors=[f"Missing configuration for filesystem {protocol}"],
)
logger.info("Instatiating filesystem inteface: '%s'", protocol)
return fsspec.filesystem(protocol, **self.config.get(protocol, {}))

def discover_streams(self) -> list:
"""Return a list of discovered streams.
Expand Down

0 comments on commit 3b11ac4

Please sign in to comment.