diff --git a/go/parquet/pqarrow/file_writer.go b/go/parquet/pqarrow/file_writer.go index 539c544829e3b..241ded6753ea3 100644 --- a/go/parquet/pqarrow/file_writer.go +++ b/go/parquet/pqarrow/file_writer.go @@ -63,11 +63,16 @@ type FileWriter struct { // the ArrowColumnWriter and WriteArrow functions which allow writing arrow to an existing // file.Writer, this will create a new file.Writer based on the schema provided. func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error) { + return NewFileWriterWithLogicalTypes(arrschema, w, props, arrprops, nil) +} + +// NewFileWriterWithLogicalTypes is like NewFileWriter but lets callers override the Parquet logical type of each top-level column; logicalTypes is indexed by field position and nil entries keep the inferred type. +func NewFileWriterWithLogicalTypes(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties, logicalTypes []*LogicalType) (*FileWriter, error) { if props == nil { props = parquet.NewWriterProperties() } - pqschema, err := ToParquet(arrschema, props, arrprops) + pqschema, err := ToParquetWithLogicalTypes(arrschema, props, arrprops, logicalTypes) if err != nil { return nil, err } diff --git a/go/parquet/pqarrow/file_writer_test.go b/go/parquet/pqarrow/file_writer_test.go index 5b807389a3eb1..31dd4cab6c1cc 100644 --- a/go/parquet/pqarrow/file_writer_test.go +++ b/go/parquet/pqarrow/file_writer_test.go @@ -26,7 +26,10 @@ import ( "github.com/apache/arrow/go/v18/arrow/array" "github.com/apache/arrow/go/v18/arrow/memory" "github.com/apache/arrow/go/v18/parquet" + "github.com/apache/arrow/go/v18/parquet/file" + "github.com/apache/arrow/go/v18/parquet/internal/encoding" "github.com/apache/arrow/go/v18/parquet/pqarrow" + pqschema "github.com/apache/arrow/go/v18/parquet/schema" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -133,3 +136,56 @@ func TestFileWriterBuffered(t *testing.T) { require.NoError(t, writer.Close()) assert.Equal(t, 4, writer.NumRows()) } + +func TestFileWriterWithLogicalTypes(t *testing.T) { + schema := arrow.NewSchema([]arrow.Field{ + {Name: 
"string", Nullable: true, Type: arrow.BinaryTypes.String}, + {Name: "json", Nullable: true, Type: arrow.BinaryTypes.String}, + }, nil) + + data := `[ + { "string": "{\"key\":\"value\"}", "json": "{\"key\":\"value\"}" }, + { "string": null, "json": null } + ]` + + logicalTypes := []*pqarrow.LogicalType{ + nil, + {Type: pqschema.JSONLogicalType{}, Length: -1}, + } + + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer alloc.AssertSize(t, 0) + + record, _, err := array.RecordFromJSON(alloc, schema, strings.NewReader(data)) + require.NoError(t, err) + defer record.Release() + + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + sink := encoding.NewBufferWriter(0, mem) + defer sink.Release() + + writer, err := pqarrow.NewFileWriterWithLogicalTypes( + schema, + sink, + parquet.NewWriterProperties( + parquet.WithAllocator(alloc), + ), + pqarrow.NewArrowWriterProperties( + pqarrow.WithAllocator(alloc), + ), + logicalTypes, + ) + require.NoError(t, err) + + require.NoError(t, writer.Write(record)) + require.NoError(t, writer.Close()) + + reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes())) + require.NoError(t, err) + assert.EqualValues(t, 2, reader.NumRows()) + + parquetSchema := reader.MetaData().Schema + assert.EqualValues(t, "String", parquetSchema.Column(0).LogicalType().String()) + assert.EqualValues(t, "JSON", parquetSchema.Column(1).LogicalType().String()) +} diff --git a/go/parquet/pqarrow/logical_type.go b/go/parquet/pqarrow/logical_type.go new file mode 100644 index 0000000000000..7043a7b3e6860 --- /dev/null +++ b/go/parquet/pqarrow/logical_type.go @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.  See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.  The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.  You may obtain a copy of the License at +// +//     http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions +// and limitations under the License. + +package pqarrow + +import "github.com/apache/arrow/go/v18/parquet/schema" + +// LogicalType pairs a Parquet logical type with an optional fixed length +// (-1 means no fixed length) used to override schema conversion for a column. +type LogicalType struct { + Type schema.LogicalType + Length int +} + +// NewLogicalType returns a LogicalType carrying no logical type override and no fixed length. +func NewLogicalType() *LogicalType { + return &LogicalType{ + Type: schema.NoLogicalType{}, + Length: -1, + } +} diff --git a/go/parquet/pqarrow/schema.go 
b/go/parquet/pqarrow/schema.go index 4ec67a18f2a65..f8707d0e919be 100644 --- a/go/parquet/pqarrow/schema.go +++ b/go/parquet/pqarrow/schema.go @@ -239,7 +239,7 @@ func structToNode(typ *arrow.StructType, name string, nullable bool, props *parq children := make(schema.FieldList, 0, typ.NumFields()) for _, f := range typ.Fields() { - n, err := fieldToNode(f.Name, f, props, arrprops) + n, err := fieldToNode(f.Name, f, props, arrprops, nil) if err != nil { return nil, err } @@ -249,7 +249,7 @@ func structToNode(typ *arrow.StructType, name string, nullable bool, props *parq return schema.NewGroupNode(name, repFromNullable(nullable), children, -1) } -func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (schema.Node, error) { +func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties, arrprops ArrowWriterProperties, customLogicalType *LogicalType) (schema.Node, error) { var ( logicalType schema.LogicalType = schema.NoLogicalType{} typ parquet.Type @@ -362,7 +362,7 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties elem = field.Type.(*arrow.FixedSizeListType).Elem() } - child, err := fieldToNode(name, arrow.Field{Name: name, Type: elem, Nullable: true}, props, arrprops) + child, err := fieldToNode(name, arrow.Field{Name: name, Type: elem, Nullable: true}, props, arrprops, nil) if err != nil { return nil, err } @@ -372,7 +372,7 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties // parquet has no dictionary type, dictionary is encoding, not schema level dictType := field.Type.(*arrow.DictionaryType) return fieldToNode(name, arrow.Field{Name: name, Type: dictType.ValueType, Nullable: field.Nullable, Metadata: field.Metadata}, - props, arrprops) + props, arrprops, customLogicalType) case arrow.EXTENSION: return fieldToNode(name, arrow.Field{ Name: name, @@ -382,15 +382,15 @@ func fieldToNode(name string, field arrow.Field, props 
*parquet.WriterProperties ipc.ExtensionTypeKeyName: field.Type.(arrow.ExtensionType).ExtensionName(), ipc.ExtensionMetadataKeyName: field.Type.(arrow.ExtensionType).Serialize(), }), - }, props, arrprops) + }, props, arrprops, customLogicalType) case arrow.MAP: mapType := field.Type.(*arrow.MapType) - keyNode, err := fieldToNode("key", mapType.KeyField(), props, arrprops) + keyNode, err := fieldToNode("key", mapType.KeyField(), props, arrprops, nil) if err != nil { return nil, err } - valueNode, err := fieldToNode("value", mapType.ItemField(), props, arrprops) + valueNode, err := fieldToNode("value", mapType.ItemField(), props, arrprops, nil) if err != nil { return nil, err } @@ -410,6 +410,11 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties return nil, fmt.Errorf("%w: support for %s", arrow.ErrNotImplemented, field.Type.ID()) } + if customLogicalType != nil { + logicalType = customLogicalType.Type + length = customLogicalType.Length + } + return schema.NewPrimitiveNodeLogical(name, repType, logicalType, typ, length, fieldIDFromMeta(field.Metadata)) } @@ -440,13 +445,22 @@ func fieldIDFromMeta(m arrow.Metadata) int32 { // ToParquet generates a Parquet Schema from an arrow Schema using the given properties to make // decisions when determining the logical/physical types of the columns. 
func ToParquet(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*schema.Schema, error) { + return ToParquetWithLogicalTypes(sc, props, arrprops, nil) +} + +// ToParquetWithLogicalTypes is like ToParquet but accepts per-field Parquet logical type overrides; logicalTypes is indexed by top-level field position and nil entries keep the inferred type. +func ToParquetWithLogicalTypes(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties, logicalTypes []*LogicalType) (*schema.Schema, error) { if props == nil { props = parquet.NewWriterProperties() } nodes := make(schema.FieldList, 0, sc.NumFields()) - for _, f := range sc.Fields() { - n, err := fieldToNode(f.Name, f, props, arrprops) + for i, f := range sc.Fields() { + var logicalType *LogicalType + if i < len(logicalTypes) { + logicalType = logicalTypes[i] + } + n, err := fieldToNode(f.Name, f, props, arrprops, logicalType) if err != nil { return nil, err }