[FEATURE] Tableau Hyper #49

Merged · 19 commits · Sep 4, 2024
136 changes: 83 additions & 53 deletions src/koheesio/integrations/spark/tableau/hyper.py
@@ -62,10 +62,13 @@ class HyperFileReader(HyperFile, SparkStep):
"""
Read a Hyper file and return a Spark DataFrame.

Example:
df = HyperFileReader(
path=PurePath(hw.hyper_path),
).execute().df
Examples
--------
```python
df = HyperFileReader(
path=PurePath(hw.hyper_path),
).execute().df
```
"""

path: PurePath = Field(
@@ -148,7 +151,9 @@ class HyperFileWriter(HyperFile):
)

class Output(StepOutput):
"""Output class for HyperFileListWriter"""
"""
Output class for HyperFileWriter
"""

hyper_path: PurePath = Field(default=..., description="Path to created Hyper file")

@@ -176,29 +181,33 @@ class HyperFileListWriter(HyperFileWriter):
"""
Write list of rows to a Hyper file.

Reference:
Datatypes in https://tableau.github.io/hyper-db/docs/sql/datatype/ for supported data types.

Example:
hw = HyperFileListWriter(
name="test",
table_definition=TableDefinition(
table_name=TableName("Extract", "Extract"),
columns=[
TableDefinition.Column(name="string", type=SqlType.text(), nullability=NOT_NULLABLE),
TableDefinition.Column(name="int", type=SqlType.int(), nullability=NULLABLE),
TableDefinition.Column(name="timestamp", type=SqlType.timestamp(), nullability=NULLABLE),
]
),
data=[
["text_1", 1, datetime(2024, 1, 1, 0, 0, 0, 0)],
["text_2", 2, datetime(2024, 1, 2, 0, 0, 0, 0)],
["text_3", None, None],
],
).execute()

# do somthing with returned file path
hw.hyper_path
References
----------
Supported data types: https://tableau.github.io/hyper-db/docs/sql/datatype/

Examples
--------
```python
hw = HyperFileListWriter(
name="test",
table_definition=TableDefinition(
table_name=TableName("Extract", "Extract"),
columns=[
TableDefinition.Column(name="string", type=SqlType.text(), nullability=NOT_NULLABLE),
TableDefinition.Column(name="int", type=SqlType.int(), nullability=NULLABLE),
TableDefinition.Column(name="timestamp", type=SqlType.timestamp(), nullability=NULLABLE),
]
),
data=[
["text_1", 1, datetime(2024, 1, 1, 0, 0, 0, 0)],
["text_2", 2, datetime(2024, 1, 2, 0, 0, 0, 0)],
["text_3", None, None],
],
).execute()

# do something with the returned file path
hw.hyper_path
```
"""

data: conlist(List[Any], min_length=1) = Field(default=..., description="List of rows to write to the Hyper file")
@@ -221,31 +230,36 @@ class HyperFileParquetWriter(HyperFileWriter):
"""
Read one or multiple parquet files and write them to a Hyper file.

Note:
This method is much faster than HyperFileListWriter for large files.

Reference:
Copy from external format: https://tableau.github.io/hyper-db/docs/sql/command/copy_from
Datatypes in https://tableau.github.io/hyper-db/docs/sql/datatype/ for supported data types.
Parquet format limitations:
https://tableau.github.io/hyper-db/docs/sql/external/formats/#external-format-parquet

Example:
hw = HyperFileParquetWriter(
name="test",
table_definition=TableDefinition(
table_name=TableName("Extract", "Extract"),
columns=[
TableDefinition.Column(name="string", type=SqlType.text(), nullability=NOT_NULLABLE),
TableDefinition.Column(name="int", type=SqlType.int(), nullability=NULLABLE),
TableDefinition.Column(name="timestamp", type=SqlType.timestamp(), nullability=NULLABLE),
]
),
files=["/my-path/parquet-1.snappy.parquet","/my-path/parquet-2.snappy.parquet"]
).execute()

# do somthing with returned file path
hw.hyper_path
Notes
-----
This writer is much faster than HyperFileListWriter for large files.

References
----------
Copy from external format: https://tableau.github.io/hyper-db/docs/sql/command/copy_from
Supported data types: https://tableau.github.io/hyper-db/docs/sql/datatype/
Parquet format limitations: https://tableau.github.io/hyper-db/docs/sql/external/formats/#external-format-parquet

Examples
--------
```python
hw = HyperFileParquetWriter(
name="test",
table_definition=TableDefinition(
table_name=TableName("Extract", "Extract"),
columns=[
TableDefinition.Column(name="string", type=SqlType.text(), nullability=NOT_NULLABLE),
TableDefinition.Column(name="int", type=SqlType.int(), nullability=NULLABLE),
TableDefinition.Column(name="timestamp", type=SqlType.timestamp(), nullability=NULLABLE),
]
),
files=["/my-path/parquet-1.snappy.parquet","/my-path/parquet-2.snappy.parquet"]
).execute()

# do something with the returned file path
hw.hyper_path
```
"""

file: conlist(Union[str, PurePath], min_length=1) = Field(
Expand All @@ -270,6 +284,22 @@ def execute(self):


class HyperFileDataFrameWriter(HyperFileWriter):
"""
Write a Spark DataFrame to a Hyper file.
The DataFrame is first written to a parquet file, which is then converted to a Hyper file using HyperFileParquetWriter.

Examples
--------
```python
hw = HyperFileDataFrameWriter(
df=spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "name"]),
name="test",
).execute()

# do something with the returned file path
hw.hyper_path
```
"""
df: DataFrame = Field(default=..., description="Spark DataFrame to write to the Hyper file")
table_definition: Optional[TableDefinition] = None # table_definition is not required for this class

34 changes: 25 additions & 9 deletions src/koheesio/integrations/spark/tableau/server.py
@@ -20,6 +20,9 @@


class TableauServer(Step):
"""
Base class for Tableau server interactions. Provides authentication and project identification functionality.
"""
url: str = Field(
default=...,
alias="url",
@@ -85,11 +88,16 @@ def auth(self) -> ContextManager:
"""
Authenticate on the Tableau server.

Example:
with self._authenticate():
Examples
--------
```python
with self.auth:
    self.server.projects.get()
```

Returns:
TableauAuth or PersonalAccessTokenAuth authorization object
Returns
-------
A ContextManager wrapping a TableauAuth or PersonalAccessTokenAuth authorization object
"""
# Suppress 'InsecureRequestWarning'
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -114,16 +122,19 @@ def auth(self) -> ContextManager:
def working_project(self) -> Union[ProjectItem, None]:
"""
Identify the working project by using the `project` and `parent_project` (if necessary) class properties.
The goal is to uniquely identify specific project on the server, if multiple projects have the same
The goal is to uniquely identify a specific project on the server. If multiple projects have the same
name, the `parent_project` attribute of the TableauServer is required.

Notes
-----
Set `parent_project` value to 'root' if the project is located in the root directory.

If `id` of the project is known, it can be used in `project_id` parameter, then the detection of the working
project using the `project` and `parent_project` attributes is skipped.

Returns:
ProjectItem: ProjectItem object representing the working project
Returns
-------
ProjectItem object representing the working project
"""

with self.auth:
@@ -168,14 +179,17 @@ def execute(self):

class TableauHyperPublishMode(str, Enum):
"""
The different publishing modes for the TableauHyperPublisher.
Publishing modes for the TableauHyperPublisher.
"""

APPEND = Server.PublishMode.Append
OVERWRITE = Server.PublishMode.Overwrite


class TableauHyperPublisher(TableauServer):
"""
Publish the given Hyper file to the Tableau server. The Hyper file will be treated by the Tableau server as a datasource.
"""
datasource_name: str = Field(default=..., description="Name of the datasource to publish")
hyper_path: PurePath = Field(default=..., description="Path to Hyper file")
publish_mode: TableauHyperPublishMode = Field(
@@ -184,7 +198,9 @@ class TableauHyperPublisher(TableauServer):
)

class Output(StepOutput):
"""Output class for HyperFileListWriter"""
"""
Output class for TableauHyperPublisher
"""

datasource_item: DatasourceItem = Field(
default=..., description="DatasourceItem object representing the published datasource"