-
Notifications
You must be signed in to change notification settings - Fork 394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add rocketmq flusher #850
base: main
Are you sure you want to change the base?
add rocketmq flusher #850
Conversation
伴农 seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
很高效👍 |
CI的问题需要处理下 |
} | ||
|
||
type FlusherRocketmq struct { | ||
context pipeline.Context |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FlusherRocketmq这个结构体属性建议把面向用户配置和内部逻辑使用的分开,不要混合在一起,大写和小写命名的可以参考其它flusher 分割下
} | ||
for index, log := range logs.([][]byte) { | ||
valueMap := values[index] | ||
topic, err := fmtstr.FormatTopic(valueMap, r.Topic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里参考当前flusher pulsar的逻辑优化下,大部分场景不会触发动态topic,FormatTopic有一定的性能损耗,如果静态topic就不要调用FormatTopic
https://github.com/alibaba/ilogtail/blob/main/licenses/LICENSE_OF_ILOGTAIL_DEPENDENCIES.md https://github.com/alibaba/ilogtail/blob/main/CHANGELOG.md Unreleased里添加下rocketmq flusher的新增日志 https://github.com/alibaba/ilogtail/tree/main/docs/cn/data-pipeline/flusher 下新增一个rocketmq flusher的文档 |
"context" | ||
"errors" | ||
"fmt" | ||
"github.com/alibaba/ilogtail/pkg/fmtstr" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import这块按照golang的规范格式化下
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all right, thanks bro, first time writing golang, I will fix above issues later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
需要加一下e2e测试,详情见doc
|----------------------------|--------|------|-------------------------------------------------------------| | ||
| NameServers | String | 是 | name server address | | ||
| Topic | String | 是 | rocketmq Topic,支持动态topic, 例如: `test_%{content.appname}` | | ||
| Sync | bool | 否 | 默认为异常发送 | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
异常发送?异步?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
异步
|
||
| 参数 | 类型 | 是否必选 | 说明 | | ||
|----------------------------|--------|------|-------------------------------------------------------------| | ||
| NameServers | String | 是 | name server address | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
名字和其它插件统一一下吧
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
你是说要改成Brokers么,kafka配置里写的是brokers的地址,rocketmq是需要填namesever地址,然后通过nameserver找到broker
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个保留和中间件本身的术语应该OK
r.context = context | ||
if r.NameServers == nil || len(r.NameServers) == 0 { | ||
var err = errors.New("name server is nil") | ||
logger.Error(r.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "init rocketmq flusher fail, eerror", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo,以及可以在外层打印错误日志
return nil | ||
} | ||
|
||
type FlusherFunc func(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个除了这种flush模式外,还有其它模式可能吗?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
还有单条的,这次没加,等后续再加
licenses/LICENCE-rocketmq
Outdated
@@ -0,0 +1,201 @@ | |||
Apache License | |||
Version 2.0, January 2004 | |||
http://www.apache.org/licenses/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不是让添加rocketmq的license文件,可以看下LICENSE_OF_ILOGTAIL_DEPENDENCIES.md这个文件里配置,把本次pr新添加的依赖库的license加上
for index, log := range logs.([][]byte) { | ||
valueMap := values[index] | ||
topic := r.Topic | ||
if len(r.topicKeys) > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
topicKeys这个定义了并未看到实际有用的逻辑?是不是可以参靠flusher pulsar和kafka v2实现下
86f76a6
to
00d6e7e
Compare
logger.Debug(r.context.GetRuntimeContext(), "[LogGroup] topic", logGroup.Topic, "logstore", logGroup.Category, "logcount", len(logGroup.Logs), "tags", logGroup.LogTags) | ||
logs, values, err := r.converter.ToByteStreamWithSelectedFields(logGroup, r.topicKeys) | ||
if err != nil { | ||
logger.Error(r.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush kafka convert log fail, error", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flush kafka 这个错误提示message需要改下
logger.Error(r.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush rocketmq error, error", err) | ||
return err | ||
} | ||
logger.Debug(r.context.GetRuntimeContext(), "success flush 2 rocketmq with [LogGroup] projectName", projectName, "logstoreName", logstoreName, "logcount", len(msgs), "res", res.String()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be better to use 'to' instead of 2
If "conversation" has been resolved, please mark it as resolved |
add rocketmq flusher