From 7af9b26d293ef73be4fa05643e01edddcdf04c1a Mon Sep 17 00:00:00 2001 From: ngjaying Date: Thu, 20 Jul 2023 17:14:49 +0800 Subject: [PATCH] feat(sink): resend strategy (#2112) * feat(sink): support resend priority Signed-off-by: Jiyong Huang * feat(sink): support resend indicator If the tuple is resending, the resend indicator is set to true Signed-off-by: Jiyong Huang * fix(cache): remove async send to keep order Also refine the tests Signed-off-by: Jiyong Huang * docs(sink): resend policy Signed-off-by: Jiyong Huang --------- Signed-off-by: Jiyong Huang --- docs/en_US/guide/sinks/overview.md | 46 +++++- docs/zh_CN/guide/sinks/overview.md | 120 +++++++------- internal/conf/conf.go | 21 ++- internal/conf/conf_test.go | 161 ++++++++++++++++++- internal/topo/node/cache/sync_cache.go | 12 +- internal/topo/node/sink_node.go | 145 ++++++++++++----- internal/topo/node/sink_node_test.go | 207 ++++++++++++++++--------- 7 files changed, 518 insertions(+), 194 deletions(-) diff --git a/docs/en_US/guide/sinks/overview.md b/docs/en_US/guide/sinks/overview.md index 4169d147ba..d1821731d8 100644 --- a/docs/en_US/guide/sinks/overview.md +++ b/docs/en_US/guide/sinks/overview.md @@ -115,20 +115,50 @@ The storage location of the offline cache is determined by the storage configura Each sink can configure its own caching mechanism. The caching process is the same for each sink. If caching is enabled, all sink's events go through two phases: first, saving all content to the cache; then deleting the cache after receiving an ack. -- Error detection: After a failed send, sink should identify recoverable failures (network, etc.) by returning a specific error type, which will return a failed ack so that the cache can be retained. For successful sends or unrecoverable errors, a successful ack will be sent to delete the cache. -- Cache mechanism: The cache will first be kept in memory. If the memory threshold is exceeded, the later cache will be saved to disk. Once the disk cache exceeds the disk storage threshold, the cache will start to rotate, i.e. the earliest cache in memory will be discarded and the earliest cache on disk will be loaded instead. -- Resend policy: Currently the caching mechanism can only run in the default synchronous mode, where if a message is being sent, it will wait for the result of the send to continue sending the next cached data. Otherwise, when new data arrives, the first data in the cache is sent to detect network conditions. If the send is successful, all caches in memory and on disk are sent in a sequential chain. Chained sends can define a send interval to prevent message storms. +- Error detection: After a failed send, sink should identify recoverable failures (network, etc.) by returning a + specific error type, which will return a failed ack so that the cache can be retained. For successful sends or + unrecoverable errors, a successful ack will be sent to delete the cache. +- Cache mechanism: The cache will first be kept in memory. If the memory threshold is exceeded, the later cache will be + saved to disk. Once the disk cache exceeds the disk storage threshold, the cache will start to rotate, i.e. the + earliest cache in memory will be discarded and the earliest cache on disk will be loaded instead. +- Resend policy: Currently the caching mechanism can only run in the default synchronous mode, where if a message is + being sent, it will wait for the sending result to continue sending the next cached data. Otherwise, when new data + arrives, the first data in the cache is sent to detect network conditions. If the sending result is successful, all + caches in memory and on disk are sent in a sequential chain. Chained sends can define a send interval to prevent + message storms. +- Separation of normal data and retransmission data: Users can configure retransmission data and normal data to be sent + separately to different destinations. It is also possible to configure the priority of sending. For example, send + normal data with higher priority. You can even change the content of the retransmission data. For example, add a field + to the retransmission data in order to distinguish it at the receiving end. ### Configuration There are two levels of configuration for the Sink cache. A global configuration in `etc/kuiper.yaml` that defines the default behavior of all rules. There is also a rule sink level definition to override the default behavior. -- enableCache: whether to enable sink cache. cache storage configuration follows the configuration of the metadata store defined in `etc/kuiper.yaml`. -- memoryCacheThreshold: the number of messages to be cached in memory. For performance reasons, the earliest cached messages are stored in memory so that they can be resent immediately upon failure recovery. Data here can be lost due to failures such as power outages. -- maxDiskCache: The maximum number of messages to be cached on disk. The disk cache is first-in, first-out. If the disk cache is full, the earliest page of information will be loaded into the memory cache, replacing the old memory cache. -- bufferPageSize. buffer pages are units of bulk reads/writes to disk to prevent frequent IO. if the pages are not full and eKuiper crashes due to hardware or software errors, the last unwritten pages to disk will be lost. +- enableCache: whether to enable sink cache. cache storage configuration follows the configuration of the metadata store + defined in `etc/kuiper.yaml`. +- memoryCacheThreshold: the number of messages to be cached in memory. For performance reasons, the earliest cached + messages are stored in memory so that they can be resent immediately upon failure recovery. Data here can be lost due + to failures such as power outages. +- maxDiskCache: The maximum number of messages to be cached on disk. The disk cache is first-in, first-out. If the disk + cache is full, the earliest page of information will be loaded into the memory cache, replacing the old memory cache. +- bufferPageSize. buffer pages are units of bulk reads/writes to disk to prevent frequent IO. if the pages are not full + and eKuiper crashes due to hardware or software errors, the last unwritten pages to disk will be lost. - resendInterval: The time interval to resend information after failure recovery to prevent message storms. -- cleanCacheAtStop: whether to clean all caches when the rule is stopped, to prevent mass resending of expired messages when the rule is restarted. If not set to true, the in-memory cache will be stored to disk once the rule is stopped. Otherwise, the memory and disk rules will be cleared out. +- cleanCacheAtStop: whether to clean all caches when the rule is stopped, to prevent mass resending of expired messages + when the rule is restarted. If not set to true, the in-memory cache will be stored to disk once the rule is stopped. + Otherwise, the memory and disk rules will be cleared out. +- resendAlterQueue: whether to use the alternate queue when resending the cache. If set to true, the cache will be sent + to the alternate queue instead of the original queue. This will result in real-time messages and resend messages being + sent using different queues and the order of the messages will change. The following resend-related configurations + will only take effect if set to true. +- resendPriority: resend cached priority, int type, default is 0. -1 means resend real-time data first; 0 means equal + priority; 1 means resend cached data first. +- resendIndicatorField: field name of the resend cache, the field type must be a bool value. If the field is set, it + will be set to true when resending. e.g., if resendIndicatorField is `resend`, then the `resend` field will be set to + true when resending the cache. +- resendDestination: the destination to resend the cache to, which may have different meanings or support depending on + the sink. For example, the mqtt sink can send the resend data to a different topic. In the following example configuration of the rule, log sink has no cache-related options configured, so the global default configuration will be used; whereas mqtt sink performs its own caching policy configuration. diff --git a/docs/zh_CN/guide/sinks/overview.md b/docs/zh_CN/guide/sinks/overview.md index 7a51797630..7898143891 100644 --- a/docs/zh_CN/guide/sinks/overview.md +++ b/docs/zh_CN/guide/sinks/overview.md @@ -100,25 +100,79 @@ "sendSingle": true, "topic": "prefix/{{.topic}}" } - }] + } + ] } ``` -需要注意的是,上例中的 `sendSingle` 属性已设置。在默认情况下,目标接收到的是数组,使用的 jsonpath 需要采用 {{index . 0 "topic"}}。 +需要注意的是,上例中的 `sendSingle` 属性已设置。在默认情况下,目标接收到的是数组,使用的 jsonpath 需要采用 +{{index . 0 "topic"}}。 + +## 资源引用 + +像源一样,动作也支持配置复用,用户只需要在 sinks 文件夹中创建与目标动作同名的 yaml 文件并按照源一样的形式写入配置。 + +例如,针对 MQTT 动作场景, 用户可以在 sinks 目录下创建 mqtt.yaml 文件,并写入如下内容 + +```yaml +test: + qos: 1 + server: "tcp://broker.emqx.io:1883" +``` + +当用户需要 MQTT 动作时,除了采用传统的配置方式,如下所示 + +```json + { + "mqtt": { + "server": "tcp://broker.emqx.io:1883", + "topic": "devices/demo_001/messages/events/", + "protocolVersion": "3.1.1", + "qos": 1, + "clientId": "demo_001", + "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30", + "password": "SharedAccessSignature sr=*******************", + "retained": false + } +} +``` + +还可以通过 `resourceId` 引用形式,采用如下的配置 + +```json + { + "mqtt": { + "resourceId": "test", + "topic": "devices/demo_001/messages/events/", + "protocolVersion": "3.1.1", + "clientId": "demo_001", + "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30", + "password": "SharedAccessSignature sr=*******************", + "retained": false + } +} +``` ## 缓存 动作用于将处理结果发送到外部系统中,存在外部系统不可用的情况,特别是在从边到云的场景中。例如,在弱网情况下,边到云的网络连接可能会不时断开和重连。因此,动作提供了缓存功能,用于在发送错误的情况下暂存数据,并在错误恢复之后自动重发缓存数据。动作的缓存可分为内存和磁盘的两级存储。用户可配置内存缓存条数,超过上限后,新的缓存将离线存储到磁盘中。缓存将同时保存在内存和磁盘中,这样缓存的容量就变得更大了;它还将持续检测故障恢复状态,并在不重新启动规则的情况下重新发送。 -离线缓存的保存位置根据 `etc/kuiper.yaml` 里的 store 配置决定,默认为 sqlite 。如果磁盘存储是sqlite,所有的缓存将被保存到`data/cache.db`文件。每个 sink 将有一个唯一的 sqlite 表来保存缓存。缓存的计数添加到 sink 的 指标中的 buffer length 部分。 +离线缓存的保存位置根据 `etc/kuiper.yaml` 里的 store 配置决定,默认为 sqlite +。如果磁盘存储是sqlite,所有的缓存将被保存到`data/cache.db`文件。每个 sink 将有一个唯一的 sqlite 表来保存缓存。缓存的计数添加到 +sink 的 指标中的 buffer length 部分。 ### 流程 -每个 sink 都可以配置自己的缓存机制。每个 sink 的缓存流程是相同的。如果启用了缓存,所有 sink 的事件都会经过两个阶段:首先是将所有内容保存到缓存中;然后在收到 ack 后删除缓存。 +每个 sink 都可以配置自己的缓存机制。每个 sink 的缓存流程是相同的。如果启用了缓存,所有 sink +的事件都会经过两个阶段:首先是将所有内容保存到缓存中;然后在收到 ack 后删除缓存。 -- 错误检测:发送失败后,sink应该通过返回特定的错误类型来识别可恢复的失败(网络等),这将返回一个失败的ack,这样缓存就可以被保留下来。对于成功的发送或不可恢复的错误,将发送一个成功的 ack 来删除缓存。 -- 缓存机制:缓存将首先被保存在内存中。如果超过了内存的阈值,后面的缓存将被保存到磁盘中。一旦磁盘缓存超过磁盘存储阈值,缓存将开始 rotate,即内存中最早的缓存将被丢弃,并加载磁盘中最早的缓存来代替。 -- 重发策略:目前缓存机制仅可运行在默认的同步模式中,如果有一条消息正在发送中,则会等待发送的结果以继续发送下个缓存数据。否则,当有新的数据到来时,发送缓存中的第一个数据以检测网络状况。如果发送成功,将按顺序链式发送所有内存和磁盘中的所有缓存。链式发送可定义一个发送间隔,防止形成消息风暴。 +- 错误检测:发送失败后,sink应该通过返回特定的错误类型来识别可恢复的失败(网络等),这将返回一个失败的ack,这样缓存就可以被保留下来。对于成功的发送或不可恢复的错误,将发送一个成功的 + ack 来删除缓存。 +- 缓存机制:缓存将首先被保存在内存中。如果超过了内存的阈值,后面的缓存将被保存到磁盘中。一旦磁盘缓存超过磁盘存储阈值,缓存将开始 + rotate,即内存中最早的缓存将被丢弃,并加载磁盘中最早的缓存来代替。 +- +重发策略:如果有一条消息正在发送中,则会等待发送的结果以继续发送下个缓存数据。否则,当有新的数据到来时,发送缓存中的第一个数据以检测网络状况。如果发送成功,将按顺序链式发送所有内存和磁盘中的所有缓存。链式发送可定义一个发送间隔,防止形成消息风暴。 +- 实时数据和重发数据区分:用户可配置重发数据与实时数据分开发送,分别发送到不同的目的地。也可配置发送的优先级,优先发送重发数据或实时数据。甚至可以更改发送的内容,例如,将重发数据的增加一个字段,以便在接收端进行区分。 ### 配置 @@ -130,6 +184,13 @@ Sink 缓存的配置有两个层次。`etc/kuiper.yaml` 中的全局配置,定 - bufferPageSize:缓冲页是批量读/写到磁盘的单位,以防止频繁的IO。如果页面未满,eKuiper 因硬件或软件错误而崩溃,最后未写入磁盘的页面将被丢失。 - resendInterval:故障恢复后重新发送信息的时间间隔,防止信息风暴。 - cleanCacheAtStop:是否在规则停止时清理所有缓存,以防止规则重新启动时对过期消息进行大量重发。如果不设置为true,一旦规则停止,内存缓存将被存储到磁盘中。否则,内存和磁盘规则会被清理掉。 +- resendAlterQueue:是否在重新发送缓存时使用备用队列。如果设置为true,缓存将被发送到备用队列,而不是原始队列。这将导致实时消息和重发消息使用不同的队列发送,消息的顺序发生变化,但是可以防止消息风暴。只有设置为 + true 时,以下 resend 相关配置才能生效。 +- resendPriority: 重新发送缓存的优先级,int 类型,默认为 0。-1 表示优先发送实时数据;0 表示同等优先级;1 表示优先发送缓存数据。 +- resendIndicatorField:重新发送缓存的字段名,该字段类型必须是 bool 值。如果设置了字段,重发时将设置为 + true。例如,resendIndicatorField 为 resend,那么在重新发送缓存时,将会将 resend 字段设置为 true。 +- resendDestination:重新发送缓存的目的地,根据不同的 sink 可能有不同的意义或支持。例如,mqtt sink 可以将重发数据发送到不同的 + topic。 在以下规则的示例配置中,log sink 没有配置缓存相关选项,因此将会采用全局默认配置;而 mqtt sink 进行了自身缓存策略的配置。 @@ -152,48 +213,3 @@ Sink 缓存的配置有两个层次。`etc/kuiper.yaml` 中的全局配置,定 }] } ``` - -## 资源引用 - -像源一样,动作也支持配置复用,用户只需要在 sinks 文件夹中创建与目标动作同名的 yaml 文件并按照源一样的形式写入配置。 - -例如,针对 MQTT 动作场景, 用户可以在 sinks 目录下创建 mqtt.yaml 文件,并写入如下内容 - -```yaml -test: - qos: 1 - server: "tcp://broker.emqx.io:1883" -``` - -当用户需要 MQTT 动作时,除了采用传统的配置方式,如下所示 - -```json - { - "mqtt": { - "server": "tcp://broker.emqx.io:1883", - "topic": "devices/demo_001/messages/events/", - "protocolVersion": "3.1.1", - "qos": 1, - "clientId": "demo_001", - "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30", - "password": "SharedAccessSignature sr=*******************", - "retained": false - } - } -``` - -还可以通过 `resourceId` 引用形式,采用如下的配置 - -```json - { - "mqtt": { - "resourceId": "test", - "topic": "devices/demo_001/messages/events/", - "protocolVersion": "3.1.1", - "clientId": "demo_001", - "username": "xyz.azure-devices.net/demo_001/?api-version=2018-06-30", - "password": "SharedAccessSignature sr=*******************", - "retained": false - } -} -``` diff --git a/internal/conf/conf.go b/internal/conf/conf.go index cb88d0c895..f3cbabd386 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -43,13 +43,15 @@ type tlsConf struct { } type SinkConf struct { - MemoryCacheThreshold int `json:"memoryCacheThreshold" yaml:"memoryCacheThreshold"` - MaxDiskCache int `json:"maxDiskCache" yaml:"maxDiskCache"` - BufferPageSize int `json:"bufferPageSize" yaml:"bufferPageSize"` - EnableCache bool `json:"enableCache" yaml:"enableCache"` - ResendInterval int `json:"resendInterval" yaml:"resendInterval"` - CleanCacheAtStop bool `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"` - ResendAlterQueue bool `json:"resendAlterQueue" yaml:"resendAlterQueue"` + MemoryCacheThreshold int `json:"memoryCacheThreshold" yaml:"memoryCacheThreshold"` + MaxDiskCache int `json:"maxDiskCache" yaml:"maxDiskCache"` + BufferPageSize int `json:"bufferPageSize" yaml:"bufferPageSize"` + EnableCache bool `json:"enableCache" yaml:"enableCache"` + ResendInterval int `json:"resendInterval" yaml:"resendInterval"` + CleanCacheAtStop bool `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"` + ResendAlterQueue bool `json:"resendAlterQueue" yaml:"resendAlterQueue"` + ResendPriority int `json:"resendPriority" yaml:"resendPriority"` + ResendIndicatorField string `json:"resendIndicatorField" yaml:"resendIndicatorField"` } // Validate the configuration and reset to the default value for invalid values. @@ -95,6 +97,11 @@ func (sc *SinkConf) Validate() error { Log.Warnf("maxDiskCache is not a multiple of bufferPageSize, set to %d", sc.MaxDiskCache) errs = errors.Join(errs, errors.New("maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize")) } + if sc.ResendPriority < -1 || sc.ResendPriority > 1 { + sc.ResendPriority = 0 + Log.Warnf("resendPriority is not in [-1, 1], set to 0") + errs = errors.Join(errs, errors.New("resendPriority:resendPriority must be -1, 0 or 1")) + } return errs } diff --git a/internal/conf/conf_test.go b/internal/conf/conf_test.go index 8dcb5a6e38..99eb616a7c 100644 --- a/internal/conf/conf_test.go +++ b/internal/conf/conf_test.go @@ -1,4 +1,4 @@ -// Copyright 2022 EMQ Technologies Co., Ltd. +// Copyright 2022-2023 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,10 +14,13 @@ package conf import ( + "errors" "fmt" "reflect" "testing" + "github.com/stretchr/testify/assert" + "github.com/lf-edge/ekuiper/pkg/api" ) @@ -225,3 +228,159 @@ func TestRuleOptionValidate(t *testing.T) { } } } + +func TestSinkConf_Validate(t *testing.T) { + tests := []struct { + name string + sc SinkConf + wantErr error + }{ + { + name: "valid config", + sc: SinkConf{ + MemoryCacheThreshold: 1024, + MaxDiskCache: 1024000, + BufferPageSize: 256, + EnableCache: true, + ResendInterval: 0, + CleanCacheAtStop: true, + ResendAlterQueue: true, + ResendPriority: 0, + }, + wantErr: nil, + }, + { + name: "invalid memoryCacheThreshold", + sc: SinkConf{ + MemoryCacheThreshold: -1, + MaxDiskCache: 1024000, + BufferPageSize: 256, + EnableCache: true, + ResendInterval: 0, + CleanCacheAtStop: true, + ResendAlterQueue: true, + ResendPriority: 0, + }, + wantErr: errors.Join(errors.New("memoryCacheThreshold:memoryCacheThreshold must be positive")), + }, + { + name: "invalid maxDiskCache", + sc: SinkConf{ + MemoryCacheThreshold: 1024, + MaxDiskCache: -1, + BufferPageSize: 256, + EnableCache: true, + ResendInterval: 0, + CleanCacheAtStop: true, + ResendAlterQueue: true, + ResendPriority: 0, + }, + wantErr: errors.Join(errors.New("maxDiskCache:maxDiskCache must be positive")), + }, + { + name: "invalid bufferPageSize", + sc: SinkConf{ + MemoryCacheThreshold: 1024, + MaxDiskCache: 1024000, + BufferPageSize: 0, + EnableCache: true, + ResendInterval: 0, + CleanCacheAtStop: true, + ResendAlterQueue: true, + ResendPriority: 0, + }, + wantErr: errors.Join(errors.New("bufferPageSize:bufferPageSize must be positive")), + }, + { + name: "invalid resendInterval", + sc: SinkConf{ + MemoryCacheThreshold: 1024, + MaxDiskCache: 1024000, + BufferPageSize: 256, + EnableCache: true, + ResendInterval: -1, + CleanCacheAtStop: true, + ResendAlterQueue: true, + ResendPriority: 0, + }, + wantErr: errors.Join(errors.New("resendInterval:resendInterval must be positive")), + }, + { + name: "memoryCacheThresholdTooSmall", + sc: SinkConf{ + MemoryCacheThreshold: 128, + MaxDiskCache: 1024000, + BufferPageSize: 256, + EnableCache: true, + ResendInterval: 0, + CleanCacheAtStop: true, + ResendAlterQueue: true, + ResendPriority: 0, + }, + wantErr: errors.Join(errors.New("memoryCacheThresholdTooSmall:memoryCacheThreshold must be greater than or equal to bufferPageSize")), + }, + { + name: "memoryCacheThresholdNotMultiple", + sc: SinkConf{ + MemoryCacheThreshold: 300, + MaxDiskCache: 1024000, + BufferPageSize: 256, + EnableCache: true, + ResendInterval: 0, + CleanCacheAtStop: true, + ResendAlterQueue: true, + ResendPriority: 0, + }, + wantErr: errors.Join(errors.New("memoryCacheThresholdNotMultiple:memoryCacheThreshold must be a multiple of bufferPageSize")), + }, + { + name: "maxDiskCacheTooSmall", + sc: SinkConf{ + MemoryCacheThreshold: 1024, + MaxDiskCache: 128, + BufferPageSize: 256, + EnableCache: true, + ResendInterval: 0, + CleanCacheAtStop: true, + ResendAlterQueue: true, + ResendPriority: 0, + }, + wantErr: errors.Join(errors.New("maxDiskCacheTooSmall:maxDiskCache must be greater than bufferPageSize")), + }, + { + name: "maxDiskCacheNotMultiple", + sc: SinkConf{ + MemoryCacheThreshold: 1024, + MaxDiskCache: 300, + BufferPageSize: 256, + EnableCache: true, + ResendInterval: 0, + CleanCacheAtStop: true, + ResendAlterQueue: true, + ResendPriority: 0, + }, + wantErr: errors.Join(errors.New("maxDiskCacheNotMultiple:maxDiskCache must be a multiple of bufferPageSize")), + }, + { + name: "invalid resendPriority", + sc: SinkConf{ + MemoryCacheThreshold: 1024, + MaxDiskCache: 1024000, + BufferPageSize: 256, + EnableCache: true, + ResendInterval: 0, + CleanCacheAtStop: true, + ResendAlterQueue: true, + ResendPriority: 2, + }, + wantErr: errors.Join(errors.New("resendPriority:resendPriority must be -1, 0 or 1")), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.sc.Validate() + assert.Equal(t, tt.wantErr, err) + }) + } +} diff --git a/internal/topo/node/cache/sync_cache.go b/internal/topo/node/cache/sync_cache.go index 70b6bf7e3b..60d210a70f 100644 --- a/internal/topo/node/cache/sync_cache.go +++ b/internal/topo/node/cache/sync_cache.go @@ -160,15 +160,11 @@ func (c *SyncCache) run(ctx api.StreamContext) { select { case item := <-c.in: ctx.GetLogger().Debugf("send to cache") - go func() { // avoid deadlock when cacheCtrl is full - c.cacheCtrl <- item - }() + c.cacheCtrl <- item case isSuccess := <-c.Ack: // only send the next sink after receiving an ack ctx.GetLogger().Debugf("cache ack") - go func() { - c.cacheCtrl <- AckResult(isSuccess) - }() + c.cacheCtrl <- AckResult(isSuccess) case data := <-c.cacheCtrl: // The only place to manipulate cache switch r := data.(type) { case AckResult: @@ -213,7 +209,7 @@ func (c *SyncCache) send(ctx api.StreamContext) { } d, ok := c.peakMemCache(ctx) if ok { - ctx.GetLogger().Debugf("sending cache item %v", d) + ctx.GetLogger().Infof("sending cache item %v", d) c.sendStatus = 1 ctx.GetLogger().Debug("send status to 0 after sending tuple") select { @@ -264,7 +260,7 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{ ctx.GetLogger().Debugf("added cache to disk buffer page %v", c.diskBufferPage) } } else { - ctx.GetLogger().Debugf("added cache to mem cache %v", c.memCache) + ctx.GetLogger().Infof("added cache to mem cache %v", item) } c.CacheLength++ ctx.GetLogger().Debugf("added cache %d", c.CacheLength) diff --git a/internal/topo/node/sink_node.go b/internal/topo/node/sink_node.go index c616b2dc89..5c4c90d06e 100644 --- a/internal/topo/node/sink_node.go +++ b/internal/topo/node/sink_node.go @@ -217,7 +217,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) { outs := itemToMap(data) if sconf.Omitempty && (data == nil || len(outs) == 0) { ctx.GetLogger().Debugf("receive empty in sink") - return nil + break } select { case dataCh <- outs: @@ -244,64 +244,123 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) { } else { resendCh := make(chan []map[string]interface{}, sconf.BufferLength) rq := cache.NewSyncCache(ctx, resendCh, result, &sconf.SinkConf, sconf.BufferLength) - for { + receiveQ := func(data interface{}) { + processed := false + if data, processed = m.preprocess(data); processed { + return + } + stats.IncTotalRecordsIn() + stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength)) + outs := itemToMap(data) + if sconf.Omitempty && (data == nil || len(outs) == 0) { + ctx.GetLogger().Debugf("receive empty in sink") + return + } select { - case data := <-m.input: - processed := false - if data, processed = m.preprocess(data); processed { - break + case dataCh <- outs: + case <-ctx.Done(): + } + select { + case resendCh <- nil: + case <-ctx.Done(): + } + } + normalQ := func(data []map[string]interface{}) { + stats.ProcessTimeStart() + stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength)) + ctx.GetLogger().Debugf("sending data: %v", data) + err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, false) + ack := checkAck(ctx, data, err) + // If ack is false, add it to the resend queue + if !ack { + select { + case resendCh <- data: + case <-ctx.Done(): } - stats.IncTotalRecordsIn() - stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength)) - outs := itemToMap(data) - if sconf.Omitempty && (data == nil || len(outs) == 0) { - ctx.GetLogger().Debugf("receive empty in sink") - return nil + } + // Always ack for the normal queue as fail items are handled by the resend queue + select { + case c.Ack <- true: + case <-ctx.Done(): + } + stats.ProcessTimeEnd() + } + resendQ := func(data []map[string]interface{}) { + ctx.GetLogger().Debugf("resend data: %v", data) + stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength)) + if sconf.ResendIndicatorField != "" { + for _, item := range data { + item[sconf.ResendIndicatorField] = true } + } + err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, true) + ack := checkAck(ctx, data, err) + select { + case rq.Ack <- ack: + case <-ctx.Done(): + } + } + doneQ := func() { + logger.Infof("sink node %s instance %d done", m.name, instance) + if err := sink.Close(ctx); err != nil { + logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err) + } + } + + if sconf.ResendPriority == 0 { + for { select { - case dataCh <- outs: + case data := <-m.input: + receiveQ(data) + case data := <-c.Out: + normalQ(data) + case data := <-rq.Out: + resendQ(data) case <-ctx.Done(): + doneQ() + return nil } + } + } else if sconf.ResendPriority < 0 { // normal queue has higher priority + for { select { - case resendCh <- nil: + case data := <-m.input: + receiveQ(data) + case data := <-c.Out: + normalQ(data) case <-ctx.Done(): - } - case data := <-c.Out: - stats.ProcessTimeStart() - stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength)) - ctx.GetLogger().Debugf("sending data: %v", data) - err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, false) - ack := checkAck(ctx, data, err) - // If ack is false, add it to the resend queue - if !ack { + doneQ() + return nil + default: select { - case resendCh <- data: - case <-ctx.Done(): + case data := <-c.Out: + normalQ(data) + case data := <-rq.Out: + resendQ(data) } } - // Always ack for the normal queue as fail items are handled by the resend queue - select { - case c.Ack <- true: - case <-ctx.Done(): - } - stats.ProcessTimeEnd() - case data := <-rq.Out: - ctx.GetLogger().Debugf("resend data: %v", data) - stats.SetBufferLength(int64(len(dataCh) + c.CacheLength + rq.CacheLength)) - err := doCollectMaps(ctx, sink, sconf, data, sendManager, stats, true) - ack := checkAck(ctx, data, err) + } + } else { + for { select { - case rq.Ack <- ack: + case data := <-m.input: + receiveQ(data) + case data := <-rq.Out: + resendQ(data) case <-ctx.Done(): + doneQ() + return nil + default: + select { + case data := <-c.Out: + normalQ(data) + case data := <-rq.Out: + resendQ(data) + } } - case <-ctx.Done(): - logger.Infof("sink node %s instance %d done", m.name, instance) - if err := sink.Close(ctx); err != nil { - logger.Warnf("close sink node %s instance %d fails: %v", m.name, instance, err) - } - return nil } } + } } }) diff --git a/internal/topo/node/sink_node_test.go b/internal/topo/node/sink_node_test.go index 64ed41cb38..407ba4488f 100644 --- a/internal/topo/node/sink_node_test.go +++ b/internal/topo/node/sink_node_test.go @@ -634,83 +634,140 @@ func TestSinkCache(t *testing.T) { contextLogger := conf.Log.WithField("rule", "TestSinkCache") ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger) - data := [][]map[string]interface{}{ - {{"a": 1}}, - {{"a": 2}}, - {{"a": 3}}, - {{"a": 4}}, - {{"a": 5}}, - {{"a": 6}}, - {{"a": 7}}, - {{"a": 8}}, - {{"a": 9}}, - {{"a": 10}}, + + tests := []struct { + name string + config map[string]interface{} + result [][]byte + resendResult [][]byte + }{ + { + name: "test cache", + config: map[string]interface{}{ + "enableCache": true, + }, + result: [][]byte{ + []byte(`[{"a":1}]`), + []byte(`[{"a":2}]`), + []byte(`[{"a":3}]`), + []byte(`[{"a":4}]`), + []byte(`[{"a":5}]`), + []byte(`[{"a":6}]`), + []byte(`[{"a":7}]`), + []byte(`[{"a":8}]`), + []byte(`[{"a":9}]`), + []byte(`[{"a":10}]`), + }, + }, + { + name: "test resend", + config: map[string]interface{}{ + "enableCache": true, + "resendAlterQueue": true, + }, + result: [][]byte{ + []byte(`[{"a":2}]`), + []byte(`[{"a":4}]`), + []byte(`[{"a":6}]`), + []byte(`[{"a":8}]`), + []byte(`[{"a":10}]`), + }, + resendResult: [][]byte{ + []byte(`[{"a":1}]`), + []byte(`[{"a":3}]`), + []byte(`[{"a":5}]`), + }, + }, + { + name: "test resend priority low", + config: map[string]interface{}{ + "enableCache": true, + "resendAlterQueue": true, + "resendPriority": -1, + "resendIndicatorField": "isResend", + }, + result: [][]byte{ + []byte(`[{"a":2}]`), + []byte(`[{"a":4}]`), + []byte(`[{"a":6}]`), + []byte(`[{"a":8}]`), + []byte(`[{"a":10}]`), + }, + resendResult: [][]byte{ + []byte(`[{"a":1,"isResend":true}]`), + []byte(`[{"a":3,"isResend":true}]`), + []byte(`[{"a":5,"isResend":true}]`), + }, + }, + { + name: "test resend priority high", + config: map[string]interface{}{ + "enableCache": true, + "resendAlterQueue": true, + "resendPriority": 1, + }, + result: [][]byte{ + []byte(`[{"a":2}]`), + []byte(`[{"a":4}]`), + []byte(`[{"a":6}]`), + []byte(`[{"a":8}]`), + []byte(`[{"a":10}]`), + }, + resendResult: [][]byte{ + []byte(`[{"a":1}]`), + []byte(`[{"a":3}]`), + []byte(`[{"a":5}]`), + }, + }, } - t.Run("test cache", func(t *testing.T) { - hitch := make(chan int, 10) - config := map[string]interface{}{ - "enableCache": true, - } - result := [][]byte{ - []byte(`[{"a":1}]`), - []byte(`[{"a":2}]`), - []byte(`[{"a":3}]`), - []byte(`[{"a":4}]`), - []byte(`[{"a":5}]`), - []byte(`[{"a":6}]`), - []byte(`[{"a":7}]`), - []byte(`[{"a":8}]`), - []byte(`[{"a":9}]`), - []byte(`[{"a":10}]`), - } - mockSink := mocknode.NewMockResendSink(hitch) - s := NewSinkNodeWithSink("mockSink", mockSink, config) - s.Open(ctx, make(chan error)) - for i := 0; i < 200; i++ { - s.input <- data[i%10] - select { - case count := <-hitch: - if count == len(data)*2 { - goto end + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data := [][]map[string]interface{}{ + {{"a": 1}}, + {{"a": 2}}, + {{"a": 3}}, + {{"a": 4}}, + {{"a": 5}}, + {{"a": 6}}, + {{"a": 7}}, + {{"a": 8}}, + {{"a": 9}}, + {{"a": 10}}, + } + hitch := make(chan int, 10) + mockSink := mocknode.NewMockResendSink(hitch) + fmt.Printf("mockSink: %+v\n", tt.config) + s := NewSinkNodeWithSink("mockSink", mockSink, tt.config) + s.Open(ctx, make(chan error)) + for i := 0; i < 200; i++ { + s.input <- data[i%len(data)] + select { + case <-hitch: + done := true + results := mockSink.GetResults() + if len(results) != len(tt.result) { + done = false + } + if done && tt.resendResult != nil { + resentResults := mockSink.GetResendResults() + if len(resentResults) != len(tt.resendResult) { + done = false + } + } + if done { + goto end + } + case <-time.After(1 * time.Second): } - case <-time.After(1 * time.Second): } - } - end: - results := mockSink.GetResults() - assert.Equal(t, result, results) - }) - - t.Run("test resend cache", func(t *testing.T) { - hitch := make(chan int, 10) - config := map[string]interface{}{ - "enableCache": true, - "resendAlterQueue": true, - } - result := [][]byte{ - []byte(`[{"a":2}]`), - []byte(`[{"a":4}]`), - []byte(`[{"a":6}]`), - []byte(`[{"a":8}]`), - []byte(`[{"a":10}]`), - } - resendResult := [][]byte{ - []byte(`[{"a":1}]`), - []byte(`[{"a":3}]`), - []byte(`[{"a":5}]`), - } - mockSink := mocknode.NewMockResendSink(hitch) - s := NewSinkNodeWithSink("mockSink", mockSink, config) - s.Open(ctx, make(chan error)) - for _, d := range data { - s.input <- d - <-hitch - } - time.Sleep(1 * time.Second) - results := mockSink.GetResults() - assert.Equal(t, results, result) - resentResults := mockSink.GetResendResults() - assert.Equal(t, resendResult, resentResults[:3]) - }) + end: + results := mockSink.GetResults() + assert.Equal(t, results[:len(tt.result)], tt.result) + if tt.resendResult != nil { + resentResults := mockSink.GetResendResults() + assert.Equal(t, resentResults[:len(tt.resendResult)], tt.resendResult) + } + }) + } }