Description
Describe the bug
When processing records from an SQS queue, we fetch the Iceberg data types for the table from the Glue Data Catalog, use them to cleanse the source data, and then write the result to the target Iceberg table in the catalog. When doing this, we get the following error:
[ERROR] TypeError: data type 'decimal' not understood
Traceback (most recent call last):
File "/var/task/index.py", line 249, in lambda_handler
raise error
File "/var/task/index.py", line 206, in lambda_handler
wr.athena.to_iceberg(
File "/opt/python/awswrangler/_config.py", line 712, in wrapper
return function(**args)
File "/opt/python/awswrangler/_utils.py", line 179, in inner
return func(*args, **kwargs)
File "/opt/python/awswrangler/athena/_write_iceberg.py", line 543, in to_iceberg
df[col_name] = df[col_name].astype(_data_types.athena2pandas(col_type))
File "/opt/python/pandas/core/generic.py", line 6643, in astype
new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
File "/opt/python/pandas/core/internals/managers.py", line 430, in astype
return self.apply(
File "/opt/python/pandas/core/internals/managers.py", line 363, in apply
applied = getattr(b, f)(**kwargs)
File "/opt/python/pandas/core/internals/blocks.py", line 758, in astype
new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
File "/opt/python/pandas/core/dtypes/astype.py", line 231, in astype_array_safe
dtype = pandas_dtype(dtype)
File "/opt/python/pandas/core/dtypes/common.py", line 1645, in pandas_dtype
npdtype = np.dtype(dtype)
In "How to Reproduce", I comment the line numbers where the errors happen, however it does seem to happen during the write to the iceberg table.
How to Reproduce
import os
import json
from datetime import datetime
import logging
from typing import Dict, List
from dateutil import parser
import uuid
import time
import awswrangler as wr
import pandas as pd
import numpy as np
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class NoOpFoundException(KeyError):
"""
Raised when the operation is not found
"""
pass
class TableNotFoundException(KeyError):
"""
Raised when the glue table is not found
"""
pass
def get_dtypes(database_name: str, table_name: str) -> Dict[str, str]:
"""
Retrieves the datatypes from a glue data catalog table
:param database_name: The glue database that the table belongs to
:param table_name: The glue table to retrieve the data types for
:returns: If table exists, a dictionary like {'col name': 'col data type'}. Otherwise None.
"""
start = time.perf_counter()
dtypes = wr.catalog.get_table_types(
database=database_name,
table=table_name,
filter_iceberg_current=True,
)
end = time.perf_counter()
logger.info(f"Getting table types: {end - start:0.4f} seconds")
return dtypes
def generate_df(rows: List[Dict], database_name: str, table_name: str) -> pd.DataFrame:
"""
Generates a pandas Dataframe using the row input from upstream, for writing out to another glue table.
:param rows: A list of dictionaries, each containing a row of data in the form (key, value)
:param database_name: The glue database that the dataframe will write to
:param table_name: The table that this dataframe will write out to
:returns: A pandas Dataframe for writing to AWS Glue tables.
"""
dtypes = get_dtypes(database_name=database_name, table_name=table_name)
df = pd.DataFrame(rows)
start = time.perf_counter()
for column in df.columns:
if dtypes[column] != "string":
df[column] = df[column].replace("", np.nan, regex=True)
end = time.perf_counter()
logger.info(f"Replacing empty strings: {end - start:0.4f} seconds")
return df
def lambda_handler(event, context):
logging.info(f"Event: {event}")
for record in event["Records"]:
        # Some other processing here (elided); it produces rows_to_upsert, primary_keys, etc.
try:
if len(rows_to_upsert) > 0: # ----------- Assume there are elements in this list
upsert_start = time.perf_counter()
df = generate_df(
rows=rows_to_upsert,
database_name=glue_database_name,
table_name=table_name,
)
start = time.perf_counter()
df = df.drop_duplicates(subset=primary_keys, keep="last")
end = time.perf_counter()
logger.info(f"Dropping duplicates time: {end - start:0.4f} seconds")
wr.athena.to_iceberg( # ---------------- This is line 206
df,
database=glue_database_name,
table=table_name,
workgroup=ATHENA_WORKGROUP_NAME,
keep_files=False,
partition_cols=["database", "year", "month", "day"],
merge_cols=primary_keys,
merge_condition="update",
temp_path=f"{TEMP_PATH}/{uuid.uuid4()}/",
dtype=get_dtypes(
database_name=glue_database_name, table_name=table_name
),
)
upsert_end = time.perf_counter()
logger.info(f"Upserting time: {upsert_end - upsert_start:0.4f} seconds")
except Exception as error:
# handle error
raise error # -------------------- This is line 249
Expected behavior
The data should merge into the target Iceberg table without this error.
Your project
No response
Screenshots
No response
OS
Amazon Linux (running on AWS Lambda)
Python version
3.13
AWS SDK for pandas version
Lambda layer version ARNs - arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python313-Arm64:1, arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python312-Arm64:16
Additional context
An example of the output of get_dtypes that is almost identical to the real-world scenario we're facing:
{'col1': 'int', 'col2': 'string', 'col3': 'decimal(12,3)', 'col4': 'decimal(12,3)', 'col5': 'decimal(12,3)', 'col6': 'int', 'x': 'int', 'a': 'int', 'f': 'string', 'd': 'decimal(12,3)', 's': 'date', 'v': 'string', 'w': 'decimal(12,3)', 'g': 'decimal(12,3)', 'j': 'string', 'cc': 'date', 'dd': 'string', 'ss': 'timestamp', 'aa': 'string', 'zz': 'decimal(12,3)', 'vv': 'int', ..., 'database': 'string', 'year': 'string', 'month': 'string', 'day': 'string'}
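Based on the traceback, _write_iceberg.py line 543 feeds the result of _data_types.athena2pandas(col_type) straight into astype, and for any of the decimal(12,3) columns above that result appears to be the plain string 'decimal', which pandas cannot interpret. A rough, untested workaround sketch (the wrapper name is hypothetical, and mapping to float64 sacrifices decimal exactness, so it may not be acceptable):

from awswrangler import _data_types

_original_athena2pandas = _data_types.athena2pandas

def _athena2pandas_float_decimals(dtype, *args, **kwargs):
    # Hypothetical workaround: return a dtype string pandas understands for
    # Athena decimal(p,s) columns instead of the unsupported 'decimal'.
    if dtype.lower().startswith("decimal"):
        return "float64"  # loses decimal exactness
    return _original_athena2pandas(dtype, *args, **kwargs)

# Patch the module attribute that _write_iceberg.py looks up at call time
_data_types.athena2pandas = _athena2pandas_float_decimals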