Skip to content

Commit e2a7b93

Browse files
committed
feat: reduce memory footprint by selecting portion
1 parent 64b402e commit e2a7b93

File tree

1 file changed

+34
-24
lines changed

1 file changed

+34
-24
lines changed

batch.go

+34-24
Original file line numberDiff line numberDiff line change
@@ -22,47 +22,57 @@ func Batching(table Table, conn driver.Conn, batchSize int, onBatch func([][]int
2222
query = fmt.Sprintf("%s WHERE %s > '%s'", query, table.Cursor.Column, table.Cursor.LastSync.Format(time.DateTime))
2323
}
2424

25-
var scannerVal []interface{}
26-
total := 0
27-
28-
rows, err := conn.Query(ctx, query)
29-
if err != nil {
25+
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM (%s) AS subquery", query)
26+
var count uint64
27+
if err := conn.QueryRow(ctx, countQuery).Scan(&count); err != nil {
3028
return 0, err
3129
}
3230

33-
batch := [][]interface{}{}
34-
for rows.Next() {
35-
if scannerVal == nil {
36-
scannerVal = GetScannerValues(rows.ColumnTypes())
37-
}
31+
var scannerVal []interface{}
32+
total := 0
33+
offset := 0
3834

39-
values := make([]interface{}, len(scannerVal))
40-
for i := range values {
41-
values[i] = reflect.New(reflect.TypeOf(scannerVal[i])).Interface()
35+
pk := ""
36+
for _, col := range table.Columns {
37+
if col.Primary {
38+
pk = col.Source
39+
break
4240
}
41+
}
4342

44-
if err := rows.Scan(values...); err != nil {
43+
for total < int(count) {
44+
rows, err := conn.Query(ctx, fmt.Sprintf("%s ORDER BY %s LIMIT %d OFFSET %d", query, pk, batchSize, offset))
45+
if err != nil {
4546
return 0, err
4647
}
4748

48-
batch = append(batch, values)
49+
batch := [][]interface{}{}
50+
for rows.Next() {
51+
if scannerVal == nil {
52+
scannerVal = GetScannerValues(rows.ColumnTypes())
53+
}
4954

50-
if len(batch) == batchSize {
51-
if err := onBatch(batch); err != nil {
55+
values := make([]interface{}, len(scannerVal))
56+
for i := range values {
57+
values[i] = reflect.New(reflect.TypeOf(scannerVal[i])).Interface()
58+
}
59+
60+
if err := rows.Scan(values...); err != nil {
5261
return 0, err
5362
}
5463

55-
total += len(batch)
56-
batch = [][]interface{}{}
64+
batch = append(batch, values)
5765
}
58-
}
5966

60-
if len(batch) > 0 {
61-
total += len(batch)
67+
if len(batch) > 0 {
68+
total += len(batch)
6269

63-
if err := onBatch(batch); err != nil {
64-
return 0, err
70+
if err := onBatch(batch); err != nil {
71+
return 0, err
72+
}
6573
}
74+
75+
offset += batchSize
6676
}
6777

6878
return total, nil

0 commit comments

Comments
 (0)