Skip to content

Commit

Permalink
stream test (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanbohan authored Feb 1, 2024
1 parent 40b60ef commit 1857714
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 58 deletions.
33 changes: 2 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/GreptimeTeam/greptimedb-ingester-go/blob/main/LICENSE)
[![Build Status](https://github.com/greptimeteam/greptimedb-ingester-go/actions/workflows/ci.yml/badge.svg)](https://github.com/GreptimeTeam/greptimedb-ingester-go/blob/main/.github/workflows/ci.yml)
[![codecov](https://codecov.io/gh/GreptimeTeam/greptimedb-ingester-go/branch/main/graph/badge.svg?token=76KIKITADQ)](https://codecov.io/gh/GreptimeTeam/greptimedb-ingester-go)
[![Go Reference](https://pkg.go.dev/badge/github.com/GreptimeTeam/greptimedb-ingester-go.svg)](https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-ingester-go)
Expand All @@ -7,7 +6,7 @@

NOTE: the project is still in its early stages.

Provide API for using GreptimeDB client in Go.
Provide API for using GreptimeDB ingester in Go.

## Installation

Expand All @@ -17,32 +16,4 @@ go get -u github.com/GreptimeTeam/greptimedb-ingester-go

## Documentation

visit [docs](./docs) to get complete examples. You can also visit [Documentation][document] more details.

## API reference

### Datatype Supported

- int8, int16, int32, int64, int
- uint8, uint16, uint32, uint64, uint
- float32, float64
- bool
- []byte
- string
- time.Time

### Customize metric Timestamp

you can customize timestamp index via calling methods of [Metric][metric_doc]

- `metric.SetTimePrecision(time.Microsecond)`
- `metric.SetTimestampAlias("timestamp")`

## License

This greptimedb-ingester-go uses the __Apache 2.0 license__ to strike a balance
between open contributions and allowing you to use the software however you want.

<!-- links -->
[document]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-ingester-go
[metric_doc]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-ingester-go#Metric
TODO
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Client struct {

// New helps to create the greptimedb client, which will be responsible write data into GreptimeDB.
func New(cfg *config.Config) (*Client, error) {
conn, err := grpc.Dial(cfg.GetGRPCAddr(), cfg.DialOptions...)
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.DialOptions...)
if err != nil {
return nil, err
}
Expand Down
62 changes: 38 additions & 24 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"
"fmt"
"log"
"math/rand"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -35,9 +37,15 @@ import (
"github.com/GreptimeTeam/greptimedb-ingester-go/table/types"
)

//TODO(yuanbohan):
// unmatched length of columns in rows and columns in schema
// support pointer
// write pojo

var (
tableName = ""
timezone = "UTC"
monitorTableName = "monitor"
datatypesTableName = "datatypes"
timezone = "UTC"

database = "public"
host = "127.0.0.1"
Expand Down Expand Up @@ -81,7 +89,7 @@ type datatype struct {
}

func (datatype) TableName() string {
return tableName
return datatypesTableName
}

type monitor struct {
Expand All @@ -95,7 +103,7 @@ type monitor struct {
}

func (monitor) TableName() string {
return tableName
return monitorTableName
}

type Mysql struct {
Expand Down Expand Up @@ -155,6 +163,22 @@ func newClient() *Client {
return client
}

func randomId() int64 {
s := rand.NewSource(time.Now().UnixNano())
r := rand.New(s)
return r.Int63()
}

func getMonitorsIds(monitors []monitor) string {
ids := make([]string, 0)

for _, monitor := range monitors {
ids = append(ids, strconv.Itoa(int(monitor.ID)))
}

return fmt.Sprintf("(%s)", strings.Join(ids, ","))
}

func newMysql() *Mysql {
db := &Mysql{
Host: host,
Expand Down Expand Up @@ -224,11 +248,10 @@ func init() {

cli = newClient()
db = newMysql()
streamClient = newStreamClient()
}

func TestInsertMonitors(t *testing.T) {
tableName = "test_insert_monitor"

loc, err := time.LoadLocation(timezone)
assert.Nil(t, err)
ts1 := time.Now().Add(-1 * time.Minute).UnixMilli()
Expand All @@ -238,7 +261,7 @@ func TestInsertMonitors(t *testing.T) {

monitors := []monitor{
{
ID: 1,
ID: randomId(),
Host: "127.0.0.1",
Memory: 1,
Cpu: 1.0,
Expand All @@ -247,7 +270,7 @@ func TestInsertMonitors(t *testing.T) {
Running: true,
},
{
ID: 2,
ID: randomId(),
Host: "127.0.0.2",
Memory: 2,
Cpu: 2.0,
Expand All @@ -257,7 +280,7 @@ func TestInsertMonitors(t *testing.T) {
},
}

table, err := tbl.New(tableName)
table, err := tbl.New(monitorTableName)
assert.Nil(t, err)

assert.Nil(t, table.AddTagColumn("id", types.INT64))
Expand All @@ -281,7 +304,7 @@ func TestInsertMonitors(t *testing.T) {
assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg())
assert.Equal(t, uint32(len(monitors)), resp.GetAffectedRows().GetValue())

monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in (1, 2) order by id asc", tableName))
monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors)))
assert.Nil(t, err)

assert.Equal(t, len(monitors), len(monitors_))
Expand All @@ -292,14 +315,12 @@ func TestInsertMonitors(t *testing.T) {
}

func TestInsertMonitorWithNilFields(t *testing.T) {
tableName = "test_insert_monitor_with_nil_fields"

loc, err := time.LoadLocation(timezone)
assert.Nil(t, err)
ts := time.Now().Add(-1 * time.Minute).UnixMilli()
time := time.UnixMilli(ts).In(loc)
monitor := monitor{
ID: 11,
ID: randomId(),
Host: "127.0.0.1",
Memory: 1,
Cpu: 1.0,
Expand All @@ -308,7 +329,7 @@ func TestInsertMonitorWithNilFields(t *testing.T) {
Running: true,
}

table, err := tbl.New(tableName)
table, err := tbl.New(monitorTableName)
assert.Nil(t, err)

assert.Nil(t, table.AddTagColumn("id", types.INT64))
Expand All @@ -328,7 +349,7 @@ func TestInsertMonitorWithNilFields(t *testing.T) {
assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode())
assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg())

monitors_, err := db.Query(fmt.Sprintf("select * from %s where id = %d", tableName, monitor.ID))
monitors_, err := db.Query(fmt.Sprintf("select * from %s where id = %d", monitorTableName, monitor.ID))
assert.Nil(t, err)
assert.Equal(t, 1, len(monitors_))
monitor_ := monitors_[0]
Expand All @@ -343,9 +364,7 @@ func TestInsertMonitorWithNilFields(t *testing.T) {
assert.Zero(t, monitor_.Temperature)
}

func TestInsertMonitorWithAllDatatypes(t *testing.T) {
tableName = "test_insert_monitor_with_all_datatypes"

func TestInsertAllDatatypes(t *testing.T) {
loc, err := time.LoadLocation(timezone)
assert.Nil(t, err)

Expand All @@ -367,7 +386,7 @@ func TestInsertMonitorWithAllDatatypes(t *testing.T) {
BINARY := []byte{1, 2, 3}
STRING := "string"

table, err := tbl.New(tableName)
table, err := tbl.New(datatypesTableName)
assert.Nil(t, err)

assert.Nil(t, table.AddTagColumn("int8", types.INT8))
Expand Down Expand Up @@ -457,8 +476,3 @@ func TestInsertMonitorWithAllDatatypes(t *testing.T) {
// MySQL protocol only supports microsecond precision for TIMESTAMP
assert.EqualValues(t, time_.UnixNano()/1000, result.TIMESTAMP_NANOSECOND_INT.UnixNano()/1000)
}

//TODO(yuanbohan):
// unmatched length of columns in rows and columns in schema
// support pointer
// write pojo
2 changes: 1 addition & 1 deletion client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type StreamClient struct {
}

func NewStreamClient(cfg *config.Config) (*StreamClient, error) {
conn, err := grpc.Dial(cfg.GetGRPCAddr(), cfg.DialOptions...)
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.DialOptions...)
if err != nil {
return nil, err
}
Expand Down
92 changes: 92 additions & 0 deletions client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,100 @@
package client

import (
"context"
"fmt"
"log"
"testing"
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/GreptimeTeam/greptimedb-ingester-go/config"
tbl "github.com/GreptimeTeam/greptimedb-ingester-go/table"
"github.com/GreptimeTeam/greptimedb-ingester-go/table/types"
)

var (
streamClient *StreamClient
)

func newStreamClient() *StreamClient {
options := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
cfg := config.New(host).
WithPort(grpcPort).
WithDatabase(database).
WithDialOptions(options...)

client, err := NewStreamClient(cfg)
if err != nil {
log.Fatalf("failed to create client: %s", err.Error())
}
return client
}

func TestStreamInsert(t *testing.T) {
loc, err := time.LoadLocation(timezone)
assert.Nil(t, err)
ts1 := time.Now().Add(-1 * time.Minute).UnixMilli()
time1 := time.UnixMilli(ts1).In(loc)
ts2 := time.Now().Add(-2 * time.Minute).UnixMilli()
time2 := time.UnixMilli(ts2).In(loc)

monitors := []monitor{
{
ID: randomId(),
Host: "127.0.0.1",
Memory: 1,
Cpu: 1.0,
Temperature: -1,
Ts: time1,
Running: true,
},
{
ID: randomId(),
Host: "127.0.0.2",
Memory: 2,
Cpu: 2.0,
Temperature: -2,
Ts: time2,
Running: true,
},
}

table, err := tbl.New(monitorTableName)
assert.Nil(t, err)

assert.Nil(t, table.AddTagColumn("id", types.INT64))
assert.Nil(t, table.AddTagColumn("host", types.STRING))
assert.Nil(t, table.AddFieldColumn("memory", types.UINT64))
assert.Nil(t, table.AddFieldColumn("cpu", types.FLOAT64))
assert.Nil(t, table.AddFieldColumn("temperature", types.INT64))
assert.Nil(t, table.AddFieldColumn("running", types.BOOLEAN))
assert.Nil(t, table.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND))

for _, monitor := range monitors {
err := table.AddRow(monitor.ID, monitor.Host,
monitor.Memory, monitor.Cpu, monitor.Temperature, monitor.Running,
monitor.Ts)
assert.Nil(t, err)
}

err = streamClient.Send(context.Background(), table)
assert.Nil(t, err)
affected, err := streamClient.CloseAndRecv(context.Background())
assert.EqualValues(t, 2, affected.GetValue())
assert.Nil(t, err)

monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors)))
assert.Nil(t, err)

assert.Equal(t, len(monitors), len(monitors_))

for i, monitor_ := range monitors_ {
assert.Equal(t, monitors[i], monitor_)
}
}
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,6 @@ func (c *Config) BuildAuthHeader() *greptimepb.AuthHeader {

}

func (c *Config) GetGRPCAddr() string {
func (c *Config) GetEndpoint() string {
return fmt.Sprintf("%s:%d", c.Host, c.Port)
}

0 comments on commit 1857714

Please sign in to comment.