allow setting Logical types for pqarrow file writer
datbth committed Aug 12, 2024
1 parent 7790c4c commit f14cea7
Showing 4 changed files with 98 additions and 10 deletions.
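The new entry point is NewFileWriterWithLogicalTypes, which takes a slice of per-column logical type overrides matched positionally to the schema's top-level fields; a nil entry leaves that column's inferred logical type untouched. A minimal sketch of the new API (the "payload" field name and the in-memory buffer sink are illustrative, not part of the commit):

package main

import (
	"bytes"
	"log"

	"github.com/apache/arrow/go/v18/arrow"
	"github.com/apache/arrow/go/v18/parquet"
	"github.com/apache/arrow/go/v18/parquet/pqarrow"
	pqschema "github.com/apache/arrow/go/v18/parquet/schema"
)

func main() {
	// One top-level Arrow string field.
	sc := arrow.NewSchema([]arrow.Field{
		{Name: "payload", Nullable: true, Type: arrow.BinaryTypes.String},
	}, nil)

	// One override per top-level field, by position: write the string
	// column with the Parquet JSON logical type.
	logicalTypes := []*pqarrow.LogicalType{
		{Type: pqschema.JSONLogicalType{}, Length: -1},
	}

	var buf bytes.Buffer
	w, err := pqarrow.NewFileWriterWithLogicalTypes(
		sc, &buf,
		parquet.NewWriterProperties(),
		pqarrow.NewArrowWriterProperties(),
		logicalTypes,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()
}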
6 changes: 5 additions & 1 deletion go/parquet/pqarrow/file_writer.go
@@ -63,11 +63,15 @@ type FileWriter struct {
 // the ArrowColumnWriter and WriteArrow functions which allow writing arrow to an existing
 // file.Writer, this will create a new file.Writer based on the schema provided.
 func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error) {
+	return NewFileWriterWithLogicalTypes(arrschema, w, props, arrprops, nil)
+}
+
+func NewFileWriterWithLogicalTypes(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties, logicalTypes []*LogicalType) (*FileWriter, error) {
 	if props == nil {
 		props = parquet.NewWriterProperties()
 	}
 
-	pqschema, err := ToParquet(arrschema, props, arrprops)
+	pqschema, err := ToParquetWithLogicalTypes(arrschema, props, arrprops, logicalTypes)
 	if err != nil {
 		return nil, err
 	}
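The old constructor is now a thin wrapper, so existing callers are unaffected; passing nil overrides reproduces the previous behavior exactly. A sketch (sc, w, props, arrprops stand for whatever the caller already had):

// These two calls produce identical writers after this commit.
fw1, err1 := pqarrow.NewFileWriter(sc, w, props, arrprops)
fw2, err2 := pqarrow.NewFileWriterWithLogicalTypes(sc, w, props, arrprops, nil)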
56 changes: 56 additions & 0 deletions go/parquet/pqarrow/file_writer_test.go
@@ -26,7 +26,10 @@ import (
"github.com/apache/arrow/go/v18/arrow/array"
"github.com/apache/arrow/go/v18/arrow/memory"
"github.com/apache/arrow/go/v18/parquet"
"github.com/apache/arrow/go/v18/parquet/file"
"github.com/apache/arrow/go/v18/parquet/internal/encoding"
"github.com/apache/arrow/go/v18/parquet/pqarrow"
pqschema "github.com/apache/arrow/go/v18/parquet/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -133,3 +136,56 @@ func TestFileWriterBuffered(t *testing.T) {
 	require.NoError(t, writer.Close())
 	assert.Equal(t, 4, writer.NumRows())
 }
+
+func TestFileWriterWithLogicalTypes(t *testing.T) {
+	schema := arrow.NewSchema([]arrow.Field{
+		{Name: "string", Nullable: true, Type: arrow.BinaryTypes.String},
+		{Name: "json", Nullable: true, Type: arrow.BinaryTypes.String},
+	}, nil)
+
+	data := `[
+		{ "string": "{\"key\":\"value\"}", "json": "{\"key\":\"value\"}" },
+		{ "string": null, "json": null }
+	]`
+
+	logicalTypes := []*pqarrow.LogicalType{
+		nil,
+		{Type: pqschema.JSONLogicalType{}, Length: -1},
+	}
+
+	alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
+	defer alloc.AssertSize(t, 0)
+
+	record, _, err := array.RecordFromJSON(alloc, schema, strings.NewReader(data))
+	require.NoError(t, err)
+	defer record.Release()
+
+	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+	defer mem.AssertSize(t, 0)
+	sink := encoding.NewBufferWriter(0, mem)
+	defer sink.Release()
+
+	writer, err := pqarrow.NewFileWriterWithLogicalTypes(
+		schema,
+		sink,
+		parquet.NewWriterProperties(
+			parquet.WithAllocator(alloc),
+		),
+		pqarrow.NewArrowWriterProperties(
+			pqarrow.WithAllocator(alloc),
+		),
+		logicalTypes,
+	)
+	require.NoError(t, err)
+
+	require.NoError(t, writer.Write(record))
+	require.NoError(t, writer.Close())
+
+	reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
+	require.NoError(t, err)
+	assert.EqualValues(t, 2, reader.NumRows())
+
+	parquetSchema := reader.MetaData().Schema
+	assert.EqualValues(t, "String", parquetSchema.Column(0).LogicalType().String())
+	assert.EqualValues(t, "JSON", parquetSchema.Column(1).LogicalType().String())
+}
15 changes: 15 additions & 0 deletions go/parquet/pqarrow/logical_type.go
@@ -0,0 +1,15 @@
+package pqarrow
+
+import "github.com/apache/arrow/go/v18/parquet/schema"
+
+type LogicalType struct {
+	Type   schema.LogicalType
+	Length int
+}
+
+func NewLogicalType() *LogicalType {
+	return &LogicalType{
+		Type:   schema.NoLogicalType{},
+		Length: -1,
+	}
+}
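LogicalType pairs a Parquet logical type with the length used for fixed-length byte array columns (-1 when not applicable). Note from the schema.go change below that an override is applied only when the pointer is non-nil, so NewLogicalType()'s NoLogicalType default is not a "keep inferred" value: passing it actively clears a column's inferred logical type. A rough sketch of constructing an override slice, assuming the pqarrow and schema imports used in this file:

// One entry per top-level field in the Arrow schema.
func exampleOverrides() []*pqarrow.LogicalType {
	return []*pqarrow.LogicalType{
		// nil keeps the inferred logical type for column 0.
		nil,
		// Force column 1 to Parquet JSON; Length -1 = not fixed-length.
		{Type: schema.JSONLogicalType{}, Length: -1},
		// Caution: as a non-nil override this replaces column 2's
		// inferred logical type with NoLogicalType.
		pqarrow.NewLogicalType(),
	}
}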
31 changes: 22 additions & 9 deletions go/parquet/pqarrow/schema.go
@@ -239,7 +239,7 @@ func structToNode(typ *arrow.StructType, name string, nullable bool, props *parq
 
 	children := make(schema.FieldList, 0, typ.NumFields())
 	for _, f := range typ.Fields() {
-		n, err := fieldToNode(f.Name, f, props, arrprops)
+		n, err := fieldToNode(f.Name, f, props, arrprops, nil)
 		if err != nil {
 			return nil, err
 		}
@@ -249,7 +249,7 @@ func structToNode(typ *arrow.StructType, name string, nullable bool, props *parq
 	return schema.NewGroupNode(name, repFromNullable(nullable), children, -1)
 }
 
-func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (schema.Node, error) {
+func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties, arrprops ArrowWriterProperties, customLogicalType *LogicalType) (schema.Node, error) {
 	var (
 		logicalType schema.LogicalType = schema.NoLogicalType{}
 		typ         parquet.Type
@@ -362,7 +362,7 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
 			elem = field.Type.(*arrow.FixedSizeListType).Elem()
 		}
 
-		child, err := fieldToNode(name, arrow.Field{Name: name, Type: elem, Nullable: true}, props, arrprops)
+		child, err := fieldToNode(name, arrow.Field{Name: name, Type: elem, Nullable: true}, props, arrprops, nil)
 		if err != nil {
 			return nil, err
 		}
@@ -372,7 +372,7 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
 		// parquet has no dictionary type, dictionary is encoding, not schema level
 		dictType := field.Type.(*arrow.DictionaryType)
 		return fieldToNode(name, arrow.Field{Name: name, Type: dictType.ValueType, Nullable: field.Nullable, Metadata: field.Metadata},
-			props, arrprops)
+			props, arrprops, customLogicalType)
 	case arrow.EXTENSION:
 		return fieldToNode(name, arrow.Field{
 			Name:     name,
@@ -382,15 +382,15 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
 				ipc.ExtensionTypeKeyName:     field.Type.(arrow.ExtensionType).ExtensionName(),
 				ipc.ExtensionMetadataKeyName: field.Type.(arrow.ExtensionType).Serialize(),
 			}),
-		}, props, arrprops)
+		}, props, arrprops, customLogicalType)
 	case arrow.MAP:
 		mapType := field.Type.(*arrow.MapType)
-		keyNode, err := fieldToNode("key", mapType.KeyField(), props, arrprops)
+		keyNode, err := fieldToNode("key", mapType.KeyField(), props, arrprops, nil)
 		if err != nil {
 			return nil, err
 		}
 
-		valueNode, err := fieldToNode("value", mapType.ItemField(), props, arrprops)
+		valueNode, err := fieldToNode("value", mapType.ItemField(), props, arrprops, nil)
 		if err != nil {
 			return nil, err
 		}
@@ -410,6 +410,11 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
 		return nil, fmt.Errorf("%w: support for %s", arrow.ErrNotImplemented, field.Type.ID())
 	}
 
+	if customLogicalType != nil {
+		logicalType = customLogicalType.Type
+		length = customLogicalType.Length
+	}
+
 	return schema.NewPrimitiveNodeLogical(name, repType, logicalType, typ, length, fieldIDFromMeta(field.Metadata))
 }
 
@@ -440,13 +445,21 @@ func fieldIDFromMeta(m arrow.Metadata) int32 {
 // ToParquet generates a Parquet Schema from an arrow Schema using the given properties to make
 // decisions when determining the logical/physical types of the columns.
 func ToParquet(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*schema.Schema, error) {
+	return ToParquetWithLogicalTypes(sc, props, arrprops, nil)
+}
+
+func ToParquetWithLogicalTypes(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties, logicalTypes []*LogicalType) (*schema.Schema, error) {
 	if props == nil {
 		props = parquet.NewWriterProperties()
 	}
 
 	nodes := make(schema.FieldList, 0, sc.NumFields())
-	for _, f := range sc.Fields() {
-		n, err := fieldToNode(f.Name, f, props, arrprops)
+	for i, f := range sc.Fields() {
+		var logicalType *LogicalType
+		if logicalTypes != nil && i < len(logicalTypes) {
+			logicalType = logicalTypes[i]
+		}
+		n, err := fieldToNode(f.Name, f, props, arrprops, logicalType)
 		if err != nil {
 			return nil, err
 		}
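ToParquetWithLogicalTypes matches overrides to top-level fields by index, and the i < len(logicalTypes) guard lets the slice be shorter than the field list, so trailing fields simply keep their inferred types. An override only takes effect where a field resolves to a primitive node: it is forwarded through dictionary and extension unwrapping, while struct children, list elements, and map keys/values always receive nil. A sketch under the same import assumptions as the example above (field names hypothetical):

// Three top-level fields, one override: only "b" is affected; "c" is
// beyond the end of the slice and keeps its inferred logical type.
sc := arrow.NewSchema([]arrow.Field{
	{Name: "a", Type: arrow.PrimitiveTypes.Int64},
	{Name: "b", Nullable: true, Type: arrow.BinaryTypes.String},
	{Name: "c", Nullable: true, Type: arrow.BinaryTypes.String},
}, nil)

pqSchema, err := pqarrow.ToParquetWithLogicalTypes(
	sc,
	nil, // nil falls back to parquet.NewWriterProperties()
	pqarrow.NewArrowWriterProperties(),
	[]*pqarrow.LogicalType{nil, {Type: pqschema.JSONLogicalType{}, Length: -1}},
)
if err != nil {
	log.Fatal(err)
}
_ = pqSchema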
