Skip to content

Commit

Permalink
feat(connector): multipart upload (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
portellaa authored Mar 11, 2024
1 parent 6026814 commit eedd5e3
Show file tree
Hide file tree
Showing 14 changed files with 208 additions and 77 deletions.
41 changes: 35 additions & 6 deletions src/ydata/sdk/common/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from httpx import Client as httpClient
from httpx import HTTPStatusError, Response, Timeout
from httpx import codes as http_codes
from httpx._types import RequestContent
from typeguard import typechecked

from ydata.sdk.common.client.parser import LinkExtractor
Expand Down Expand Up @@ -50,8 +51,7 @@ class Client(metaclass=SingletonClient):
DEFAULT_PROJECT: Optional[Project] = environ.get("DEFAULT_PROJECT", None)

def __init__(self, credentials: Optional[Union[str, Dict]] = None, project: Optional[Project] = None, set_as_global: bool = False):
self._base_url = environ.get("YDATA_BASE_URL", DEFAULT_URL)
self._scheme = 'https'
self._base_url = environ.get("YDATA_BASE_URL", DEFAULT_URL).removesuffix('/')
self._headers = {'Authorization': credentials}
self._http_client = httpClient(
headers=self._headers, timeout=Timeout(10, read=None))
Expand All @@ -72,13 +72,15 @@ def project(self, value: Project):
self._default_project = value

def post(
self, endpoint: str, data: Optional[Dict] = None, json: Optional[Dict] = None,
project: Optional[Project] = None, files: Optional[Dict] = None, raise_for_status: bool = True
self, endpoint: str, content: Optional[RequestContent] = None, data: Optional[Dict] = None,
json: Optional[Dict] = None, project: Optional[Project] = None, files: Optional[Dict] = None,
raise_for_status: bool = True
) -> Response:
"""POST request to the backend.
Args:
endpoint (str): POST endpoint
content (Optional[RequestContent])
data (Optional[dict]): (optional) multipart form data
json (Optional[dict]): (optional) json data
files (Optional[dict]): (optional) files to be sent
Expand All @@ -96,6 +98,33 @@ def post(

return response

def patch(
    self, endpoint: str, content: Optional[RequestContent] = None, data: Optional[Dict] = None,
    json: Optional[Dict] = None, project: Optional[Project] = None, files: Optional[Dict] = None,
    raise_for_status: bool = True
) -> Response:
    """Send a PATCH request to the backend.

    Args:
        endpoint (str): PATCH endpoint
        content (Optional[RequestContent]): (optional) raw request body, passed to the HTTP client as-is
        data (Optional[dict]): (optional) multipart form data
        json (Optional[dict]): (optional) json data
        project (Optional[Project]): (optional) project forwarded to the URL builder
        files (Optional[dict]): (optional) files to be sent
        raise_for_status (bool): raise when the response status is not OK

    Returns:
        Response object
    """
    # The URL builder returns the keyword arguments for the HTTP client;
    # the raw body is not part of them and is supplied separately.
    request_kwargs = self.__build_url(
        endpoint, data=data, json=json, files=files, project=project)
    response = self._http_client.patch(content=content, **request_kwargs)

    not_ok = response.status_code != Client.codes.OK
    if not_ok and raise_for_status:
        self.__raise_for_status(response)

    return response

def get(
self, endpoint: str, params: Optional[Dict] = None, project: Optional[Project] = None,
cookies: Optional[Dict] = None, raise_for_status: bool = True
Expand Down Expand Up @@ -132,7 +161,7 @@ def get_static_file(
Response object
"""
url_data = self.__build_url(endpoint, project=project)
url_data['url'] = f'{self._scheme}://{self._base_url}/static-content{endpoint}'
url_data['url'] = f'{self._base_url}/static-content{endpoint}'
response = self._http_client.get(**url_data)

if response.status_code != Client.codes.OK and raise_for_status:
Expand Down Expand Up @@ -178,7 +207,7 @@ def __build_url(self, endpoint: str, params: Optional[Dict] = None, data: Option
}

url_data = {
'url': f'{self._scheme}://{self._base_url}/api{endpoint}',
'url': f'{self._base_url}/{endpoint.removeprefix("/")}',
'headers': self._headers,
'params': _params,
}
Expand Down
2 changes: 1 addition & 1 deletion src/ydata/sdk/common/client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def init_client(func):
"""
@wraps(func)
def wrapper_func(*args, **kwargs):
if not any((arg for arg in args if isinstance(arg, Client))):
if kwargs.get('client', None) is None and not any((arg for arg in args if isinstance(arg, Client))):
kwargs['client'] = get_client(kwargs.get('client'))
return func(*args, **kwargs)
return wrapper_func
2 changes: 1 addition & 1 deletion src/ydata/sdk/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
from os import environ

LOG_LEVEL = environ.get('LOG_LEVEL', 'WARNING')
DEFAULT_URL = "fabric.ydata.ai"
DEFAULT_URL = "https://fabric.ydata.ai/api"
BACKOFF = 10 # 10s backoff between requests
TOKEN_VAR = 'YDATA_TOKEN'
11 changes: 7 additions & 4 deletions src/ydata/sdk/common/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
from pydantic import Extra


class Config:
allow_population_by_field_name = True
extra = Extra.ignore
use_enum_values = True


class BaseModel(PydanticBaseModel):
"""BaseModel replacement from pydantic.
All datamodel from YData SDK inherits from this class.
"""
class Config:
allow_population_by_field_name = True
extra = Extra.ignore
use_enum_values = True
Config = Config
4 changes: 2 additions & 2 deletions src/ydata/sdk/connectors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ydata.sdk.connectors._models.connector_type import ConnectorType
from ydata.sdk.connectors.connector import Connector
from ydata.sdk.connectors.connector import Connector, LocalConnector

__all__ = ["Connector", "ConnectorType"]
__all__ = ["Connector", "ConnectorType", "LocalConnector"]
2 changes: 1 addition & 1 deletion src/ydata/sdk/connectors/_models/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class Connector(BaseModel):

uid: UID
uid: Optional[UID] = None
type: ConnectorType
name: Optional[str] = None

Expand Down
2 changes: 1 addition & 1 deletion src/ydata/sdk/connectors/_models/connector_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from ydata.sdk.common.exceptions import InvalidConnectorError


class ConnectorType(Enum):
class ConnectorType(str, Enum):
AWS_S3 = "aws-s3"
"""AWS S3 connector"""
AZURE_BLOB = "azure-blob"
Expand Down
9 changes: 9 additions & 0 deletions src/ydata/sdk/connectors/_models/local_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import Optional

from pydantic import Field

from .connector import Connector


# Connector model extended with a reference to uploaded data.
class LocalConnector(Connector):
# Optional; defaults to None. The connector layer fills it with the uid of
# the upload that holds the data (`file=upload.uid`).
file: Optional[str] = Field(None)
13 changes: 13 additions & 0 deletions src/ydata/sdk/connectors/_models/upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Optional

from pydantic import Field

from ydata.sdk.common.model import BaseModel


# Model of an upload, built from the JSON body returned by the upload endpoint.
# The aliases map the camelCase keys of that payload onto snake_case attributes.
class Upload(BaseModel):
# Identifier of the upload; used to address it in follow-up requests.
uid: str
# Payload key 'chunkSize' (required).
chunk_size: int = Field(alias='chunkSize')
# Payload key 'fileName' (required).
file_name: str = Field(alias='fileName')
# Payload key 'writtenBytes'; optional, defaults to None.
# NOTE(review): presumably the number of bytes received so far - confirm against the backend API.
written_bytes: Optional[int] = Field(None, alias='writtenBytes')
# Payload key 'totalBytes'; optional, defaults to None.
# NOTE(review): presumably the declared size of the whole upload - confirm against the backend API.
total_bytes: Optional[int] = Field(None, alias='totalBytes')
129 changes: 109 additions & 20 deletions src/ydata/sdk/connectors/connector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from io import BytesIO
from json import loads as json_loads
from pathlib import Path
from typing import Dict, Optional, Union
from typing import Dict, Optional, TypeVar, Union
from uuid import uuid4

from pandas import DataFrame as pdDataFrame

from ydata.sdk.common.client import Client
from ydata.sdk.common.client.utils import init_client
from ydata.sdk.common.config import LOG_LEVEL
Expand All @@ -13,10 +16,14 @@
from ydata.sdk.connectors._models.connector_list import ConnectorsList
from ydata.sdk.connectors._models.connector_type import ConnectorType
from ydata.sdk.connectors._models.credentials.credentials import Credentials
from ydata.sdk.connectors._models.local_connector import LocalConnector as mLocalConnector
from ydata.sdk.connectors._models.rdbms_connector import RDBMSConnector as mRDBMSConnector
from ydata.sdk.connectors._models.schema import Schema
from ydata.sdk.connectors._models.upload import Upload
from ydata.sdk.utils.model_mixin import ModelFactoryMixin

_T = TypeVar('_T', bound='Connector')


class Connector(ModelFactoryMixin):
"""A [`Connector`][ydata.sdk.connectors.Connector] allows to connect and
Expand Down Expand Up @@ -57,9 +64,13 @@ def _init_common(self, client: Optional[Client] = None):
def uid(self) -> UID:
return self._model.uid

@property
def name(self) -> str:
return self._model.name

@property
def type(self) -> ConnectorType:
return self._model.type
return ConnectorType(self._model.type)

@property
def project(self) -> Project:
Expand All @@ -69,7 +80,7 @@ def project(self) -> Project:
@init_client
def get(
uid: UID, project: Optional[Project] = None, client: Optional[Client] = None
) -> Union["Connector", "RDBMSConnector"]:
) -> _T:
"""Get an existing connector.
Arguments:
Expand Down Expand Up @@ -121,7 +132,7 @@ def _init_credentials(
def create(
connector_type: Union[ConnectorType, str], credentials: Union[str, Path, Dict, Credentials],
name: Optional[str] = None, project: Optional[Project] = None, client: Optional[Client] = None
) -> Union["Connector", "RDBMSConnector"]:
) -> _T:
"""Create a new connector.
Arguments:
Expand All @@ -136,28 +147,27 @@ def create(
"""
connector_type = ConnectorType._init_connector_type(connector_type)
connector_class = _connector_type_to_model(connector_type)
model = connector_class._create_model(
connector_type=connector_type, credentials=credentials, name=name, project=project, client=client)

payload = {
"type": connector_type.value,
"credentials": credentials.dict(by_alias=True)
}
model = connector_class._create(payload, name, project, client)

connector = connector_class._init_from_model_data(model)
connector._project = project
return connector

@classmethod
@init_client
def _create_model(
cls, connector_type: Union[ConnectorType, str], credentials: Union[str, Path, Dict, Credentials],
name: Optional[str] = None, project: Optional[Project] = None, client: Optional[Client] = None
) -> Union[mConnector, mRDBMSConnector]:
def _create(
cls, payload: dict, name: Optional[str] = None, project: Optional[Project] = None,
client: Optional[Client] = None
) -> _MODEL_CLASS:
_name = name if name is not None else str(uuid4())
_connector_type = ConnectorType._init_connector_type(connector_type)
_credentials = Connector._init_credentials(_connector_type, credentials)
payload = {
"type": _connector_type.value,
"credentials": _credentials.dict(by_alias=True),
"name": _name
}
payload["name"] = _name
response = client.post('/connector/', project=project, json=payload)
data: list = response.json()
data = response.json()

return cls._MODEL_CLASS(**data)

Expand Down Expand Up @@ -191,5 +201,84 @@ def schema(self) -> Optional[Schema]:
return self._model.db_schema


def _connector_type_to_model(connector_type: ConnectorType) -> Union[Connector, RDBMSConnector]:
return RDBMSConnector if connector_type.is_rdbms else Connector
class LocalConnector(Connector):
    """Connector whose data is uploaded from the local machine.

    The source (a pandas dataframe or a file on disk) is sent to the backend
    in chunks, and the connector is then created with a reference to the
    resulting upload.
    """
    _MODEL_CLASS = mLocalConnector
    _model: Optional[mLocalConnector]

    @staticmethod
    def create(
        source: Union[pdDataFrame, str, Path], connector_type: Union[ConnectorType, str] = ConnectorType.FILE,
        name: Optional[str] = None, project: Optional[Project] = None, client: Optional[Client] = None
    ) -> "LocalConnector":
        """Create a new connector.

        Arguments:
            source (Union[pdDataFrame, str, Path]): pandas dataframe, string or path to a file
            connector_type (Union[ConnectorType, str]): (optional) Connector type, defaults to `ConnectorType.FILE`
            name (Optional[str]): (optional) Connector name
            project (Optional[Project]): (optional) Project where to create the connector
            client (Client): (optional) Client to connect to the backend

        Returns:
            New connector

        Raises:
            ValueError: if the source contains no data to upload
        """
        # A plain string is treated as a filesystem path.
        if isinstance(source, str):
            source = Path(source)

        if isinstance(source, pdDataFrame):
            upload = LocalConnector._upload_dataframe(source, project, client=client)
        else:
            upload = LocalConnector._upload_file(source, project, client=client)

        # The connector only references the uploaded data through its uid.
        model = mLocalConnector(name=name, type=connector_type, file=upload.uid)
        model = LocalConnector._create(model.dict(
            by_alias=True), name, project, client=client)

        connector = LocalConnector._init_from_model_data(model)
        connector._project = project
        return connector

    @staticmethod
    def _upload_dataframe(
        source: pdDataFrame, project: Optional[Project] = None, client: Optional[Client] = None
    ) -> Upload:
        """Serialize the dataframe as CSV (without the index) and upload it."""
        buffer = BytesIO()
        source.to_csv(buffer, index=False)
        return LocalConnector._upload(buffer, project, client=client)

    @staticmethod
    def _upload_file(source: Path, project: Optional[Project] = None, client: Optional[Client] = None) -> Upload:
        """Read the file at `source` into memory and upload its bytes."""
        with source.open('rb') as f:
            buffer = BytesIO(f.read())
        return LocalConnector._upload(buffer, project, client=client)

    @staticmethod
    @init_client
    def _upload(source_bytes: BytesIO, project: Optional[Project] = None, client: Optional[Client] = None) -> Upload:
        """Send the buffer to the backend in chunks.

        An upload is first created with the total size; the backend answers
        with the upload uid and the chunk size to use. The buffer is then
        sent chunk by chunk with PATCH requests.

        Returns:
            The upload as described by the response to the last chunk.

        Raises:
            ValueError: if the buffer is empty, or if no chunk could be sent
                (e.g. the backend announced a chunk size of 0).
        """
        length = source_bytes.getbuffer().nbytes
        if length == 0:
            # Without this guard no chunk is ever sent and there is no upload
            # state to return; fail before creating anything on the backend.
            raise ValueError("Cannot upload an empty source.")

        created_upload = client.post(
            "/upload", project=project, json={"size": length}, raise_for_status=True)
        created_upload.raise_for_status()
        created_upload_dict = created_upload.json()

        upload_id = created_upload_dict["uid"]
        chunk_size = int(created_upload_dict["chunkSize"])

        upload: Optional[Upload] = None
        source_bytes.seek(0)
        while chunk_bytes := source_bytes.read(chunk_size):
            upload_response = client.patch(
                f"/upload/{upload_id}", content=chunk_bytes, project=project, raise_for_status=True
            )
            # Only the state returned for the last chunk is kept.
            upload = Upload(**upload_response.json())

        if upload is None:
            raise ValueError(
                f"No data was uploaded (chunk size announced by the backend: {chunk_size}).")

        return upload


def _connector_type_to_model(connector_type: ConnectorType) -> _T:
    """Select the connector class that handles `connector_type`.

    RDBMS types map to `RDBMSConnector`, `ConnectorType.FILE` maps to
    `LocalConnector`, and every other type falls back to `Connector`.
    """
    if connector_type.is_rdbms:
        return RDBMSConnector
    if connector_type is ConnectorType.FILE:
        return LocalConnector
    return Connector
2 changes: 1 addition & 1 deletion src/ydata/sdk/datasources/_models/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@ class Status(BaseModel):

@staticmethod
def unknown() -> "Status":
return Status(state=Status.State.UNKNOWN)
return Status(state=State.UNKNOWN)
Loading

0 comments on commit eedd5e3

Please sign in to comment.