From 8df11f553e6df5274bc2de17276b6046983fd37e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BC=B4=E5=86=9C?= Date: Mon, 5 Jun 2023 16:52:56 +0800 Subject: [PATCH 1/2] add rocketmq flusher --- CHANGELOG.md | 2 +- .../data-pipeline/flusher/flusher_rocketmq.md | 44 +++++++++++++++++++ go.mod | 11 ++++- go.sum | 22 +++++++++- licenses/LICENSE_OF_ILOGTAIL_DEPENDENCIES.md | 11 ++++- .../LICENSE_OF_TESTENGINE_DEPENDENCIES.md | 1 - plugins.yml | 1 + 7 files changed, 85 insertions(+), 7 deletions(-) create mode 100644 docs/cn/data-pipeline/flusher/flusher_rocketmq.md diff --git a/CHANGELOG.md b/CHANGELOG.md index c1706a7cf3..8def89a575 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,4 +45,4 @@ your changes, such as: - [public] [both] [fix] json converter marshal without HTML escaped - [public] [both] [updated] plugin_main support file-to-file test - [public] [both] [updated] `metric_meta_kubernetes` support collect [kruise](github.com/openkruise/kruise) CRD meta - +- [public] [both] [added] add flusher rocketmq diff --git a/docs/cn/data-pipeline/flusher/flusher_rocketmq.md b/docs/cn/data-pipeline/flusher/flusher_rocketmq.md new file mode 100644 index 0000000000..c253534d0e --- /dev/null +++ b/docs/cn/data-pipeline/flusher/flusher_rocketmq.md @@ -0,0 +1,44 @@ +# rocketmq + +## 简介 + +`flusher_rocketmq` `flusher`插件可以实现将采集到的数据,经过处理后,发送到rocketmq。 + +## 版本 + +[Alpha](../stability-level.md) + +## 配置参数 + +| 参数 | 类型 | 是否必选 | 说明 | +|----------------------------|--------|------|-------------------------------------------------------------| +| NameServers | String | 是 | name server address | +| Topic | String | 是 | rocketmq Topic,支持动态topic, 例如: `test_%{content.appname}` | +| Sync | bool | 否 | 默认为异步发送 | +| MaxRetries | int | 否 | 重试次数 | +| BrokerTimeout | int | 否 | 超时时间,默认值3s | +| Convert | Struct | 否 | ilogtail数据转换协议配置 | +| PartitionerType | String | 否 | Partitioner类型。取值:`roundrobin`、`hash`、`random`。默认为:`random`。 | +| CompressionLevel | int | 否 | 压缩级别,可选值:`0~9`,0速度最快,9压缩效率最高 | +| CompressMsgBodyOverHowmuch | int | 否 | 压缩阈值 | +| ProducerGroupName | String | 否 | 发送者的producer group | + + +- `NameServers`是个数组,多个`NameServer`地址不能使用`;`或者`,`来隔开放在一行里,`yaml`配置文件中正确的多个`NameServer`地址配置参考如下: + +```yaml +enable: true +inputs: + - Type: file_log + LogPath: /home/test_log + FilePattern: "*.log" +flushers: + - Type: flusher_rocketmq + NameServers: + - 192.XX.XX.1:9876 + - 192.XX.XX.2:9876 + - 192.XX.XX.3:9876 + Topic: accesslog + Sync: false +``` + diff --git a/go.mod b/go.mod index 49bf176f91..deabbba133 100644 --- a/go.mod +++ b/go.mod @@ -148,7 +148,7 @@ require ( ) require ( - github.com/BurntSushi/toml v0.4.1 // indirect + github.com/BurntSushi/toml v1.1.0 // indirect github.com/Microsoft/go-winio v0.5.2 // indirect github.com/Microsoft/hcsshim v0.9.4 // indirect github.com/aliyun/alibaba-cloud-sdk-go v1.61.1483 @@ -268,12 +268,20 @@ require ( github.com/VictoriaMetrics/fasthttp v1.1.0 // indirect github.com/VictoriaMetrics/metrics v1.23.0 // indirect github.com/VictoriaMetrics/metricsql v0.45.0 // indirect + github.com/apache/rocketmq-client-go/v2 v2.1.1 + github.com/emirpasic/gods v1.12.0 // indirect + github.com/golang/mock v1.6.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/influxdata/telegraf v1.20.0 // indirect github.com/openkruise/kruise-api v1.4.0 + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/richardartoul/molecule v1.0.0 // indirect + github.com/stathat/consistent v1.0.0 // indirect + github.com/tidwall/gjson v1.13.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect github.com/valyala/fastjson v1.6.3 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect @@ -283,6 +291,7 @@ require ( golang.org/x/mod v0.8.0 // indirect gopkg.in/ini.v1 v1.66.2 // indirect sigs.k8s.io/gateway-api v0.6.2 // indirect + ) replace ( diff --git a/go.sum b/go.sum index 37038e8170..b40ce73193 100644 --- a/go.sum +++ b/go.sum @@ -90,8 +90,8 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= -github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= +github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ClickHouse/ch-go v0.51.2 h1:PesdqjUImi21U61yPKsDhfer8wiQ3geTsjdjZzXd/3s= github.com/ClickHouse/ch-go v0.51.2/go.mod h1:z+/hEezvvHvRMV/I00CaXBnxOx+td4zRe7HJpBYLwGU= @@ -173,6 +173,8 @@ github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/pulsar-client-go v0.10.0 h1:ccwjmmaCjaE6bLYnrILpm8V4WQQ8rB3J98pOW0O2nyo= github.com/apache/pulsar-client-go v0.10.0/go.mod h1:l9ZNSafZdle1cpyFE5CkUL3uRYJMvoHjHHLlK0kL7c8= +github.com/apache/rocketmq-client-go/v2 v2.1.1 h1:WY/LkOYSQaVyV+HOqdiIgF4LE3beZ/jwdSLKZlzpabw= +github.com/apache/rocketmq-client-go/v2 v2.1.1/go.mod h1:GZzExtXY9zpI6FfiVJYAhw2IXQtgnHUuWpULo7nr5lw= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4= @@ -506,6 +508,8 @@ github.com/emicklei/go-restful v2.16.0+incompatible h1:rgqiKNjTnFQA6kkhFe16D8epT github.com/emicklei/go-restful v2.16.0+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= +github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -711,6 +715,7 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -1291,6 +1296,8 @@ github.com/oschwald/maxminddb-golang v1.2.1/go.mod h1:3jhIUymTJ5VREKyIhWm66LJiQt github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/paulbellamy/ratecounter v0.2.1-0.20170719102518-a803f0e4f071 h1:lL9DHDiy6y/YcERO1Baji6ed0S2XisLARdeWRs2ZHC8= github.com/paulbellamy/ratecounter v0.2.1-0.20170719102518-a803f0e4f071/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/paulmach/orb v0.8.0 h1:W5XAt5yNPNnhaMNEf0xNSkBMJ1LzOzdk2MRlB6EN0Vs= @@ -1488,6 +1495,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= +github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 h1:lIOOHPEbXzO3vnmx2gok1Tfs31Q8GQqKLc8vVqyQq/I= github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= @@ -1521,7 +1530,13 @@ github.com/syndtr/goleveldb v0.0.0-20170725064836-b89cc31ef797/go.mod h1:Z4AUp2K github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I= github.com/tchap/go-patricia v2.3.0+incompatible h1:GkY4dP3cEfEASBPPkWd+AmjYxhmDkqO9/zg7R0lSQRs= github.com/tchap/go-patricia v2.3.0+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I= +github.com/tidwall/gjson v1.13.0 h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M= +github.com/tidwall/gjson v1.13.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms= @@ -1668,6 +1683,7 @@ go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -2473,3 +2489,5 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= diff --git a/licenses/LICENSE_OF_ILOGTAIL_DEPENDENCIES.md b/licenses/LICENSE_OF_ILOGTAIL_DEPENDENCIES.md index 402f033882..7861457928 100644 --- a/licenses/LICENSE_OF_ILOGTAIL_DEPENDENCIES.md +++ b/licenses/LICENSE_OF_ILOGTAIL_DEPENDENCIES.md @@ -99,6 +99,8 @@ When distributed in a binary form, iLogtail may contain portions of the followin - [github.com/mwitkow/go-conntrack](https://pkg.go.dev/github.com/mwitkow/go-conntrack?tab=licenses) - [sigs.k8s.io/json](https://pkg.go.dev/sigs.k8s.io/json?tab=licenses) - [gopkg.in/ini.v1](https://pkg.go.dev/gopkg.in/ini.v1?tab=licenses) +- [github.com/apache/rocketmq-client-go](https://pkg.go.dev/github.com/apache/rocketmq-client-go/v2/primitive?tab=licenses) +- [github.com/golang/mock](https://pkg.go.dev/github.com/golang/mock?tab=licenses) ## BSD licenses - [github.com/klauspost/compress](https://pkg.go.dev/github.com/klauspost/compress?tab=licenses) @@ -141,15 +143,14 @@ When distributed in a binary form, iLogtail may contain portions of the followin - [google.golang.org/protobuf](https://pkg.go.dev/google.golang.org/protobuf?tab=licenses) - [gopkg.in/inf.v0](https://pkg.go.dev/gopkg.in/inf.v0?tab=licenses) - [github.com/go-faster/errors](https://pkg.go.dev/github.com/go-faster/errors?tab=licenses) -- [github.com/miekg/pkcs11](https://pkg.go.dev/github.com/miekg/pkcs11?tab=licenses) - [github.com/grafana/regexp](https://pkg.go.dev/github.com/grafana/regexp?tab=licenses) - [github.com/munnerz/goautoneg](https://pkg.go.dev/github.com/munnerz/goautoneg?tab=licenses) - [github.com/pmezard/go-difflib](https://pkg.go.dev/github.com/pmezard/go-difflib?tab=licenses) - [github.com/gorilla/websocket](github.com/gorilla/websocket) +- [github.com/emirpasic/gods](https://pkg.go.dev/github.com/emirpasic/gods?tab=licenses) ## MIT licenses -- [github.com/valyala/gozstd](https://pkg.go.dev/github.com/valyala/gozstd?tab=licenses) - [github.com/gofrs/uuid](https://pkg.go.dev/github.com/gofrs/uuid?tab=licenses) - [github.com/go-kit/log](https://pkg.go.dev/github.com/go-kit/log?tab=licenses) - [github.com/narqo/go-dogstatsd-parser](https://pkg.go.dev/github.com/narqo/go-dogstatsd-parser?tab=licenses) @@ -235,6 +236,12 @@ When distributed in a binary form, iLogtail may contain portions of the followin - [github.com/stretchr/testify](https://pkg.go.dev/github.com/stretchr/testify?tab=licenses) - [go.uber.org/goleak](https://pkg.go.dev/go.uber.org/goleak?tab=licenses) - [github.com/influxdata/telegraf](https://pkg.go.dev/github.com/influxdata/telegraf?tab=licenses) +- [github.com/patrickmn/go-cache](https://pkg.go.dev/github.com/patrickmn/go-cache?tab=licenses) +- [github.com/stathat/consistent](https://pkg.go.dev/github.com/stathat/consistent?tab=licenses) +- [github.com/tidwall/gjson](https://pkg.go.dev/github.com/tidwall/gjson?tab=licenses) +- [github.com/tidwall/match](https://pkg.go.dev/github.com/tidwall/match?tab=licenses) +- [github.com/tidwall/pretty](https://pkg.go.dev/github.com/tidwall/pretty?tab=licenses) +- [github.com/99designs/go-keychain](https://pkg.go.dev/github.com/99designs/go-keychain?tab=licenses) ## ISC licenses diff --git a/licenses/LICENSE_OF_TESTENGINE_DEPENDENCIES.md b/licenses/LICENSE_OF_TESTENGINE_DEPENDENCIES.md index d4b8331538..0e1cf5b6fd 100644 --- a/licenses/LICENSE_OF_TESTENGINE_DEPENDENCIES.md +++ b/licenses/LICENSE_OF_TESTENGINE_DEPENDENCIES.md @@ -96,7 +96,6 @@ When distributed in a binary form, Logtailplugin Test Engine may contain portion - [sigs.k8s.io/yaml](https://pkg.go.dev/sigs.k8s.io/yaml?tab=licenses) - [github.com/go-faster/errors](https://pkg.go.dev/github.com/go-faster/errors?tab=licenses) - [github.com/pierrec/lz4](https://pkg.go.dev/github.com/pierrec/lz4?tab=licenses) -- [github.com/miekg/pkcs11](https://pkg.go.dev/github.com/miekg/pkcs11?tab=licenses) - [github.com/elastic/elastic-transport-go](https://pkg.go.dev/github.com/elastic/elastic-transport-go?tab=licenses) - [github.com/elastic/go-elasticsearch](https://pkg.go.dev/github.com/elastic/go-elasticsearch?tab=licenses) - [sigs.k8s.io/json](https://pkg.go.dev/sigs.k8s.io/json?tab=licenses) diff --git a/plugins.yml b/plugins.yml index 589baf109d..179ce433d8 100644 --- a/plugins.yml +++ b/plugins.yml @@ -49,6 +49,7 @@ plugins: - import: "github.com/alibaba/ilogtail/plugins/input/httpserver" - import: "github.com/alibaba/ilogtail/plugins/input/jmxfetch" - import: "github.com/alibaba/ilogtail/plugins/input/kafka" + - import: "github.com/alibaba/ilogtail/plugins/flusher/rocketmq" - import: "github.com/alibaba/ilogtail/plugins/input/kubernetesmeta" - import: "github.com/alibaba/ilogtail/plugins/input/lumberjack" - import: "github.com/alibaba/ilogtail/plugins/input/mock" From 6e27672377559253231970bad027902666e2c646 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BC=B4=E5=86=9C?= Date: Mon, 5 Jun 2023 17:03:59 +0800 Subject: [PATCH 2/2] add rocketmq flusher --- plugins/flusher/rocketmq/flusher_rocketmq.go | 260 ++++++++++++++++++ .../flusher/rocketmq/flusher_rocketmq_test.go | 71 +++++ 2 files changed, 331 insertions(+) create mode 100644 plugins/flusher/rocketmq/flusher_rocketmq.go create mode 100644 plugins/flusher/rocketmq/flusher_rocketmq_test.go diff --git a/plugins/flusher/rocketmq/flusher_rocketmq.go b/plugins/flusher/rocketmq/flusher_rocketmq.go new file mode 100644 index 0000000000..251b3a3644 --- /dev/null +++ b/plugins/flusher/rocketmq/flusher_rocketmq.go @@ -0,0 +1,260 @@ +// Copyright 2022 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rocketmq + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/producer" + + "github.com/alibaba/ilogtail/pkg/fmtstr" + "github.com/alibaba/ilogtail/pkg/logger" + "github.com/alibaba/ilogtail/pkg/pipeline" + "github.com/alibaba/ilogtail/pkg/protocol" + converter "github.com/alibaba/ilogtail/pkg/protocol/converter" +) + +const ( + PartitionerTypeRandom = "random" + PartitionerTypeRoundRobin = "roundrobin" + PartitionerTypeRoundHash = "hash" +) + +type convertConfig struct { + // Rename one or more fields from tags. + TagFieldsRename map[string]string + // Rename one or more fields, The protocol field options can only be: contents, tags, time + ProtocolFieldsRename map[string]string + // Convert protocol, default value: custom_single + Protocol string + // Convert encoding, default value:json + // The options are: 'json' + Encoding string +} + +type FlusherRocketmq struct { + NameServers []string `json:"NameServers" comment:"name server address"` + Topic string `json:"Topic" comment:"rocketmq topic"` + Sync bool `json:"Sync" comment:"send 2 rocketmq via sync or not"` + + MaxRetries int `json:"MaxRetries" comment:"send 2 rocketmq via sync or not"` + + // The default is 3s. + BrokerTimeout int `json:"BrokerTimeout" comment:"send 2 rocketmq client timeout"` + + // ilogtail data convert config + Convert convertConfig + + PartitionerType string `json:"PartitionerType" comment:"option of random roundrobin hash"` + + CompressionLevel int `json:"CompressionLevel" comment:"compress level range 0~9, 0 stands for best speed, 9 stands for best compression ratio"` + + CompressMsgBodyOverHowmuch int `json:"CompressMsgBodyOverHowmuch" comment:"specifies the threshold size of a message body for compression. If the message body size is larger than the specified value, it will be compressed before sending it to the broker. By default, the value is set to 4KB"` + + ProducerGroupName string `json:"ProducerGroupName" comment:"producer group name default LOGSTASH_PRODUCER"` + + context pipeline.Context + converter *converter.Converter + flusher FlusherFunc + producer rocketmq.Producer + topicKeys []string +} + +func (r *FlusherRocketmq) Description() string { + return "rocketmq flusher for logtail" +} + +func makePartitioner(r *FlusherRocketmq) (partitioner producer.QueueSelector, err error) { + switch r.PartitionerType { + case PartitionerTypeRoundRobin: + partitioner = producer.NewRoundRobinQueueSelector() + case PartitionerTypeRoundHash: + partitioner = producer.NewHashQueueSelector() + case PartitionerTypeRandom: + partitioner = producer.NewRandomQueueSelector() + default: + return nil, fmt.Errorf("invalid PartitionerType,configured value %v", r.PartitionerType) + } + return partitioner, nil +} + +func NewFlusherRocketmq() *FlusherRocketmq { + return &FlusherRocketmq{ + NameServers: nil, + Sync: true, + MaxRetries: 3, + BrokerTimeout: 5, + Convert: convertConfig{ + Protocol: converter.ProtocolCustomSingle, + Encoding: converter.EncodingJSON, + }, + CompressionLevel: 5, + PartitionerType: PartitionerTypeRandom, + CompressMsgBodyOverHowmuch: 4096, + ProducerGroupName: "LOGSTASH_PRODUCER", + } +} + +func (r *FlusherRocketmq) Init(context pipeline.Context) error { + r.context = context + if r.NameServers == nil || len(r.NameServers) == 0 { + err := errors.New("name server is nil") + return err + } + if r.Convert.Encoding == "" { + r.converter.Encoding = converter.EncodingJSON + } + + if r.Convert.Protocol == "" { + r.converter.Protocol = converter.ProtocolCustomSingle + } + + convert, err := r.getConverter() + if err != nil { + return err + } + r.converter = convert + + // Obtain topic keys from dynamic topic expression + topicKeys, err := fmtstr.CompileKeys(r.Topic) + if err != nil { + return err + } + r.topicKeys = topicKeys + + partitioner, err := makePartitioner(r) + if err != nil { + return err + } + p, err := rocketmq.NewProducer( + producer.WithNsResolver(primitive.NewPassthroughResolver(r.NameServers)), + producer.WithRetry(r.MaxRetries), + producer.WithSendMsgTimeout(time.Duration(r.BrokerTimeout)*time.Second), + producer.WithQueueSelector(partitioner), + producer.WithCompressMsgBodyOverHowmuch(r.CompressMsgBodyOverHowmuch), + producer.WithGroupName(r.ProducerGroupName), + ) + + if err != nil { + return err + } + + err = p.Start() + if err != nil { + return err + } + r.producer = p + return nil +} + +func (r *FlusherRocketmq) Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error { + var msgs []*primitive.Message + for _, logGroup := range logGroupList { + logger.Debug(r.context.GetRuntimeContext(), "[LogGroup] topic", logGroup.Topic, "logstore", logGroup.Category, "logcount", len(logGroup.Logs), "tags", logGroup.LogTags) + logs, values, err := r.converter.ToByteStreamWithSelectedFields(logGroup, r.topicKeys) + if err != nil { + logger.Error(r.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush kafka convert log fail, error", err) + } + for index, log := range logs.([][]byte) { + valueMap := values[index] + topic := r.Topic + if len(r.topicKeys) > 0 { + formatedTopic, err := fmtstr.FormatTopic(valueMap, r.Topic) + if err != nil { + logger.Error(r.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush rocketmq format topic fail, error", err) + } else { + topic = *formatedTopic + } + } + + m := primitive.NewMessage( + topic, + log) + msgs = append(msgs, m) + } + } + if len(msgs) > 0 { + if r.Sync { + res, err := r.producer.SendSync(r.context.GetRuntimeContext(), msgs...) + if err != nil { + logger.Error(r.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush rocketmq error, error", err) + return err + } + logger.Debug(r.context.GetRuntimeContext(), "success flush 2 rocketmq with [LogGroup] projectName", projectName, "logstoreName", logstoreName, "logcount", len(msgs), "res", res.String()) + } else { + err := r.producer.SendAsync(r.context.GetRuntimeContext(), + func(ctx context.Context, result *primitive.SendResult, e error) { + if e != nil { + logger.Error(r.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush rocketmq error, error", e) + } else { + logger.Debug(r.context.GetRuntimeContext(), "success flush 2 rocketmq with [LogGroup] projectName", projectName, "logstoreName", logstoreName, "logcount", len(msgs), "res", result.String()) + } + }, + msgs...) + if err != nil { + logger.Error(r.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush rocketmq error, error", err) + return err + } + } + } + return nil +} + +func (r *FlusherRocketmq) Validate() error { + if len(r.NameServers) == 0 { + return errors.New("no nameserver configured") + } + // check topic + if r.Topic == "" { + return errors.New("topic can't be empty") + } + + return nil +} + +type FlusherFunc func(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error + +func (r *FlusherRocketmq) getConverter() (*converter.Converter, error) { + logger.Debug(r.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", r.Convert.Protocol, + "Encoding", r.Convert.Encoding, "TagFieldsRename", r.Convert.TagFieldsRename, "ProtocolFieldsRename", r.Convert.ProtocolFieldsRename) + return converter.NewConverter(r.Convert.Protocol, r.Convert.Encoding, r.Convert.TagFieldsRename, r.Convert.ProtocolFieldsRename) +} + +func init() { + pipeline.Flushers["flusher_rocketmq"] = func() pipeline.Flusher { + f := NewFlusherRocketmq() + f.flusher = f.Flush + return f + } +} + +func (*FlusherRocketmq) SetUrgent(flag bool) { +} + +// IsReady is ready to flush +func (r *FlusherRocketmq) IsReady(projectName string, logstoreName string, logstoreKey int64) bool { + return r.producer != nil +} + +// Stop ... +func (r *FlusherRocketmq) Stop() error { + err := r.producer.Shutdown() + return err +} diff --git a/plugins/flusher/rocketmq/flusher_rocketmq_test.go b/plugins/flusher/rocketmq/flusher_rocketmq_test.go new file mode 100644 index 0000000000..27a81b8f6a --- /dev/null +++ b/plugins/flusher/rocketmq/flusher_rocketmq_test.go @@ -0,0 +1,71 @@ +// Copyright 2021 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rocketmq + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/alibaba/ilogtail/pkg/protocol" + "github.com/alibaba/ilogtail/plugins/test" + "github.com/alibaba/ilogtail/plugins/test/mock" +) + +// Invalid Test +func InvalidTestConnectAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + nameServers := []string{"30.198.0.161:9876"} + r := &FlusherRocketmq{ + NameServers: nameServers, + Topic: "accesslog", + Sync: false, + } + + // Verify that we can connect to the Kafka broker + lctx := mock.NewEmptyContext("p", "l", "c") + err := r.Init(lctx) + require.NoError(t, err) + + // Verify that we can successfully write data to the kafka broker + lgl := makeTestLogGroupList() + err = r.Flush("projectName", "logstoreName", "configName", lgl.GetLogGroupList()) + require.NoError(t, err) + _ = r.Stop() +} + +func makeTestLogGroupList() *protocol.LogGroupList { + f := map[string]string{} + lgl := &protocol.LogGroupList{ + LogGroupList: make([]*protocol.LogGroup, 0, 10), + } + for i := 1; i <= 10; i++ { + lg := &protocol.LogGroup{ + Logs: make([]*protocol.Log, 0, 10), + } + for j := 1; j <= 10; j++ { + f["group"] = strconv.Itoa(i) + f["message"] = "The message: " + strconv.Itoa(j) + l := test.CreateLogByFields(f) + lg.Logs = append(lg.Logs, l) + } + lgl.LogGroupList = append(lgl.LogGroupList, lg) + } + return lgl +}