From 74fd642e9afa421206f0254e25187e61760c9821 Mon Sep 17 00:00:00 2001 From: Maxim Mityutko Date: Mon, 10 Jun 2024 18:44:27 +0200 Subject: [PATCH] draft --- pyproject.toml | 9 ++ .../integrations/spark/tableau/__init__.py | 88 +++++++++++ .../integrations/spark/tableau/hyper.py | 138 ++++++++++++++++++ 3 files changed, 235 insertions(+) create mode 100644 src/koheesio/integrations/spark/tableau/__init__.py create mode 100644 src/koheesio/integrations/spark/tableau/hyper.py diff --git a/pyproject.toml b/pyproject.toml index 63f988f..97fbb22 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ authors = [ # TODO: add other contributors { name = "Danny Meijer", email = "danny.meijer@nike.com" }, { name = "Mikita Sakalouski", email = "mikita.sakalouski@nike.com" }, + { name = "Maxim Mityutko", email = "maxim.mityutko@nike.com" }, ] classifiers = [ "Development Status :: 5 - Production/Stable", @@ -63,6 +64,11 @@ se = ["spark-expectations>=2.1.0"] sftp = ["paramiko>=2.6.0"] delta = ["delta-spark>=2.2"] excel = ["openpyxl>=3.0.0"] +# Tableau dependencies +tableau = [ + "tableauhyperapi>=0.0.19484", + "tableauserverclient>=0.25" +] dev = ["black", "isort", "ruff", "mypy", "pylint", "colorama", "types-PyYAML"] test = [ "chispa", @@ -179,6 +185,7 @@ features = [ "excel", "se", "box", + "tableau", "dev", ] @@ -243,6 +250,7 @@ features = [ "sftp", "delta", "excel", + "tableau", "dev", "test", ] @@ -398,6 +406,7 @@ features = [ "sftp", "delta", "excel", + "tableau", "dev", "test", "docs", diff --git a/src/koheesio/integrations/spark/tableau/__init__.py b/src/koheesio/integrations/spark/tableau/__init__.py new file mode 100644 index 0000000..abbcd5e --- /dev/null +++ b/src/koheesio/integrations/spark/tableau/__init__.py @@ -0,0 +1,88 @@ +from typing import Optional, Union + +import urllib3 +from tableauserverclient import PersonalAccessTokenAuth, TableauAuth, Server + +from abc import ABC +from koheesio.models import BaseModel + +from typing import ContextManager +from pydantic import SecretStr, Field + + +class TableauBaseModel(BaseModel, ABC): + url: str = Field( + default=..., + alias="url", + description="Hostname for the Tableau server, e.g. tableau.my-org.com", + examples=["tableau.my-org.com"], + ) + user: str = Field(default=..., alias="user", description="Login name for the Tableau user") + password: SecretStr = Field(default=..., alias="password", description="Password for the Tableau user") + site_id: str = Field( + default=..., + alias="site_id", + description="Identifier for the Tableau site, as used in the URL: https://tableau.my-org.com/#/site/SITE_ID", + ) + version: str = Field( + default="3.14", + alias="version", + description="Version of the Tableau server API", + ) + token_name: Optional[str] = Field( + default=None, + alias="token_name", + description="Name of the Tableau Personal Access Token", + ) + token_value: Optional[SecretStr] = Field( + default=None, + alias="token_value", + description="Value of the Tableau Personal Access Token", + ) + + # class Output(StepOutput): + # """ + # Define outputs for the BoxFolderBase class + # """ + # + # folder: Optional[Folder] = Field(default=None, description="Box folder object") + def __authenticate(self) -> ContextManager: + """ + Authenticate on the Tableau server. + + Example + ------- + ```python + with self.__authenticate(): + # Do something with the authenticated session + ``` + + Returns + ------- + TableauAuth or PersonalAccessTokenAuth authorization object + """ + # Suppress 'InsecureRequestWarning' + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + tableau_auth = TableauAuth(username=self.user, password=self.password, site_id=self.site_id) + + if self.token_name and self.token_value: + self.log.info( + "Token details provided, this will take precedence over username and password authentication." + ) + tableau_auth = PersonalAccessTokenAuth( + token_name=self.token_name, personal_access_token=self.token_value, site_id=self.site_id + ) + + server = Server(self.url) + server.version = self.version + server.add_http_options({"verify": False}) + + with server.auth.sign_in(tableau_auth): + # TODO: logging and check if authorized + if not self.site_id: + raise ValueError("Invalid credentials. Cannot create authorization to connect to Tableau Server.") + + self.log.debug(f"Authorized in Tableau Server `{self.url}` and site `{self.site_id}`") + + return server.auth.sign_in(tableau_auth) diff --git a/src/koheesio/integrations/spark/tableau/hyper.py b/src/koheesio/integrations/spark/tableau/hyper.py new file mode 100644 index 0000000..8be26b9 --- /dev/null +++ b/src/koheesio/integrations/spark/tableau/hyper.py @@ -0,0 +1,138 @@ +from koheesio.steps import Step, StepOutput +from koheesio.spark.readers import SparkStep +from koheesio.models import conlist + +from koheesio.spark.transformations.cast_to_datatype import CastToDatatype + +import os +from pydantic import Field +from abc import ABC, abstractmethod + +from typing import Any, List +from tempfile import TemporaryDirectory + +from pyspark.sql.types import StringType, FloatType, BooleanType, LongType, StructField, StructType + +from pathlib import PurePath +from tableauhyperapi import ( + Connection, + CreateMode, + HyperProcess, + Inserter, + NOT_NULLABLE, + NULLABLE, + SqlType, + TableDefinition, + TableName, + Telemetry, +) + + +class HyperFile(Step, ABC): + """ + Base class for all HyperFile classes + + A HyperFile is a Step that reads data from a Hyper file. + """ + schema_: str = Field(default="Extract", alias="schema", description="Internal schema name within the Hyper file") + table: str = Field(default="Extract", description="Table name within the Hyper file") + + @property + def table_name(self) -> TableName: + return TableName(self.schema_, self.table) + + +class HyperFileReader(HyperFile, SparkStep): + path: PurePath = Field( + default=..., + description="Path to the Hyper file", + examples=["PurePath(~/data/my-file.hyper)"] + ) + + def execute(self): + type_mapping = { + "date": StringType, + "text": StringType, + "double": FloatType, + "bool": BooleanType, + "big_int": LongType, + "timestamp": StringType, + } + + df_cols = [] + timestamp_cols = [] + + with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp: + with Connection(endpoint=hp.endpoint, database=self.path) as connection: + table_definition = connection.catalog.get_table_definition(name=self.table_name) + + select_cols = [] + self.log.debug(f"Schema for {self.table_name} in {self.path}:") + for column in table_definition.columns: + self.log.debug(f"|-- {column.name}: {column.type} (nullable = {column.nullability})") + + column_name = column.name.unescaped.__str__() + tableau_type = column.type.__str__().lower() + spark_type = type_mapping.get(tableau_type, StringType) + + if tableau_type == "timestamp": + timestamp_cols.append(column_name) + col = f'cast("{column_name}" as text)' + elif tableau_type == "date": + col = f'cast("{column_name}" as text)' + else: + col = f'"{column_name}"' + + df_cols.append(StructField(column_name, spark_type())) + select_cols.append(col) + + data = connection.execute_list_query(f"select {','.join(select_cols)} from {self.table_name}") + + df_schema = StructType(df_cols) + df = self.spark.createDataFrame(data, schema=df_schema) + df = CastToDatatype(column=timestamp_cols, datatype="timestamp").transform(df) + + self.output.df = df + + +class HyperFileListWriter(HyperFile): + """ + TODO: Add description + """ + path: PurePath = Field( + default=TemporaryDirectory().name, + description="Path to the Hyper file", + examples=["PurePath(/tmp/hyper/)"] + ) + name: str = Field(default="extract", description="Name of the Hyper file") + table_definition: TableDefinition = Field( + default=..., + description="Table definition to write to the Hyper file as described in " + "https://tableau.github.io/hyper-db/lang_docs/py/tableauhyperapi.html#tableauhyperapi.TableDefinition" + ) + data: conlist(List[Any], min_length=1) = Field(default=..., description="Data to write to the Hyper file") + + class Output(StepOutput): + """Output class for HyperFileListWriter""" + hyper_path: PurePath = Field(default=..., description="Path to created Hyper file") + + def execute(self): + + if not os.path.exists(self.path): + os.makedirs(self.path) + + hyper_path = PurePath(self.path, f"{self.name}.hyper") + self.log.info(f"Destination file: {hyper_path}") + + with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hp: + with Connection( + endpoint=hp.endpoint, database=hyper_path, create_mode=CreateMode.CREATE_AND_REPLACE + ) as connection: + connection.catalog.create_schema(schema=self.table_definition.table_name.schema_name) + connection.catalog.create_table(table_definition=self.table_definition) + + with Inserter(connection, self.table_definition) as inserter: + inserter.add_rows(rows=self.data) + inserter.execute() + + self.output.hyper_path = hyper_path