Skip to content

Commit

Permalink
Merge pull request #260 from JCotton1123/excel-plugin-output
Browse files Browse the repository at this point in the history
Update Excel plugin with output support
  • Loading branch information
jwills authored Oct 18, 2023
2 parents f57a438 + d77f5e6 commit 0d5ac9d
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 33 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ initialize the `sqlalchemy` plugin (aliased as "sql") with a `connection_url` th

Please remember that using plugins may require you to add additional dependencies to the Python environment that your dbt-duckdb pipeline runs in:

* `excel` depends on `pandas`
* `excel` depends on `pandas`, and `openpyxl` or `xlsxwriter` to perform writes
* `gsheet` depends on `gspread` and `pandas`
* `iceberg` depends on `pyiceberg` and Python >= 3.8
* `sqlalchemy` depends on `pandas`, `sqlalchemy`, and the driver(s) you need
Expand Down
80 changes: 80 additions & 0 deletions dbt/adapters/duckdb/plugins/excel.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
import os
import pathlib
from threading import Lock
from typing import Any
from typing import Dict

import pandas as pd
from pandas.io.formats import excel

from . import BasePlugin
from . import pd_utils
from ..utils import SourceConfig
from ..utils import TargetConfig
from dbt.logger import GLOBAL_LOGGER as logger


class Plugin(BasePlugin):
def initialize(self, plugin_config: Dict[str, Any]):
self._config = plugin_config

if "output" in plugin_config:
self._excel_writer_create_lock = Lock()
assert isinstance(plugin_config["output"], dict)
assert "file" in plugin_config["output"]

# Pass s3 settings to plugin environment
if "s3_access_key_id" in plugin_config:
os.environ["AWS_ACCESS_KEY_ID"] = plugin_config["s3_access_key_id"]
Expand All @@ -29,3 +41,71 @@ def load(self, source_config: SourceConfig):
source_location = pathlib.Path(ext_location.strip("'"))
sheet_name = source_config.get("sheet_name", 0)
return pd.read_excel(source_location, sheet_name=sheet_name)

def store(self, target_config: TargetConfig):
    """Write the data for ``target_config`` to a sheet of the output workbook.

    Options are read from the plugin-level ``output`` config and can be
    overridden per target through ``target_config.config["overrides"]``.
    When no ``sheet_name`` is configured, the relation identifier
    (truncated to 31 characters) is used, falling back to ``"Sheet1"``.

    If ``skip_empty_sheet`` is set and the data has no rows, nothing is
    written. If ``lazy_close`` is disabled, the writer is closed and
    discarded after a successful write; otherwise it stays open until
    ``__del__`` closes it.

    Raises:
        ValueError: re-raised from ``DataFrame.to_excel`` unless it is the
            "This sheet is too large" error and ``ignore_sheet_too_large``
            is enabled, in which case a one-row error sheet is written
            in place of the data.
    """
    plugin_output_config = self._config["output"]

    # Create the writer on the first call to store() rather than in
    # initialize(): opening it eagerly with mode="w" would overwrite an
    # existing file even when nothing is written (e.g. during `dbt test`).
    if not hasattr(self, "_excel_writer"):
        with self._excel_writer_create_lock:
            # Re-check while holding the lock so that only one caller
            # ends up creating the writer.
            if not hasattr(self, "_excel_writer"):
                self._excel_writer = pd.ExcelWriter(
                    plugin_output_config["file"],
                    mode=plugin_output_config.get("mode", "w"),
                    engine=plugin_output_config.get("engine", "xlsxwriter"),
                    engine_kwargs=plugin_output_config.get("engine_kwargs", {}),
                    date_format=plugin_output_config.get("date_format"),
                    datetime_format=plugin_output_config.get("datetime_format"),
                )
                if not plugin_output_config.get("header_styling", True):
                    # NOTE: this replaces a class attribute, so it affects
                    # every pandas Excel export in the process.
                    excel.ExcelFormatter.header_style = None

    # Per-target overrides win over the plugin-level output settings.
    target_output_config = {
        **plugin_output_config,
        **target_config.config.get("overrides", {}),
    }

    if "sheet_name" not in target_output_config:
        # Excel sheet name is limited to 31 characters
        sheet_name = (target_config.relation.identifier or "Sheet1")[0:31]
        target_output_config["sheet_name"] = sheet_name

    df = pd_utils.target_to_df(target_config)
    if target_output_config.get("skip_empty_sheet", False) and df.shape[0] == 0:
        return
    try:
        df.to_excel(
            self._excel_writer,
            sheet_name=target_output_config["sheet_name"],
            na_rep=target_output_config.get("na_rep", ""),
            float_format=target_output_config.get("float_format", None),
            header=target_output_config.get("header", True),
            index=target_output_config.get("index", True),
            merge_cells=target_output_config.get("merge_cells", True),
            inf_rep=target_output_config.get("inf_rep", "inf"),
        )
        if not target_output_config.get("lazy_close", True):
            self._excel_writer.close()
            del self._excel_writer
    except ValueError as ve:
        # Handles errors resembling the one below by writing a one-row
        # "Error" sheet in place of the data, when the user opted in:
        # ValueError('This sheet is too large! Your sheet size is: 1100000, 1 Max sheet size is: 1048576, 16384')
        #
        # The option is looked up with .get(): indexing it directly raised
        # KeyError whenever it was not configured, hiding the real error.
        if str(ve).startswith("This sheet is too large") and target_output_config.get(
            "ignore_sheet_too_large", False
        ):
            pd.DataFrame(
                [{"Error": target_output_config.get("ignore_sheet_too_large_error", str(ve))}]
            ).to_excel(
                self._excel_writer, sheet_name=target_output_config["sheet_name"], index=False
            )
        else:
            # Bare raise keeps the original traceback intact.
            raise

def __del__(self):
    """Close the workbook writer, if one is still attached to this plugin."""
    if not hasattr(self, "_excel_writer"):
        # No writer was ever created, or store() already closed and
        # removed it; there is nothing to flush.
        return
    logger.info(f"Closing {self._config['output']['file']}")
    self._excel_writer.close()
86 changes: 86 additions & 0 deletions tests/functional/plugins/test_excel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import os
import json
import pandas
import pytest

from dbt.tests.util import (
check_relations_equal,
run_dbt,
)

# dbt source definition for the Excel workbook read through the excel plugin.
# `{test_data_path}` is a str.format placeholder filled in by the `models`
# fixture. The nesting matters: `schema`, `meta` and `tables` belong to the
# source entry, and `description`/`meta` belong to the table entry.
schema_yml = """
version: 2
sources:
  - name: excel_source
    schema: main
    meta:
      plugin: excel
    tables:
      - name: excel_file
        description: "An excel file"
        meta:
          external_location: "{test_data_path}/excel_file.xlsx"
"""

# Plugin configuration for the test profile: write through openpyxl to a
# fixed file and close the workbook after each write (lazy_close disabled)
# so the test can read the file back straight after `run_dbt()`.
plugins = [
    {
        "module": "excel",
        "config": {
            "output": {
                "engine": "openpyxl",
                "file": "/tmp/excel_file_out.xlsx",
                "lazy_close": False,
            },
        },
    },
]

# External model that copies the Excel source back out via the excel plugin.
model_sql = """
{{ config(materialized='external', plugin='excel') }}
select * from {{ source('excel_source', 'excel_file') }}
"""


class TestExcelPlugin:
    """Round-trip test: read an Excel source and write it back out via the excel plugin."""

    @pytest.fixture(scope="class")
    def profiles_config_update(self, dbt_profile_target):
        """Return a profile whose ``dev`` target is DuckDB with the excel plugin enabled."""
        dev_output = {
            "type": "duckdb",
            "path": dbt_profile_target.get("path", ":memory:"),
            "plugins": plugins,
        }
        return {
            "test": {
                "outputs": {"dev": dev_output},
                "target": "dev",
            }
        }

    @pytest.fixture(scope="class")
    def models(self, test_data_path):
        """Return the project files: the source schema and the external model."""
        rendered_schema = schema_yml.format(test_data_path=test_data_path)
        return {
            "schema_excel.yml": rendered_schema,
            "excel_read_write.sql": model_sql,
        }

    def test_excel_plugin(self, project):
        """Run the project and check the source, the written workbook and the model agree."""
        run_results = run_dbt()
        assert len(run_results) == 1

        # The source workbook is expected to expose nine rows.
        count_row = project.run_sql("SELECT COUNT(1) FROM excel_file", fetch="one")
        assert count_row[0] == 9

        # Read back the file the plugin wrote and spot-check its contents.
        written = pandas.read_excel('/tmp/excel_file_out.xlsx')
        assert written.shape[0] == 9
        assert written['First Name'].iloc[0] == 'Dulce'

        compared_relations = [
            "excel_file",
            "excel_read_write",
        ]
        check_relations_equal(project.adapter, compared_relations)


33 changes: 1 addition & 32 deletions tests/functional/plugins/test_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,6 @@
run_dbt,
)

excel_schema_yml = """
version: 2
sources:
- name: excel_source
schema: main
meta:
plugin: excel
tables:
- name: excel_file
description: "An excel file"
meta:
external_location: "{test_data_path}/excel_file.xlsx"
"""

sqlalchemy_schema_yml = """
version: 2
sources:
Expand All @@ -42,9 +28,6 @@
"""


excel1_sql = """
select * from {{ source('excel_source', 'excel_file') }}
"""
sqlalchemy1_sql = """
select * from {{ source('sql_source', 'tt1') }}
"""
Expand Down Expand Up @@ -97,7 +80,6 @@ def sqlite_test_db(self):
def profiles_config_update(self, dbt_profile_target, sqlite_test_db):
sa_config = {"connection_url": f"sqlite:///{sqlite_test_db}"}
plugins = [
{"module": "excel"},
{"module": "sqlalchemy", "alias": "sql", "config": sa_config},
{"module": "tests.create_function_plugin", "alias": "cfp"},
]
Expand All @@ -118,28 +100,15 @@ def profiles_config_update(self, dbt_profile_target, sqlite_test_db):
@pytest.fixture(scope="class")
def models(self, test_data_path):
return {
"schema_excel.yml": excel_schema_yml.format(test_data_path=test_data_path),
"schema_sqlalchemy.yml": sqlalchemy_schema_yml,
"excel.sql": excel1_sql,
"sqlalchemy1.sql": sqlalchemy1_sql,
"sqlalchemy2.sql": sqlalchemy2_sql,
"foo.sql": plugin_sql,
}

def test_plugins(self, project):
results = run_dbt()
assert len(results) == 4

res = project.run_sql("SELECT COUNT(1) FROM excel_file", fetch="one")
assert res[0] == 9

check_relations_equal(
project.adapter,
[
"excel_file",
"excel",
],
)
assert len(results) == 3

res = project.run_sql("SELECT COUNT(1) FROM tt1", fetch="one")
assert res[0] == 1
Expand Down

0 comments on commit 0d5ac9d

Please sign in to comment.