Skip to content

[to dev/1.3] add tsblock #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: dev/1.3
Choose a base branch
from
821 changes: 821 additions & 0 deletions client/column.go

Large diffs are not rendered by default.

296 changes: 296 additions & 0 deletions client/column_decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package client

import (
"bytes"
"encoding/binary"
"fmt"
)

type ColumnDecoder interface {
ReadTimeColumn(reader *bytes.Reader, positionCount int32) (*TimeColumn, error)
ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error)
}

func deserializeNullIndicators(reader *bytes.Reader, positionCount int32) ([]bool, error) {
b, err := reader.ReadByte()
if err != nil {
return nil, err
}
mayHaveNull := b != 0
if !mayHaveNull {
return nil, nil
}
return deserializeBooleanArray(reader, positionCount)
}

func deserializeBooleanArray(reader *bytes.Reader, size int32) ([]bool, error) {
packedSize := (size + 7) / 8
packedBytes := make([]byte, packedSize)

_, err := reader.Read(packedBytes)
if err != nil {
return nil, err
}

// read null bits 8 at a time
output := make([]bool, size)
currentByte := 0
fullGroups := int(size) & ^0b111
for pos := 0; pos < fullGroups; pos += 8 {
b := packedBytes[currentByte]
currentByte++

output[pos+0] = (b & 0b10000000) != 0
output[pos+1] = (b & 0b01000000) != 0
output[pos+2] = (b & 0b00100000) != 0
output[pos+3] = (b & 0b00010000) != 0
output[pos+4] = (b & 0b00001000) != 0
output[pos+5] = (b & 0b00000100) != 0
output[pos+6] = (b & 0b00000010) != 0
output[pos+7] = (b & 0b00000001) != 0
}

// read last null bits
if remaining := int(size) % 8; remaining > 0 {
b := packedBytes[len(packedBytes)-1]
mask := uint8(0b10000000)

for pos := fullGroups; pos < int(size); pos++ {
output[pos] = (b & mask) != 0
mask >>= 1
}
}

return output, nil
}

type baseColumnDecoder struct{}

func (_ *baseColumnDecoder) ReadTimeColumn(_ *bytes.Reader, _ int32) (*TimeColumn, error) {
return nil, fmt.Errorf("unsupported operation: ReadTimeColumn")
}

type Int32ArrayColumnDecoder struct {
baseColumnDecoder
}

func (decoder *Int32ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
// Serialized data layout:
// +---------------+-----------------+-------------+
// | may have null | null indicators | values |
// +---------------+-----------------+-------------+
// | byte | list[byte] | list[int32] |
// +---------------+-----------------+-------------+
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
if err != nil {
return nil, err
}
switch dataType {
case INT32, DATE:
intValues := make([]int32, positionCount)
for i := int32(0); i < positionCount; i++ {
if nullIndicators != nil && nullIndicators[i] {
continue
}
err := binary.Read(reader, binary.BigEndian, &intValues[i])
if err != nil {
return nil, err
}
}
return NewIntColumn(0, positionCount, nullIndicators, intValues)
case FLOAT:
floatValues := make([]float32, positionCount)
for i := int32(0); i < positionCount; i++ {
if nullIndicators != nil && nullIndicators[i] {
continue
}
err := binary.Read(reader, binary.BigEndian, &floatValues[i])
if err != nil {
return nil, err
}
}
return NewFloatColumn(0, positionCount, nullIndicators, floatValues)
}
return nil, fmt.Errorf("invalid data type: %v", dataType)
}

type Int64ArrayColumnDecoder struct {
baseColumnDecoder
}

func (decoder *Int64ArrayColumnDecoder) ReadTimeColumn(reader *bytes.Reader, positionCount int32) (*TimeColumn, error) {
// Serialized data layout:
// +---------------+-----------------+-------------+
// | may have null | null indicators | values |
// +---------------+-----------------+-------------+
// | byte | list[byte] | list[int64] |
// +---------------+-----------------+-------------+

nullIndicators, err := deserializeNullIndicators(reader, positionCount)
if err != nil {
return nil, err
}
if nullIndicators != nil {
return nil, fmt.Errorf("time column should not contain null values")
}
values := make([]int64, positionCount)
for i := int32(0); i < positionCount; i++ {
err = binary.Read(reader, binary.BigEndian, &values[i])
}
return NewTimeColumn(0, positionCount, values)
}

func (decoder *Int64ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
// Serialized data layout:
// +---------------+-----------------+-------------+
// | may have null | null indicators | values |
// +---------------+-----------------+-------------+
// | byte | list[byte] | list[int64] |
// +---------------+-----------------+-------------+
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
if err != nil {
return nil, err
}
switch dataType {
case INT64, TIMESTAMP:
values := make([]int64, positionCount)
for i := int32(0); i < positionCount; i++ {
if nullIndicators != nil && nullIndicators[i] {
continue
}
if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil {
return nil, err
}
}
return NewLongColumn(0, positionCount, nullIndicators, values)
case DOUBLE:
values := make([]float64, positionCount)
for i := int32(0); i < positionCount; i++ {
if nullIndicators != nil && nullIndicators[i] {
continue
}
if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil {
return nil, err
}
}
return NewDoubleColumn(0, positionCount, nullIndicators, values)
}
return nil, fmt.Errorf("invalid data type: %v", dataType)
}

type ByteArrayColumnDecoder struct {
baseColumnDecoder
}

func (decoder *ByteArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
// Serialized data layout:
// +---------------+-----------------+-------------+
// | may have null | null indicators | values |
// +---------------+-----------------+-------------+
// | byte | list[byte] | list[byte] |
// +---------------+-----------------+-------------+

if dataType != BOOLEAN {
return nil, fmt.Errorf("invalid data type: %v", dataType)
}
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
if err != nil {
return nil, err
}
values, err := deserializeBooleanArray(reader, positionCount)
if err != nil {
return nil, err
}
return NewBooleanColumn(0, positionCount, nullIndicators, values)
}

type BinaryArrayColumnDecoder struct {
baseColumnDecoder
}

func (decoder *BinaryArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
// Serialized data layout:
// +---------------+-----------------+-------------+
// | may have null | null indicators | values |
// +---------------+-----------------+-------------+
// | byte | list[byte] | list[entry] |
// +---------------+-----------------+-------------+
//
// Each entry is represented as:
// +---------------+-------+
// | value length | value |
// +---------------+-------+
// | int32 | bytes |
// +---------------+-------+

if TEXT != dataType {
return nil, fmt.Errorf("invalid data type: %v", dataType)
}
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
if err != nil {
return nil, err
}
values := make([]*Binary, positionCount)
for i := int32(0); i < positionCount; i++ {
if nullIndicators != nil && nullIndicators[i] {
continue
}
var length int32
err := binary.Read(reader, binary.BigEndian, &length)
if err != nil {
return nil, err
}
value := make([]byte, length)
_, err = reader.Read(value)
if err != nil {
return nil, err
}
values[i] = NewBinary(value)
}
return NewBinaryColumn(0, positionCount, nullIndicators, values)
}

type RunLengthColumnDecoder struct {
baseColumnDecoder
}

func (decoder *RunLengthColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
// Serialized data layout:
// +-----------+-------------------------+
// | encoding | serialized inner column |
// +-----------+-------------------------+
// | byte | list[byte] |
// +-----------+-------------------------+
columnEncoding, err := deserializeColumnEncoding(reader)
if err != nil {
return nil, err
}
columnDecoder, err := getColumnDecoder(columnEncoding)
if err != nil {
return nil, err
}
column, err := columnDecoder.ReadColumn(reader, dataType, 1)
if err != nil {
return nil, err
}
return NewRunLengthEncodedColumn(column, positionCount)
}
48 changes: 48 additions & 0 deletions client/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package client

import "fmt"

type TSDataType int8

type TSEncoding uint8
Expand All @@ -39,6 +41,48 @@ const (
STRING TSDataType = 11
)

var tsTypeMap = map[string]TSDataType{
"BOOLEAN": BOOLEAN,
"INT32": INT32,
"INT64": INT64,
"FLOAT": FLOAT,
"DOUBLE": DOUBLE,
"TEXT": TEXT,
"TIMESTAMP": TIMESTAMP,
"DATE": DATE,
"BLOB": BLOB,
"STRING": STRING,
}

var byteToTsDataType = map[byte]TSDataType{
0: BOOLEAN,
1: INT32,
2: INT64,
3: FLOAT,
4: DOUBLE,
5: TEXT,
8: TIMESTAMP,
9: DATE,
10: BLOB,
11: STRING,
}

func GetDataTypeByStr(name string) (TSDataType, error) {
dataType, exists := tsTypeMap[name]
if !exists {
return UNKNOWN, fmt.Errorf("invalid input: %v", name)
}
return dataType, nil
}

func getDataTypeByByte(b byte) (TSDataType, error) {
dataType, exists := byteToTsDataType[b]
if !exists {
return UNKNOWN, fmt.Errorf("invalid input: %v", b)
}
return dataType, nil
}

const (
PLAIN TSEncoding = 0
DICTIONARY TSEncoding = 1
Expand Down Expand Up @@ -202,3 +246,7 @@ const (
CqAlreadyExist int32 = 1402
CqUpdateLastExecTimeError int32 = 1403
)

const (
TimestampColumnName = "Time"
)
Loading