Skip to content

Commit dba1566

Browse files
authored
fb-1915 Support large id's in NDJSON (FeatureBaseDB#2290)
* uses json.Decoder to allow for large integer values in ndjson format in bulk import
1 parent 7b8b3d8 commit dba1566

File tree

5 files changed

+99
-84
lines changed

5 files changed

+99
-84
lines changed

arrow.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ func readTableArrow(filename string, mem memory.Allocator) (arrow.Table, error)
478478
return nil, err
479479
}
480480
defer rr.Close()
481-
records := make([]arrow.Record, rr.NumRecords(), rr.NumRecords())
481+
records := make([]arrow.Record, rr.NumRecords())
482482
i := 0
483483
for {
484484
rec, err := rr.Read()

sql3/planner/opbulkinsert.go

+37-44
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package planner
44

55
import (
66
"bufio"
7+
"bytes"
78
"context"
89
"encoding/csv"
910
"encoding/json"
@@ -320,7 +321,7 @@ func (i *bulkInsertSourceCSVRowIter) Next(ctx context.Context) (types.Row, error
320321
return nil, sql3.NewErrTypeConversionOnMap(0, 0, evalValue, mapColumn.colType.TypeDescription())
321322
}
322323
} else {
323-
//implicit conversion of int to timestamp will treat int as seconds since unix epoch
324+
// implicit conversion of int to timestamp will treat int as seconds since unix epoch
324325
result[idx] = time.Unix(intVal, 0).UTC()
325326
}
326327

@@ -448,7 +449,6 @@ func (i *bulkInsertSourceNDJsonRowIter) Next(ctx context.Context) (types.Row, er
448449
return nil, sql3.NewErrInternalf("unexpected type for mapValue '%T'", rawMapValue)
449450
}
450451
i.mapExpressionResults = append(i.mapExpressionResults, mapValue)
451-
452452
path, err := builder.NewEvaluable(mapValue)
453453
if err != nil {
454454
return nil, err
@@ -507,13 +507,14 @@ func (i *bulkInsertSourceNDJsonRowIter) Next(ctx context.Context) (types.Row, er
507507

508508
// parse the json
509509
v := interface{}(nil)
510-
err := json.Unmarshal([]byte(jsonValue), &v)
510+
dec := json.NewDecoder(bytes.NewReader([]byte(jsonValue)))
511+
dec.UseNumber()
512+
err := dec.Decode(&v)
511513
if err != nil {
512514
return nil, sql3.NewErrParsingJSON(0, 0, jsonValue, err.Error())
513515
}
514516

515517
// type check against the output type of the map operation
516-
517518
for idx, expr := range i.pathExpressions {
518519

519520
evalValue, err := expr(ctx, v)
@@ -534,16 +535,14 @@ func (i *bulkInsertSourceNDJsonRowIter) Next(ctx context.Context) (types.Row, er
534535
mapColumn := i.options.mapExpressions[idx]
535536
switch mapColumn.colType.(type) {
536537
case *parser.DataTypeID, *parser.DataTypeInt:
537-
538538
switch v := evalValue.(type) {
539-
case float64:
540-
// if v is a whole number then make it an int
541-
if v == float64(int64(v)) {
542-
result[idx] = int64(v)
539+
case json.Number:
540+
n, err := v.Int64()
541+
if err == nil {
542+
result[idx] = n
543543
} else {
544544
return nil, sql3.NewErrTypeConversionOnMap(0, 0, v, mapColumn.colType.TypeDescription())
545545
}
546-
547546
case []interface{}:
548547
return nil, sql3.NewErrTypeConversionOnMap(0, 0, v, mapColumn.colType.TypeDescription())
549548

@@ -566,21 +565,21 @@ func (i *bulkInsertSourceNDJsonRowIter) Next(ctx context.Context) (types.Row, er
566565

567566
case *parser.DataTypeIDSet:
568567
switch v := evalValue.(type) {
569-
case float64:
570-
// if v is a whole number then make it an int, and then turn that into an idset
571-
if v == float64(int64(v)) {
572-
result[idx] = []int64{int64(v)}
568+
case json.Number:
569+
n, err := v.Int64()
570+
if err == nil {
571+
result[idx] = []int64{n}
573572
} else {
574573
return nil, sql3.NewErrTypeConversionOnMap(0, 0, v, mapColumn.colType.TypeDescription())
575574
}
576-
577575
case []interface{}:
578576
setValue := make([]int64, 0)
579577
for _, i := range v {
580578
switch v := i.(type) {
581-
case float64:
582-
if v == float64(int64(v)) {
583-
setValue = append(setValue, int64(v))
579+
case json.Number:
580+
i, e := v.Int64()
581+
if e == nil {
582+
setValue = append(setValue, i)
584583
} else {
585584
return nil, sql3.NewErrTypeConversionOnMap(0, 0, v, mapColumn.colType.TypeDescription())
586585
}
@@ -616,13 +615,8 @@ func (i *bulkInsertSourceNDJsonRowIter) Next(ctx context.Context) (types.Row, er
616615

617616
case *parser.DataTypeStringSet:
618617
switch v := evalValue.(type) {
619-
case float64:
620-
if v == float64(int64(v)) {
621-
result[idx] = []string{fmt.Sprintf("%d", int64(v))}
622-
} else {
623-
result[idx] = []string{fmt.Sprintf("%f", v)}
624-
}
625-
618+
case json.Number:
619+
result[idx] = []string{v.String()}
626620
case []interface{}:
627621
setValue := make([]string, 0)
628622
for _, i := range v {
@@ -649,11 +643,12 @@ func (i *bulkInsertSourceNDJsonRowIter) Next(ctx context.Context) (types.Row, er
649643

650644
case *parser.DataTypeTimestamp:
651645
switch v := evalValue.(type) {
652-
case float64:
646+
case json.Number:
647+
n, err := v.Int64()
653648
// if v is a whole number then make it an int
654-
if v == float64(int64(v)) {
655-
//implicit conversion of int to timestamp will treat int as seconds since unix epoch
656-
result[idx] = time.Unix(int64(v), 0).UTC()
649+
if err == nil {
650+
// implicit conversion of int to timestamp will treat int as seconds since unix epoch
651+
result[idx] = time.Unix(n, 0).UTC()
657652
} else {
658653
return nil, sql3.NewErrTypeConversionOnMap(0, 0, v, mapColumn.colType.TypeDescription())
659654
}
@@ -684,14 +679,8 @@ func (i *bulkInsertSourceNDJsonRowIter) Next(ctx context.Context) (types.Row, er
684679

685680
case *parser.DataTypeString:
686681
switch v := evalValue.(type) {
687-
case float64:
688-
// if a whole number make it an int
689-
if v == float64(int64(v)) {
690-
result[idx] = fmt.Sprintf("%d", int64(v))
691-
} else {
692-
result[idx] = fmt.Sprintf("%f", v)
693-
}
694-
682+
case json.Number:
683+
result[idx] = v.String()
695684
case []interface{}:
696685
return nil, sql3.NewErrTypeConversionOnMap(0, 0, v, mapColumn.colType.TypeDescription())
697686

@@ -710,10 +699,11 @@ func (i *bulkInsertSourceNDJsonRowIter) Next(ctx context.Context) (types.Row, er
710699

711700
case *parser.DataTypeBool:
712701
switch v := evalValue.(type) {
713-
case float64:
702+
case json.Number:
714703
// if a whole number make it an int, and convert to a bool
715-
if v == float64(int64(v)) {
716-
result[idx] = v > 0
704+
n, err := v.Int64()
705+
if err == nil {
706+
result[idx] = n > 0
717707
} else {
718708
return nil, sql3.NewErrTypeConversionOnMap(0, 0, v, mapColumn.colType.TypeDescription())
719709
}
@@ -736,8 +726,12 @@ func (i *bulkInsertSourceNDJsonRowIter) Next(ctx context.Context) (types.Row, er
736726

737727
case *parser.DataTypeDecimal:
738728
switch v := evalValue.(type) {
739-
case float64:
740-
result[idx] = pql.FromFloat64(v)
729+
case json.Number:
730+
f, err := v.Float64()
731+
if err != nil {
732+
return nil, sql3.NewErrTypeConversionOnMap(0, 0, v, mapColumn.colType.TypeDescription())
733+
}
734+
result[idx] = pql.FromFloat64(f)
741735

742736
case []interface{}:
743737
return nil, sql3.NewErrTypeConversionOnMap(0, 0, v, mapColumn.colType.TypeDescription())
@@ -989,7 +983,6 @@ func (pr *parquetReader) Read() ([]interface{}, error) {
989983
return nil, io.EOF // done
990984
}
991985
for i, col := range pr.columnOrder {
992-
// vprint.VV("check row:%v col:%v", pr.rowOffset, col.realColumn)
993986
pr.row[i] = pr.table.Get(col.realColumn, pr.rowOffset)
994987
}
995988
pr.rowOffset++
@@ -1155,7 +1148,7 @@ func (i *bulkInsertSourceParquetRowIter) Next(ctx context.Context) (types.Row, e
11551148

11561149
case *parser.DataTypeTimestamp:
11571150
if intVal, ok := evalValue.(int64); ok {
1158-
//implicit conversion of int to timestamp will treat int as seconds since unix epoch
1151+
// implicit conversion of int to timestamp will treat int as seconds since unix epoch
11591152
result[idx] = time.Unix(intVal, 0).UTC()
11601153
} else if stringVal, ok := evalValue.(string); ok {
11611154
if tm, err := time.ParseInLocation(time.RFC3339Nano, stringVal, time.UTC); err == nil {

sql3/sql_complex_test.go

+37
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/featurebasedb/featurebase/v3/pql"
2424
sql_test "github.com/featurebasedb/featurebase/v3/sql3/test"
2525
"github.com/featurebasedb/featurebase/v3/test"
26+
"github.com/featurebasedb/featurebase/v3/vprint"
2627
"github.com/google/go-cmp/cmp"
2728
"github.com/stretchr/testify/assert"
2829
)
@@ -3038,3 +3039,39 @@ func TestPlanner_BulkInsert_FP1916(t *testing.T) {
30383039
expected := int64(8924809397503602651)
30393040
assert.Equal(t, got, expected)
30403041
}
3042+
3043+
func TestPlanner_BulkInsert_FP1915(t *testing.T) {
3044+
c := test.MustRunCluster(t, 3)
3045+
defer c.Close()
3046+
// 8924809397503602651 is larger than 2^53 which is the largest integer value representable in float64
3047+
node := c.GetNode(0).Server
3048+
_, _, _, err := sql_test.MustQueryRows(t, node, `create table ids (
3049+
_id id,
3050+
a int,
3051+
b int);`)
3052+
assert.NoError(t, err)
3053+
_, _, _, err = sql_test.MustQueryRows(t, node, `BULK INSERT INTO ids (_id, a, b)
3054+
map ('$._id' id, '$.a' int, '$.b' int)
3055+
from x'{ "_id":8924809397503602651 , "a": 10, "b": 20 }
3056+
{ "_id":"8924809397503602652" , "a": 10, "b": 20 }'
3057+
WITH
3058+
FORMAT 'NDJSON'
3059+
INPUT 'STREAM';`)
3060+
assert.NoError(t, err)
3061+
results, _, _, err := sql_test.MustQueryRows(t, node, `select _id from ids`)
3062+
assert.NoError(t, err)
3063+
got := make([]int64, 0)
3064+
for i := range results {
3065+
got = append(got, results[i][0].(int64))
3066+
}
3067+
sort.Slice(got, func(i, j int) bool {
3068+
return got[i] < got[j]
3069+
})
3070+
vprint.VV("results %#v", results)
3071+
if diff := cmp.Diff([]int64{
3072+
8924809397503602651,
3073+
8924809397503602652,
3074+
}, got); diff != "" {
3075+
t.Fatal(diff)
3076+
}
3077+
}

translate_boltdb.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@ import (
1010
"io"
1111
"os"
1212
"path/filepath"
13+
"runtime/pprof"
1314
"sync"
1415
"time"
1516

1617
"github.com/featurebasedb/featurebase/v3/roaring"
1718
"github.com/pkg/errors"
1819
bolt "go.etcd.io/bbolt"
19-
20-
"runtime/pprof"
2120
)
2221

2322
var _ = pprof.StartCPUProfile
@@ -102,7 +101,6 @@ func NewBoltTranslateStore(index, field string, partitionID, partitionN int, fsy
102101

103102
// Open opens the translate file.
104103
func (s *BoltTranslateStore) Open() (err error) {
105-
106104
// add the path to the problem database if we panic handling it.
107105
defer func() {
108106
r := recover()
@@ -111,9 +109,9 @@ func (s *BoltTranslateStore) Open() (err error) {
111109
}
112110
}()
113111

114-
if err := os.MkdirAll(filepath.Dir(s.Path), 0750); err != nil {
112+
if err := os.MkdirAll(filepath.Dir(s.Path), 0o750); err != nil {
115113
return errors.Wrapf(err, "mkdir %s", filepath.Dir(s.Path))
116-
} else if s.db, err = bolt.Open(s.Path, 0600, &bolt.Options{Timeout: 1 * time.Second, NoSync: !s.fsyncEnabled, InitialMmapSize: 0}); err != nil {
114+
} else if s.db, err = bolt.Open(s.Path, 0o600, &bolt.Options{Timeout: 1 * time.Second, NoSync: !s.fsyncEnabled, InitialMmapSize: 0}); err != nil {
117115
return errors.Wrapf(err, "open file: %s", err)
118116
}
119117

@@ -513,7 +511,6 @@ func (r *BoltTranslateEntryReader) ReadEntry(entry *TranslateEntry) error {
513511

514512
type boltWrapper struct {
515513
tx *bolt.Tx
516-
db *bolt.DB
517514
}
518515

519516
func (w *boltWrapper) Commit() error {
@@ -528,6 +525,7 @@ func (w *boltWrapper) Rollback() {
528525
w.tx.Rollback()
529526
}
530527
}
528+
531529
func (s *BoltTranslateStore) FreeIDs() (*roaring.Bitmap, error) {
532530
result := roaring.NewBitmap()
533531
err := s.db.View(func(tx *bolt.Tx) error {
@@ -544,11 +542,12 @@ func (s *BoltTranslateStore) FreeIDs() (*roaring.Bitmap, error) {
544542
})
545543
return result, err
546544
}
545+
547546
func (s *BoltTranslateStore) MergeFree(tx *bolt.Tx, newIDs *roaring.Bitmap) error {
548547
bkt := tx.Bucket(bucketFree)
549548
b := bkt.Get(freeKey)
550549
buf := new(bytes.Buffer)
551-
if b != nil { //if existing combine with newIDs
550+
if b != nil { // if existing combine with newIDs
552551
before := roaring.NewBitmap()
553552
err := before.UnmarshalBinary(b)
554553
if err != nil {

0 commit comments

Comments
 (0)