Bump actions/download-artifact from 3 to 4.1.7 in /.github/workflows #2

Closed
2 changes: 1 addition & 1 deletion .github/workflows/r.yml
@@ -327,7 +327,7 @@ jobs:
echo "$HOME/.local/bin" >> $GITHUB_PATH
- run: mkdir r/windows
- name: Download artifacts
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4.1.7
with:
name: libarrow-rtools40-ucrt64.zip
path: r/windows
1 change: 1 addition & 0 deletions go/arrow/array/interval.go
@@ -652,6 +652,7 @@ func NewMonthDayNanoIntervalData(data arrow.ArrayData) *MonthDayNanoInterval {
return a
}

func (a *MonthDayNanoInterval) Values() []arrow.MonthDayNanoInterval { return a.values }
func (a *MonthDayNanoInterval) Value(i int) arrow.MonthDayNanoInterval { return a.values[i] }
func (a *MonthDayNanoInterval) ValueStr(i int) string {
if a.IsNull(i) {
39 changes: 39 additions & 0 deletions go/parquet/internal/testutils/random_arrow.go
@@ -190,6 +190,19 @@ func RandomNonNull(mem memory.Allocator, dt arrow.DataType, size int) arrow.Arra
FillRandomBooleans(0.5, 0, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.INTERVAL_MONTH_DAY_NANO:
bldr := array.NewMonthDayNanoIntervalBuilder(mem) // use the caller-provided allocator rather than the global default
defer bldr.Release()

for i := 0; i < size; i++ {
bldr.Append(arrow.MonthDayNanoInterval{
Months: rand.Int31(),
Days: rand.Int31(),
// the parquet INTERVAL type only stores 32-bit milliseconds, so generate values at millisecond precision only
Nanoseconds: int64(rand.Int31()) * 1000000,
})
}
return bldr.NewArray()
}
return nil
}
@@ -513,6 +526,32 @@ func RandomNullable(dt arrow.DataType, size int, numNulls int) arrow.Array {
FillRandomBooleans(0.5, 0, values)
bldr.AppendValues(values, valid)
return bldr.NewArray()
case arrow.INTERVAL_MONTH_DAY_NANO:
bldr := array.NewMonthDayNanoIntervalBuilder(memory.DefaultAllocator)
defer bldr.Release()

valid := make([]bool, size)
for idx := range valid {
valid[idx] = true
}
// null out every other leading slot until numNulls nulls are placed
// (assumes numNulls <= size/2)
for i := 0; i < numNulls; i++ {
valid[i*2] = false
}

for i := 0; i < size; i++ {
if !valid[i] {
bldr.AppendNull()
continue
}

bldr.Append(arrow.MonthDayNanoInterval{
Months: rand.Int31(),
Days: rand.Int31(),
// the parquet INTERVAL type only stores 32-bit milliseconds, so generate values at millisecond precision only
Nanoseconds: int64(rand.Int31()) * 1000000,
})
}
return bldr.NewArray()
}
return nil
}
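Both random generators above quantize Nanoseconds to whole milliseconds so that the generated values survive a parquet write/read round trip. A minimal sketch of that invariant, using a hypothetical roundTrippable helper that is not part of this PR:

package main

import (
	"fmt"
	"math"

	"github.com/apache/arrow/go/v18/arrow"
)

// roundTrippable reports whether v can be stored as a parquet INTERVAL and read
// back unchanged: Nanoseconds must be a non-negative whole number of
// milliseconds that fits in the unsigned 32-bit field parquet uses.
func roundTrippable(v arrow.MonthDayNanoInterval) bool {
	ms := v.Nanoseconds / 1000000
	return v.Nanoseconds%1000000 == 0 && ms >= 0 && ms <= math.MaxUint32
}

func main() {
	fmt.Println(roundTrippable(arrow.MonthDayNanoInterval{Months: 1, Days: 2, Nanoseconds: 123000000})) // true
	fmt.Println(roundTrippable(arrow.MonthDayNanoInterval{Months: 1, Days: 2, Nanoseconds: 123456789})) // false: sub-millisecond precision would be lost
}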
43 changes: 43 additions & 0 deletions go/parquet/pqarrow/column_readers.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/arrow/go/v18/arrow/bitutil"
"github.com/apache/arrow/go/v18/arrow/decimal128"
"github.com/apache/arrow/go/v18/arrow/decimal256"
"github.com/apache/arrow/go/v18/arrow/endian"
"github.com/apache/arrow/go/v18/arrow/memory"
"github.com/apache/arrow/go/v18/internal/utils"
"github.com/apache/arrow/go/v18/parquet"
@@ -525,6 +526,8 @@ func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr *
return nil, fmt.Errorf("fixed len byte array length for float16 must be %d", len)
}
return transferBinary(rdr, valueType), nil
case arrow.INTERVAL_MONTH_DAY_NANO:
return transferInterval(rdr, valueType)
default:
return nil, fmt.Errorf("no support for reading columns of type: %s", valueType.Name())
}
@@ -967,3 +970,43 @@ func transferDictionary(rdr file.RecordReader, logicalValueType arrow.DataType)
defer releaseArrays(chunks)
return arrow.NewChunked(logicalValueType, chunks)
}

func transferInterval(rdr file.RecordReader, dt arrow.DataType) (*arrow.Chunked, error) {
convert := func(in arrow.Array) (arrow.Array, error) {
length := in.Len()
nullCount := in.NullN()
// each MonthDayNanoInterval occupies 16 bytes: int32 months, int32 days, int64 nanoseconds
data := make([]byte, arrow.Int32Traits.BytesRequired(length*4))

for i := 0; i < length; i++ {
if in.IsNull(i) {
continue
}

rec := in.(*array.FixedSizeBinary).Value(i)
if len(rec) != 12 {
return nil, fmt.Errorf("interval type must contain exactly 12 bytes, got %d", len(rec))
}
// copy the 4-byte months and days fields through unchanged
copy(data[i*16:i*16+4], rec[:4])
copy(data[i*16+4:i*16+8], rec[4:8])
// widen the unsigned 32-bit millisecond count to 64-bit nanoseconds
endian.Native.PutUint64(data[i*16+8:i*16+16], uint64(binary.LittleEndian.Uint32(rec[8:]))*1000000)
}

// reuse the incoming validity bitmap; only the value buffer is rebuilt
ret := array.NewData(dt, length, []*memory.Buffer{
in.Data().Buffers()[0], memory.NewBufferBytes(data),
}, nil, nullCount, 0)
defer ret.Release()
return array.MakeFromData(ret), nil
}

chunks := rdr.(file.BinaryRecordReader).GetBuilderChunks()
var err error
for idx, chunk := range chunks {
defer chunk.Release()
if chunks[idx], err = convert(chunk); err != nil {
return nil, err
}
// also release the converted chunk once NewChunked below has retained it
defer chunks[idx].Release()
}

return arrow.NewChunked(dt, chunks), nil
}
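For reference, the buffer-wide conversion transferInterval performs can be written out for a single record as below. This is an illustrative sketch, not code from the PR; it assumes the little-endian (months, days, milliseconds) layout the parquet spec defines for INTERVAL, and the decodeInterval name is hypothetical:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/apache/arrow/go/v18/arrow"
)

// decodeInterval widens one 12-byte parquet INTERVAL record (three little-endian
// uint32s: months, days, milliseconds) into arrow's 16-byte MonthDayNanoInterval.
func decodeInterval(rec []byte) (arrow.MonthDayNanoInterval, error) {
	if len(rec) != 12 {
		return arrow.MonthDayNanoInterval{}, fmt.Errorf("interval record must be 12 bytes, got %d", len(rec))
	}
	return arrow.MonthDayNanoInterval{
		Months: int32(binary.LittleEndian.Uint32(rec[0:4])),
		Days:   int32(binary.LittleEndian.Uint32(rec[4:8])),
		// the unsigned 32-bit millisecond count widens to signed 64-bit nanoseconds
		Nanoseconds: int64(binary.LittleEndian.Uint32(rec[8:12])) * 1000000,
	}, nil
}

func main() {
	rec := []byte{23, 0, 0, 0, 11, 0, 0, 0, 123, 0, 0, 0} // months=23, days=11, millis=123
	v, _ := decodeInterval(rec)
	fmt.Printf("%+v\n", v) // {Months:23 Days:11 Nanoseconds:123000000}
}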
24 changes: 24 additions & 0 deletions go/parquet/pqarrow/encode_arrow.go
@@ -607,6 +607,30 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr
}
wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
}
case *arrow.MonthDayNanoIntervalType:
arr := leafArr.(*array.MonthDayNanoInterval)
data := make([]parquet.FixedLenByteArray, arr.Len())

// NOTE: the parquet INTERVAL type only stores 32-bit milliseconds, so Nanoseconds is truncated to millisecond precision
if arr.NullN() == 0 {
for idx := range data {
data[idx] = make(parquet.FixedLenByteArray, 12)
binary.LittleEndian.PutUint32(data[idx][:4], uint32(arr.Value(idx).Months))
binary.LittleEndian.PutUint32(data[idx][4:8], uint32(arr.Value(idx).Days))
binary.LittleEndian.PutUint32(data[idx][8:], uint32(arr.Value(idx).Nanoseconds/1000000))
}
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
for idx := range data {
if arr.IsValid(idx) {
data[idx] = make(parquet.FixedLenByteArray, 12)
binary.LittleEndian.PutUint32(data[idx][:4], uint32(arr.Value(idx).Months))
binary.LittleEndian.PutUint32(data[idx][4:8], uint32(arr.Value(idx).Days))
binary.LittleEndian.PutUint32(data[idx][8:], uint32(arr.Value(idx).Nanoseconds/1000000))
}
}
wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
}
default:
return fmt.Errorf("%w: invalid column type to write to FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
}
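The write path above is the inverse of the reader: each value is packed into 12 little-endian bytes and sub-millisecond precision is dropped. A self-contained sketch of one record's encoding (the encodeInterval helper is hypothetical, for illustration only):

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/apache/arrow/go/v18/arrow"
	"github.com/apache/arrow/go/v18/parquet"
)

// encodeInterval packs a MonthDayNanoInterval into parquet's 12-byte INTERVAL
// layout, truncating Nanoseconds to whole milliseconds as the writer does.
func encodeInterval(v arrow.MonthDayNanoInterval) parquet.FixedLenByteArray {
	buf := make(parquet.FixedLenByteArray, 12)
	binary.LittleEndian.PutUint32(buf[0:4], uint32(v.Months))
	binary.LittleEndian.PutUint32(buf[4:8], uint32(v.Days))
	binary.LittleEndian.PutUint32(buf[8:12], uint32(v.Nanoseconds/1000000))
	return buf
}

func main() {
	v := arrow.MonthDayNanoInterval{Months: 23, Days: 11, Nanoseconds: 123456789}
	fmt.Printf("% x\n", encodeInterval(v)) // 17 00 00 00 0b 00 00 00 7b 00 00 00 (123 ms)
}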
21 changes: 20 additions & 1 deletion go/parquet/pqarrow/encode_arrow_test.go
@@ -654,6 +654,8 @@ func getLogicalType(typ arrow.DataType) schema.LogicalType {
case arrow.DECIMAL, arrow.DECIMAL256:
dec := typ.(arrow.DecimalType)
return schema.NewDecimalLogicalType(dec.GetPrecision(), dec.GetScale())
case arrow.INTERVAL_MONTH_DAY_NANO:
return schema.IntervalLogicalType{}
}
return schema.NoLogicalType{}
}
@@ -687,6 +689,8 @@ func getPhysicalType(typ arrow.DataType) parquet.Type {
return parquet.Types.Int32
case arrow.TIME64, arrow.TIMESTAMP:
return parquet.Types.Int64
case arrow.INTERVAL_MONTH_DAY_NANO:
return parquet.Types.FixedLenByteArray
default:
return parquet.Types.Int32
}
@@ -710,6 +714,9 @@ const (

smallSize = 100
)
var (
intvlTestVal = arrow.MonthDayNanoInterval{Months: 23, Days: 11, Nanoseconds: 123000000}
)

type ParquetIOTestSuite struct {
suite.Suite
@@ -735,6 +742,8 @@ func (ps *ParquetIOTestSuite) makeSimpleSchema(typ arrow.DataType, rep parquet.R
byteWidth = pqarrow.DecimalSize(typ.GetPrecision())
case *arrow.Float16Type:
byteWidth = int32(typ.Bytes())
case *arrow.MonthDayNanoIntervalType:
byteWidth = 12
case *arrow.DictionaryType:
valuesType := typ.ValueType
switch dt := valuesType.(type) {
@@ -747,7 +756,8 @@
}
}

pnode, _ := schema.NewPrimitiveNodeLogical("column1", rep, getLogicalType(typ), getPhysicalType(typ), int(byteWidth), -1)
pnode, err := schema.NewPrimitiveNodeLogical("column1", rep, getLogicalType(typ), getPhysicalType(typ), int(byteWidth), -1)
ps.NoError(err)
return schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{pnode}, -1))
}

@@ -830,6 +840,13 @@ func (ps *ParquetIOTestSuite) makePrimitiveTestCol(mem memory.Allocator, size in
bldr.Append(doubleTestVal)
}
return bldr.NewArray()
case arrow.INTERVAL_MONTH_DAY_NANO:
bldr := array.NewMonthDayNanoIntervalBuilder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(intvlTestVal)
}
return bldr.NewArray()
}
return nil
}
@@ -1122,6 +1139,7 @@ func (ps *ParquetIOTestSuite) TestSingleColumnRequiredRead() {
arrow.PrimitiveTypes.Int64,
arrow.PrimitiveTypes.Float32,
arrow.PrimitiveTypes.Float64,
arrow.FixedWidthTypes.MonthDayNanoInterval,
}

nchunks := []int{1, 4}
@@ -1334,6 +1352,7 @@ var fullTypeList = []arrow.DataType{
&arrow.Decimal128Type{Precision: 23, Scale: 22},
&arrow.Decimal128Type{Precision: 27, Scale: 26},
&arrow.Decimal128Type{Precision: 38, Scale: 37},
&arrow.MonthDayNanoIntervalType{},
}

func (ps *ParquetIOTestSuite) TestSingleColumnRequiredWrite() {
4 changes: 2 additions & 2 deletions go/parquet/pqarrow/encode_dictionary_test.go
@@ -41,8 +41,8 @@ import (

func (ps *ParquetIOTestSuite) TestSingleColumnOptionalDictionaryWrite() {
for _, dt := range fullTypeList {
// skip tests for bool as we don't do dictionaries for it
if dt.ID() == arrow.BOOL {
// skip tests for bool and interval as we don't do dictionaries for them
if dt.ID() == arrow.BOOL || dt.ID() == arrow.INTERVAL_MONTH_DAY_NANO {
continue
}

6 changes: 5 additions & 1 deletion go/parquet/pqarrow/file_writer.go
@@ -63,11 +63,15 @@ type FileWriter struct {
// the ArrowColumnWriter and WriteArrow functions which allow writing arrow to an existing
// file.Writer, this will create a new file.Writer based on the schema provided.
func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error) {
return NewFileWriterWithLogicalTypes(arrschema, w, props, arrprops, nil)
}

// NewFileWriterWithLogicalTypes is like NewFileWriter but additionally accepts
// optional per-column parquet logical type overrides; a nil entry (or a nil
// slice) leaves that column's logical type to be derived from the arrow schema.
func NewFileWriterWithLogicalTypes(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties, logicalTypes []*LogicalType) (*FileWriter, error) {
if props == nil {
props = parquet.NewWriterProperties()
}

pqschema, err := ToParquet(arrschema, props, arrprops)
pqschema, err := ToParquetWithLogicalTypes(arrschema, props, arrprops, logicalTypes)
if err != nil {
return nil, err
}
56 changes: 56 additions & 0 deletions go/parquet/pqarrow/file_writer_test.go
@@ -26,7 +26,10 @@ import (
"github.com/apache/arrow/go/v18/arrow/array"
"github.com/apache/arrow/go/v18/arrow/memory"
"github.com/apache/arrow/go/v18/parquet"
"github.com/apache/arrow/go/v18/parquet/file"
"github.com/apache/arrow/go/v18/parquet/internal/encoding"
"github.com/apache/arrow/go/v18/parquet/pqarrow"
pqschema "github.com/apache/arrow/go/v18/parquet/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -133,3 +136,56 @@ func TestFileWriterBuffered(t *testing.T) {
require.NoError(t, writer.Close())
assert.Equal(t, 4, writer.NumRows())
}

func TestFileWriterWithLogicalTypes(t *testing.T) {
schema := arrow.NewSchema([]arrow.Field{
{Name: "string", Nullable: true, Type: arrow.BinaryTypes.String},
{Name: "json", Nullable: true, Type: arrow.BinaryTypes.String},
}, nil)

data := `[
{ "string": "{\"key\":\"value\"}", "json": "{\"key\":\"value\"}" },
{ "string": null, "json": null }
]`

logicalTypes := []*pqarrow.LogicalType{
nil,
{Type: pqschema.JSONLogicalType{}, Length: -1},
}

alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

record, _, err := array.RecordFromJSON(alloc, schema, strings.NewReader(data))
require.NoError(t, err)
defer record.Release()

mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()

writer, err := pqarrow.NewFileWriterWithLogicalTypes(
schema,
sink,
parquet.NewWriterProperties(
parquet.WithAllocator(alloc),
),
pqarrow.NewArrowWriterProperties(
pqarrow.WithAllocator(alloc),
),
logicalTypes,
)
require.NoError(t, err)

require.NoError(t, writer.Write(record))
require.NoError(t, writer.Close())

reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
require.NoError(t, err)
defer reader.Close()
assert.EqualValues(t, 2, reader.NumRows())

parquetSchema := reader.MetaData().Schema
assert.EqualValues(t, "String", parquetSchema.Column(0).LogicalType().String())
assert.EqualValues(t, "JSON", parquetSchema.Column(1).LogicalType().String())
}
15 changes: 15 additions & 0 deletions go/parquet/pqarrow/logical_type.go
@@ -0,0 +1,15 @@
package pqarrow

import "github.com/apache/arrow/go/v18/parquet/schema"

// LogicalType pairs a parquet logical type with the type length to use when
// building the parquet schema node (-1 when no fixed length applies).
type LogicalType struct {
Type schema.LogicalType
Length int
}

// NewLogicalType returns the default override: no logical type and no fixed length.
func NewLogicalType() *LogicalType {
return &LogicalType{
Type: schema.NoLogicalType{},
Length: -1,
}
}