Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxim Mityutko committed Jun 10, 2024
1 parent fc11f0e commit 74fd642
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ authors = [
# TODO: add other contributors
{ name = "Danny Meijer", email = "[email protected]" },
{ name = "Mikita Sakalouski", email = "[email protected]" },
{ name = "Maxim Mityutko", email = "[email protected]" },
]
classifiers = [
"Development Status :: 5 - Production/Stable",
Expand Down Expand Up @@ -63,6 +64,11 @@ se = ["spark-expectations>=2.1.0"]
sftp = ["paramiko>=2.6.0"]
delta = ["delta-spark>=2.2"]
excel = ["openpyxl>=3.0.0"]
# Tableau dependencies
tableau = [
"tableauhyperapi>=0.0.19484",
"tableauserverclient>=0.25"
]
dev = ["black", "isort", "ruff", "mypy", "pylint", "colorama", "types-PyYAML"]
test = [
"chispa",
Expand Down Expand Up @@ -179,6 +185,7 @@ features = [
"excel",
"se",
"box",
"tableau",
"dev",
]

Expand Down Expand Up @@ -243,6 +250,7 @@ features = [
"sftp",
"delta",
"excel",
"tableau",
"dev",
"test",
]
Expand Down Expand Up @@ -398,6 +406,7 @@ features = [
"sftp",
"delta",
"excel",
"tableau",
"dev",
"test",
"docs",
Expand Down
88 changes: 88 additions & 0 deletions src/koheesio/integrations/spark/tableau/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from typing import Optional, Union

import urllib3
from tableauserverclient import PersonalAccessTokenAuth, TableauAuth, Server

from abc import ABC
from koheesio.models import BaseModel

from typing import ContextManager
from pydantic import SecretStr, Field


class TableauBaseModel(BaseModel, ABC):
url: str = Field(
default=...,
alias="url",
description="Hostname for the Tableau server, e.g. tableau.my-org.com",
examples=["tableau.my-org.com"],
)
user: str = Field(default=..., alias="user", description="Login name for the Tableau user")
password: SecretStr = Field(default=..., alias="password", description="Password for the Tableau user")
site_id: str = Field(
default=...,
alias="site_id",
description="Identifier for the Tableau site, as used in the URL: https://tableau.my-org.com/#/site/SITE_ID",
)
version: str = Field(
default="3.14",
alias="version",
description="Version of the Tableau server API",
)
token_name: Optional[str] = Field(
default=None,
alias="token_name",
description="Name of the Tableau Personal Access Token",
)
token_value: Optional[SecretStr] = Field(
default=None,
alias="token_value",
description="Value of the Tableau Personal Access Token",
)

# class Output(StepOutput):
# """
# Define outputs for the BoxFolderBase class
# """
#
# folder: Optional[Folder] = Field(default=None, description="Box folder object")
def __authenticate(self) -> ContextManager:
"""
Authenticate on the Tableau server.
Example
-------
```python
with self.__authenticate():
# Do something with the authenticated session
```
Returns
-------
TableauAuth or PersonalAccessTokenAuth authorization object
"""
# Suppress 'InsecureRequestWarning'
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

tableau_auth = TableauAuth(username=self.user, password=self.password, site_id=self.site_id)

if self.token_name and self.token_value:
self.log.info(
"Token details provided, this will take precedence over username and password authentication."
)
tableau_auth = PersonalAccessTokenAuth(
token_name=self.token_name, personal_access_token=self.token_value, site_id=self.site_id
)

server = Server(self.url)
server.version = self.version
server.add_http_options({"verify": False})

with server.auth.sign_in(tableau_auth):
# TODO: logging and check if authorized
if not self.site_id:
raise ValueError("Invalid credentials. Cannot create authorization to connect to Tableau Server.")

self.log.debug(f"Authorized in Tableau Server `{self.url}` and site `{self.site_id}`")

return server.auth.sign_in(tableau_auth)
138 changes: 138 additions & 0 deletions src/koheesio/integrations/spark/tableau/hyper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from koheesio.steps import Step, StepOutput
from koheesio.spark.readers import SparkStep
from koheesio.models import conlist

from koheesio.spark.transformations.cast_to_datatype import CastToDatatype

import os
from pydantic import Field
from abc import ABC, abstractmethod

from typing import Any, List
from tempfile import TemporaryDirectory

from pyspark.sql.types import StringType, FloatType, BooleanType, LongType, StructField, StructType

from pathlib import PurePath
from tableauhyperapi import (
Connection,
CreateMode,
HyperProcess,
Inserter,
NOT_NULLABLE,
NULLABLE,
SqlType,
TableDefinition,
TableName,
Telemetry,
)


class HyperFile(Step, ABC):
"""
Base class for all HyperFile classes
A HyperFile is a Step that reads data from a Hyper file.
"""
schema_: str = Field(default="Extract", alias="schema", description="Internal schema name within the Hyper file")
table: str = Field(default="Extract", description="Table name within the Hyper file")

@property
def table_name(self) -> TableName:
return TableName(self.schema_, self.table)


class HyperFileReader(HyperFile, SparkStep):
path: PurePath = Field(
default=...,
description="Path to the Hyper file",
examples=["PurePath(~/data/my-file.hyper)"]
)

def execute(self):
type_mapping = {
"date": StringType,
"text": StringType,
"double": FloatType,
"bool": BooleanType,
"big_int": LongType,
"timestamp": StringType,
}

df_cols = []
timestamp_cols = []

with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp:
with Connection(endpoint=hp.endpoint, database=self.path) as connection:
table_definition = connection.catalog.get_table_definition(name=self.table_name)

select_cols = []
self.log.debug(f"Schema for {self.table_name} in {self.path}:")
for column in table_definition.columns:
self.log.debug(f"|-- {column.name}: {column.type} (nullable = {column.nullability})")

column_name = column.name.unescaped.__str__()
tableau_type = column.type.__str__().lower()
spark_type = type_mapping.get(tableau_type, StringType)

if tableau_type == "timestamp":
timestamp_cols.append(column_name)
col = f'cast("{column_name}" as text)'
elif tableau_type == "date":
col = f'cast("{column_name}" as text)'
else:
col = f'"{column_name}"'

df_cols.append(StructField(column_name, spark_type()))
select_cols.append(col)

data = connection.execute_list_query(f"select {','.join(select_cols)} from {self.table_name}")

df_schema = StructType(df_cols)
df = self.spark.createDataFrame(data, schema=df_schema)
df = CastToDatatype(column=timestamp_cols, datatype="timestamp").transform(df)

self.output.df = df


class HyperFileListWriter(HyperFile):
"""
TODO: Add description
"""
path: PurePath = Field(
default=TemporaryDirectory().name,
description="Path to the Hyper file",
examples=["PurePath(/tmp/hyper/)"]
)
name: str = Field(default="extract", description="Name of the Hyper file")
table_definition: TableDefinition = Field(
default=...,
description="Table definition to write to the Hyper file as described in "
"https://tableau.github.io/hyper-db/lang_docs/py/tableauhyperapi.html#tableauhyperapi.TableDefinition"
)
data: conlist(List[Any], min_length=1) = Field(default=..., description="Data to write to the Hyper file")

class Output(StepOutput):
"""Output class for HyperFileListWriter"""
hyper_path: PurePath = Field(default=..., description="Path to created Hyper file")

def execute(self):

if not os.path.exists(self.path):
os.makedirs(self.path)

hyper_path = PurePath(self.path, f"{self.name}.hyper")
self.log.info(f"Destination file: {hyper_path}")

with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp:
with Connection(
endpoint=hp.endpoint, database=hyper_path, create_mode=CreateMode.CREATE_AND_REPLACE
) as connection:
connection.catalog.create_schema(schema=self.table_definition.table_name.schema_name)
connection.catalog.create_table(table_definition=self.table_definition)

with Inserter(connection, self.table_definition) as inserter:
inserter.add_rows(rows=self.data)
inserter.execute()

self.output.hyper_path = hyper_path

0 comments on commit 74fd642

Please sign in to comment.