read_parquet not picking up column names from partitioned dataset #154

Open

mahiki opened this issue Jul 9, 2021 · 5 comments

mahiki commented Jul 9, 2021

I'm a package user, not a developer, so I apologize in advance if I am missing something obvious in the API descriptions.

My use case:
I'm consuming output of some data engineers to produce aggregated reports using Julia, my preferred language.

I have datasets written from Spark in Parquet format, partitioned by certain columns in the typical fashion; the file layout is below. The original, complete dataset includes the partition columns dataset_date and device_family. The first is the date of observations, allowing incremental builds of historical data, and the second is a natural partition of the data because of upstream logic.

Here are the Parquet file and DataFrame load operations that are not detecting the partition columns.

Julia Version 1.6.1
Parquet v0.8.3
DataFrames v1.2.0

parquet files written by Spark:
Spark version 2.4.4
Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)

shell> tree 
datasets
└── parquet
    └── jobs
        ├── d_table
        │   ├── dataset_date=2021-06-30
        │   │   ├── device_family=ABC
        │   │   │   └── part-00000-6655178e-0bbb-4741-ab6a-8efe6aebbded.c000.snappy.parquet
        │   │   ├── device_family=DEF
        │   │   │   └── part-00000-358d8620-c4e3-44cb-89d5-d00eb23ad52b.c000.snappy.parquet
        │   │   └── device_family=GHI
        │   │       └── part-00000-c10db044-3b38-4032-ad22-aab3edb8e1ba.c000.snappy.parquet
        │   └── dataset_date=2021-06-30_$folder$
        └── d_table_$folder$

datapath = "datasets/parquet/jobs/d_table/dataset_date=2021-06-30/device_family=ABC/part-00000-6655178e-0bbb-4741-ab6a-8efe6aebbded.c000.snappy.parquet"

dataset_root = "datasets/parquet/jobs/d_table"

parquetfile = Parquet.File(datapath)
Parquet file: datasets/parquet/jobs/d_table/dataset_date=2021-06-30/device_family=ABC/part-00000-6655178e-0bbb-4741-ab6a-8efe6aebbded.c000.snappy.parquet
    version: 1
    nrows: 19739
    created by: parquet-mr version 1.10.1 (build ae1ebff313617fb6ec367d105aeaf306bf27939c)
    cached: 0 column chunks

# Columns 'dataset_date' and 'device_family' are missing
colnames(parquetfile)
14-element Vector{Vector{String}}:
 ["country_cd"]
 ["month_end"]
 ["agg_level"]
 ["vert"]
 ["ten_group"]
 ["total_dialog"]
 ["users"]
 ["dialogs"]
 ["actives"]
 ["regist_cnt"]
 ["dia_user_rate"]
 ["act_pct"]
 ["penpct"]
 ["totaldia_userrate"]

# same missing columns for the DataFrame read over the whole dataset root
df = DataFrame(read_parquet(dataset_root))

55919×14 DataFrame      # <-- note the number of rows includes data from all three partitions {ABC, DEF, GHI}
                                          #       but the partition columns are not present

propertynames(df)
14-element Vector{Symbol}:
:country_cd
:month_end
:agg_level
:vert
:ten_group
:total_dialog
:users
:dialogs
:actives
:regist_cnt
:dia_user_rate
:act_pct
:penpct
:totaldia_userrate
# same names, missing the partition columns "dataset_date" and "device_family"

Expected behavior is the same result as when reading from Spark, as in the following; note that the last two columns reflect the partition directories in the file paths.

val df = spark.read.parquet(dataset_root)

df.show(5)
+----------+-------------+---------+----------- ... -----------+------------+-------------+
|country_cd|month_end    |agg_level|     vert   ... totaldi    |dataset_date|device_family|
+----------+-------------+---------+----------- ... -----------+------------+-------------+
|        AU|   2021-01-31|  MONTHLY| XYZabc1234 ...   30.209284|  2021-06-30|          ABC|
|        BR|   2020-12-31|  MONTHLY| XYZabc1234 ...    73.46783|  2021-05-31|          DEF|
|        BR|   2020-02-29|  MONTHLY| XYZabc1234 ...   21.850622|  2021-05-31|          DEF|
|        BR|   2020-05-31|  MONTHLY| XYZabc1234 ...    377.8711|  2021-06-30|          GHI|
|        DE|   2020-08-31|  MONTHLY| XYZabc1234 ...   71.680115|  2021-06-30|          ABC|
+----------+-------------+---------+----------- ... -----------+------------+-------------+

mahiki commented Jul 11, 2021

Related to:
#139
#138

The docstring for read_parquet says the following about the column_generator kwarg, so I think this should not need to be set:

column_generator: Function to generate a partitioned column when not found in the partitioned table. Parameters provided to the function: table, column index, length of column to generate. Default implementation determines column values from the table path

(Emphasis mine.) I'm pretty sure the intention is for read_parquet to build the partition columns from the directory names in just the same way Spark does, which would be great 😊. It must just be a bug in the implementation, perhaps related to the issues above about metadata files.
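In the meantime, a rough workaround sketch (read_partitioned is just an ad-hoc helper, not part of Parquet.jl) is to read each leaf file and append the partition columns parsed from the hive-style key=value path segments:

using Parquet, DataFrames

# Walk the dataset root, read every *.parquet leaf file, and add the
# partition columns taken from the "key=value" directory names.
function read_partitioned(root::AbstractString)
    parts = DataFrame[]
    for (dir, _, files) in walkdir(root)
        for f in files
            endswith(f, ".parquet") || continue
            df = DataFrame(read_parquet(joinpath(dir, f)))
            for seg in splitpath(relpath(dir, root))
                if occursin('=', seg)
                    key, val = split(seg, '='; limit=2)
                    df[!, Symbol(key)] = fill(String(val), nrow(df))  # values stay as strings; no type inference
                end
            end
            push!(parts, df)
        end
    end
    return vcat(parts...; cols=:union)
end

df = read_partitioned(dataset_root)   # should give 55919 rows and 16 columns, including dataset_date and device_family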


mahiki commented Jul 20, 2021

This is a helpful reference about Spark partition discovery, from @tk3369:
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery


mahiki commented Jul 21, 2021

There is type inference from the column path; this is from the Spark repo doc sql-data-sources-parquet.md:

Notice that the data types of the partitioning columns are automatically inferred. Currently, numeric data types, date, timestamp and string type are supported.

I'm pretty sure Parquet files can encode metadata, and maybe the column type is stored in that metadata by some Parquet writer applications, but I think for Spark the column names come from the path and the types are inferred.

If Parquet.jl inferred column types the way CSV.jl does, I think that would be a fine way to solve the problem, matching what Spark provides for the types below (see the sketch after the list):

# types inferred for columns
numeric data types
date
timestamp
string
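A rough sketch (infer_partition_value is my own hypothetical helper, not a Parquet.jl function) of typing a partition value taken from the path, following those Spark rules:

using Dates

# Try progressively richer parses, mirroring the Spark inference rules
# quoted above: integer, float, date, timestamp, otherwise string.
function infer_partition_value(s::AbstractString)
    for T in (Int, Float64, Date, DateTime)
        v = tryparse(T, s)
        v === nothing || return v
    end
    return String(s)
end

infer_partition_value("2021-06-30")   # Date("2021-06-30")
infer_partition_value("ABC")          # "ABC"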

tanmaykm (Member) commented

read_parquet depends on the table metadata being available in a file named _common_metadata or _metadata in the dataset folder. If that is not available, then it assumes that each of the partitioned parquet files has the complete metadata.

I do not have much experience with how Spark does it, but it does seem possible: https://arrow.apache.org/docs/python/parquet.html#writing-metadata-and-common-medata-files

Does the dataset you are consuming have the metadata files?
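A quick ad-hoc check (has_dataset_metadata is just a throwaway helper name, not a Parquet.jl function):

# Does the dataset root carry the sidecar metadata files read_parquet looks for?
has_dataset_metadata(root::AbstractString) =
    any(isfile(joinpath(root, f)) for f in ("_common_metadata", "_metadata"))

has_dataset_metadata(dataset_root)   # returns false when neither file is present at the root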


mahiki commented Oct 18, 2021

Hi -- sorry for the long lapse.

Generally there are no _metadata or _common_metadata files. I think in Spark this is a controllable configuration, but the typical data lakes I'm working with do not have them. I work with many datasets produced by other teams, so it seems that in order to incorporate Julia processing steps I will need to pre-process these files again in Spark or Python.

I know this is less of a burden in the data science area, where pre-processing data before ingesting it into models or stats work is expected. I would like to use Julia for various data transformations as part of the data engineering components of the work.
