Skip to content

Commit

Permalink
Implement weighted round robin in clickhouse output plugin (#550)
Browse files Browse the repository at this point in the history
* Implement weighted round robin in clickhouse output plugin
  • Loading branch information
HeadHunter483 authored Dec 12, 2023
1 parent 044151e commit 1156024
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 6 deletions.
15 changes: 14 additions & 1 deletion plugin/output/clickhouse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,24 @@ It sends the event batches to Clickhouse database using
File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features.

### Config params
**`addresses`** *`[]string`* *`required`*
**`addresses`** *`[]Address`* *`required`*

TCP Clickhouse addresses, e.g.: 127.0.0.1:9000.
Check the insert_strategy to find out how File.d will behave with a list of addresses.

Accepts strings or objects, e.g.:
```yaml
addresses:
- 127.0.0.1:9000 # the same as {addr:'127.0.0.1:9000',weight:1}
- addr: 127.0.0.1:9001
weight: 2
```
When some addresses get weight greater than 1 and round_robin insert strategy is used,
it works as classical weighted round robin. Given {(a_1,w_1),(a_2,w_2),...,(a_n,w_n)},
where a_i is the ith address and w_i is the ith address' weight, requests are sent in order:
w_1 times to a_1, w_2 times to a_2, ..., w_n times to a_n, w_1 times to a_1 and so on.
<br>
**`insert_strategy`** *`string`* *`default=round_robin`* *`options=round_robin|in_order`*
Expand Down
56 changes: 51 additions & 5 deletions plugin/output/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package clickhouse

import (
"bytes"
"context"
"encoding/json"
"errors"
"net"
"strings"
"time"
Expand Down Expand Up @@ -89,14 +91,56 @@ const (
StrategyInOrder
)

// Address is a single Clickhouse endpoint together with its weight.
//
// In the config it may be written either as a plain string ("host:port"),
// which implies a weight of 1, or as an object with "addr" and an optional
// "weight" field.
type Address struct {
	// Addr is the endpoint address as written in the config.
	Addr string `json:"addr"`
	// Weight is the address' weight; it defaults to 1 when not specified.
	Weight int `json:"weight"`
}

// UnmarshalJSON implements json.Unmarshaler.
//
// Accepted inputs:
//   - a JSON string: stored in Addr, Weight is set to 1;
//   - a JSON object with the fields "addr" and "weight": unknown fields are
//     rejected, and an omitted "weight" defaults to 1, the same as the
//     string form;
//   - empty input: the receiver is left untouched and nil is returned.
//
// Any other JSON value yields an error. The receiver is only modified when
// decoding succeeds. An explicitly configured weight is stored as-is, without
// range validation.
func (a *Address) UnmarshalJSON(b []byte) error {
	if len(b) == 0 {
		return nil
	}

	switch b[0] {
	case '"':
		a.Weight = 1
		return json.Unmarshal(b, &a.Addr)
	case '{':
		// The local type has Address' fields but not its methods, so decoding
		// into it does not recurse back into this UnmarshalJSON.
		type tmpAddress Address
		// Pre-set the default: an object without "weight" would otherwise
		// decode to weight 0.
		tmp := tmpAddress{Weight: 1}
		dec := json.NewDecoder(bytes.NewReader(b))
		dec.DisallowUnknownFields()
		if err := dec.Decode(&tmp); err != nil {
			return err
		}
		*a = Address(tmp)
		return nil
	default:
		return errors.New("failed to unmarshal to Address, the value must be string or object")
	}
}

// Compile-time check that *Address satisfies json.Unmarshaler.
var _ json.Unmarshaler = (*Address)(nil)

// ! config-params
// ^ config-params
type Config struct {
// > @3@4@5@6
// >
// > TCP Clickhouse addresses, e.g.: 127.0.0.1:9000.
// > Check the insert_strategy to find out how File.d will behave with a list of addresses.
Addresses []string `json:"addresses" required:"true"` // *
// >
// > Accepts strings or objects, e.g.:
// > ```yaml
// > addresses:
// > - 127.0.0.1:9000 # the same as {addr:'127.0.0.1:9000',weight:1}
// > - addr: 127.0.0.1:9001
// > weight: 2
// > ```
// >
// > When some addresses get weight greater than 1 and round_robin insert strategy is used,
// > it works as classical weighted round robin. Given {(a_1,w_1),(a_2,w_2),...,(a_n,w_n)},
// > where a_i is the ith address and w_i is the ith address' weight, requests are sent in order:
// > w_1 times to a_1, w_2 times to a_2, ..., w_n times to a_n, w_1 times to a_1 and so on.
Addresses []Address `json:"addresses" required:"true" slice:"true"` // *

// > @3@4@5@6
// >
Expand Down Expand Up @@ -333,11 +377,11 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
}

for _, addr := range p.config.Addresses {
addr = addrWithDefaultPort(addr, "9000")
addr.Addr = addrWithDefaultPort(addr.Addr, "9000")
pool, err := chpool.New(p.ctx, chpool.Options{
ClientOptions: ch.Options{
Logger: p.logger.Named("driver"),
Address: addr,
Address: addr.Addr,
Database: p.config.Database,
User: p.config.User,
Password: p.config.Password,
Expand All @@ -355,9 +399,11 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
HealthCheckPeriod: p.config.HealthCheckPeriod_,
})
if err != nil {
p.logger.Fatal("create clickhouse connection pool", zap.Error(err), zap.String("addr", addr))
p.logger.Fatal("create clickhouse connection pool", zap.Error(err), zap.String("addr", addr.Addr))
}
for j := 0; j < addr.Weight; j++ {
p.instances = append(p.instances, pool)
}
p.instances = append(p.instances, pool)
}

p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
Expand Down
69 changes: 69 additions & 0 deletions plugin/output/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,72 @@ func Test_addrWithDefaultPort(t *testing.T) {
assert.Equal(t, tt.want, addrWithDefaultPort(tt.addr, defaultPort))
}
}

// TestAddress_UnmarshalJSON feeds Address.UnmarshalJSON the string form, the
// object form, two invalid payloads and an empty input, and checks the error
// outcome and, for successful cases, the decoded Addr and Weight.
func TestAddress_UnmarshalJSON(t *testing.T) {
	t.Parallel()

	cases := []struct {
		name    string
		input   []byte
		want    Address
		wantErr bool
	}{
		{
			name:  "ok_object",
			input: []byte(`{"addr":"127.0.0.1:9001","weight":2}`),
			want:  Address{Addr: "127.0.0.1:9001", Weight: 2},
		},
		{
			name:  "ok_string",
			input: []byte(`"127.0.0.1:9001"`),
			want:  Address{Addr: "127.0.0.1:9001", Weight: 1},
		},
		{
			name:    "invalid_type",
			input:   []byte(`[{"field":"val"}]`),
			wantErr: true,
		},
		{
			name:    "invalid_object",
			input:   []byte(`{"field":"val"}`),
			wantErr: true,
		},
		{
			// Nil input: expected to succeed and leave the zero value.
			name: "empty",
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			var got Address
			err := got.UnmarshalJSON(tc.input)
			if failed := err != nil; failed != tc.wantErr {
				t.Errorf("Address.UnmarshalJSON() error = %v, wantErr %v", err, tc.wantErr)
			}
			// The decoded value is only checked for cases expected to succeed.
			if tc.wantErr {
				return
			}
			assert.Equal(t, tc.want.Addr, got.Addr)
			assert.Equal(t, tc.want.Weight, got.Weight)
		})
	}
}

0 comments on commit 1156024

Please sign in to comment.