
TypeError: data type 'decimal' not understood - using athena.to_iceberg() #3145

Open
@Rizxcviii

Description

Describe the bug

When reading data from an SQS queue, we fetch the Iceberg data types from the Glue Data Catalog, use them to perform some cleansing on the source data, and then write the result to the target Iceberg table in the Data Catalog. When doing this, we get the following error:

[ERROR] TypeError: data type 'decimal' not understood
Traceback (most recent call last):
  File "/var/task/index.py", line 249, in lambda_handler
    raise error
  File "/var/task/index.py", line 206, in lambda_handler
    wr.athena.to_iceberg(
  File "/opt/python/awswrangler/_config.py", line 712, in wrapper
    return function(**args)
  File "/opt/python/awswrangler/_utils.py", line 179, in inner
    return func(*args, **kwargs)
  File "/opt/python/awswrangler/athena/_write_iceberg.py", line 543, in to_iceberg
    df[col_name] = df[col_name].astype(_data_types.athena2pandas(col_type))
  File "/opt/python/pandas/core/generic.py", line 6643, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
  File "/opt/python/pandas/core/internals/managers.py", line 430, in astype
    return self.apply(
  File "/opt/python/pandas/core/internals/managers.py", line 363, in apply
    applied = getattr(b, f)(**kwargs)
  File "/opt/python/pandas/core/internals/blocks.py", line 758, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
  File "/opt/python/pandas/core/dtypes/astype.py", line 231, in astype_array_safe
    dtype = pandas_dtype(dtype)
  File "/opt/python/pandas/core/dtypes/common.py", line 1645, in pandas_dtype
    npdtype = np.dtype(dtype)

In "How to Reproduce", I comment the line numbers where the errors happen, however it does seem to happen during the write to the iceberg table.

How to Reproduce

import os
import json
from datetime import datetime
import logging
from typing import Dict, List
from dateutil import parser
import uuid
import time
import awswrangler as wr
import pandas as pd
import numpy as np

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class NoOpFoundException(KeyError):
    """
    Raised when the operation is not found
    """

    pass


class TableNotFoundException(KeyError):
    """
    Raised when the glue table is not found
    """

    pass


def get_dtypes(database_name: str, table_name: str) -> Dict[str, str]:
    """
    Retrieves the datatypes from a glue data catalog table

    :param database_name: The glue database that the table belongs to
    :param table_name: The glue table to retrieve the data types for

    :returns: If table exists, a dictionary like {'col name': 'col data type'}. Otherwise None.
    """
    start = time.perf_counter()
    dtypes = wr.catalog.get_table_types(
        database=database_name,
        table=table_name,
        filter_iceberg_current=True,
    )
    end = time.perf_counter()
    logger.info(f"Getting table types: {end - start:0.4f} seconds")

    return dtypes


def generate_df(rows: List[Dict], database_name: str, table_name: str) -> pd.DataFrame:
    """
    Generates a pandas DataFrame from the upstream row input, for writing out to another Glue table.

    :param rows: A list of dictionaries, each containing a row of data in the form (key, value)
    :param database_name: The glue database that the dataframe will write to
    :param table_name: The table that this dataframe will write out to

    :returns: A pandas DataFrame for writing to AWS Glue tables.
    """
    dtypes = get_dtypes(database_name=database_name, table_name=table_name)

    df = pd.DataFrame(rows)

    start = time.perf_counter()
    for column in df.columns:
        if dtypes[column] != "string":
            # Null out empty strings in non-string columns so they can be cast
            df[column] = df[column].replace("", np.nan)
    end = time.perf_counter()
    logger.info(f"Replacing empty strings: {end - start:0.4f} seconds")

    return df


def lambda_handler(event, context):
    logging.info(f"Event: {event}")

    for record in event["Records"]:
        # Some other processing here...
        try:
            if len(rows_to_upsert) > 0: # ----------- Assume there are elements in this list
                upsert_start = time.perf_counter()

                df = generate_df(
                    rows=rows_to_upsert,
                    database_name=glue_database_name,
                    table_name=table_name,
                )

                start = time.perf_counter()
                df = df.drop_duplicates(subset=primary_keys, keep="last")
                end = time.perf_counter()
                logger.info(f"Dropping duplicates time: {end - start:0.4f} seconds")

                wr.athena.to_iceberg( # ---------------- This is line 206
                    df,
                    database=glue_database_name,
                    table=table_name,
                    workgroup=ATHENA_WORKGROUP_NAME,
                    keep_files=False,
                    partition_cols=["database", "year", "month", "day"],
                    merge_cols=primary_keys,
                    merge_condition="update",
                    temp_path=f"{TEMP_PATH}/{uuid.uuid4()}/",
                    dtype=get_dtypes(
                        database_name=glue_database_name, table_name=table_name
                    ),
                )

                upsert_end = time.perf_counter()
                logger.info(f"Upserting time: {upsert_end - upsert_start:0.4f} seconds")

        except Exception as error:
            # handle error
            raise error  # -------------------- This is line 249

Expected behavior

The data should merge into the Iceberg table without this error.

Your project

No response

Screenshots

No response

OS

Amazon Linux (running on AWS Lambda)

Python version

3.13

AWS SDK for pandas version

Lambda layer version ARNs: arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python313-Arm64:1 and arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python312-Arm64:16

Additional context

An example of the output of get_dtypes that is almost identical to the real-world scenario we're facing:

{'col1': 'int', 'col2': 'string', 'col3': 'decimal(12,3)', 'col4': 'decimal(12,3)', 'col5': 'decimal(12,3)', 'col6': 'int', 'x': 'int', 'a': 'int', 'f': 'string', 'd': 'decimal(12,3)', 's': 'date', 'v': 'string', 'w': 'decimal(12,3)', 'g': 'decimal(12,3)', 'j': 'string', 'cc': 'date', 'dd': 'string', 'ss': 'timestamp', 'aa': 'string', 'zz': 'decimal(12,3)', 'vv': 'int', ..., 'database': 'string', 'year': 'string', 'month': 'string', 'day': 'string'}
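None of the decimal forms in this mapping are valid NumPy/pandas dtype strings, which is consistent with the traceback ending in np.dtype. A minimal sketch; the expectation that decimal columns should instead be carried as Python decimal.Decimal objects in an object-dtype column is our assumption, based on how awswrangler returns decimal data from reads:

from decimal import Decimal

import numpy as np
import pandas as pd

# Neither the bare keyword nor the parametrized Athena form is a
# valid NumPy dtype string, so astype() cannot accept either.
for type_string in ("decimal", "decimal(12,3)"):
    try:
        np.dtype(type_string)
    except TypeError as exc:
        print(f"{type_string!r}: {exc}")

# The representation we expected instead: decimal values held as
# Python Decimal objects in an object-dtype column.
df = pd.DataFrame({"col3": [Decimal("1.234"), Decimal("5.678")]})
print(df["col3"].dtype)  # object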

Labels

bug (Something isn't working)