From 9302b937461acef2fba8db76bb23c7649df5b088 Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Fri, 10 Jan 2025 08:50:26 +0800 Subject: [PATCH] feat(stream)[TS-5469]. make sink task send notify event --- include/libs/stream/tstream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 2 + source/dnode/vnode/CMakeLists.txt | 1 + source/dnode/vnode/src/inc/tq.h | 5 + source/dnode/vnode/src/inc/vnodeInt.h | 5 + source/dnode/vnode/src/tq/tqSink.c | 6 + source/dnode/vnode/src/tq/tqStreamNotify.c | 355 +++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeOpen.c | 10 + source/libs/executor/src/streamexecutorInt.c | 14 +- source/libs/stream/src/streamTask.c | 14 +- 10 files changed, 404 insertions(+), 9 deletions(-) create mode 100644 source/dnode/vnode/src/tq/tqStreamNotify.c diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8d44e7487b7b..1a5048ed3342 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -435,6 +435,7 @@ typedef struct SNotifyInfo { SArray* pNotifyAddrUrls; int32_t notifyEventTypes; int32_t notifyErrorHandle; + char* streamName; } SNotifyInfo; struct SStreamTask { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b95d5e315a6f..c5f79de6c98f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -766,6 +766,8 @@ static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, SStr TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno); pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes; pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle; + pTask->notifyInfo.streamName = taosStrdup(createReq->name); + TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno); _end: if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 8f63cc87798c..b90e1844ae70 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -75,6 +75,7 @@ set( "src/tq/tqSnapshot.c" "src/tq/tqStreamStateSnap.c" "src/tq/tqStreamTaskSnap.c" + "src/tq/tqStreamNotify.c" ) aux_source_directory("src/tsdb/" TSDB_SOURCE_FILES) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 28a0d1175761..9f294f2d8503 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -160,6 +160,11 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq); int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type); +// tq send notifications +int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap); +void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap); +int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode); + #define TQ_ERR_GO_TO_END(c) \ do { \ code = c; \ diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 940116317c59..02c3b3ebe092 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -81,6 +81,8 @@ typedef struct SCommitInfo SCommitInfo; typedef struct SCompactInfo SCompactInfo; typedef struct SQueryNode SQueryNode; +typedef struct SStreamNotifyHandleMap SStreamNotifyHandleMap; + #define VNODE_META_TMP_DIR "meta.tmp" #define VNODE_META_BACKUP_DIR "meta.backup" @@ -496,6 +498,9 @@ struct SVnode { int64_t blockSeq; SQHandle* pQuery; SVMonitorObj monitor; + + // Notification Handles + SStreamNotifyHandleMap* pNotifyHandleMap; }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 7c085ea31a43..4738ec5957a1 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -1150,6 +1150,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { return; } + code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId: %d, s-task:%s failed to send all event notifications"); + // continue processing even if notification fails + } + bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks); if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) { tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, diff --git a/source/dnode/vnode/src/tq/tqStreamNotify.c b/source/dnode/vnode/src/tq/tqStreamNotify.c new file mode 100644 index 000000000000..2cdbc64e35e5 --- /dev/null +++ b/source/dnode/vnode/src/tq/tqStreamNotify.c @@ -0,0 +1,355 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "cmdnodes.h" +#include "tq.h" + +typedef struct SStreamNotifyHandle { + TdThreadMutex mutex; + char* url; +} SStreamNotifyHandle; + +struct SStreamNotifyHandleMap { + TdThreadMutex gMutex; + SHashObj *handleMap; +}; + +static void destroyStreamNotifyHandle(void *ptr) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamNotifyHandle** ppHandle = ptr; + + if (ppHandle == NULL || *ppHandle == NULL) { + return; + } + code = taosThreadMutexDestroy(&(*ppHandle)->mutex); + taosMemoryFreeClear((*ppHandle)->url); +} + +static void releaseStreamNotifyHandle(SStreamNotifyHandle** ppHandle) { + if (ppHandle == NULL || *ppHandle == NULL) { + return; + } + (void)taosThreadMutexUnlock(&(*ppHandle)->mutex); + *ppHandle = NULL; +} + +static int32_t acquireStreamNotifyHandle(SStreamNotifyHandleMap* pMap, const char* url, SStreamNotifyHandle** ppHandle) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + bool gLocked = false; + SStreamNotifyHandle** ppFindHandle = NULL; + SStreamNotifyHandle* pNewHandle = NULL; + + TSDB_CHECK_NULL(pMap, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(url, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(ppHandle, code, lino, _end, TSDB_CODE_INVALID_PARA); + + *ppHandle = NULL; + + code = taosThreadMutexLock(&pMap->gMutex); + TSDB_CHECK_CODE(code, lino, _end); + gLocked = true; + + ppFindHandle = taosHashGet(pMap->handleMap, url, strlen(url)); + if (ppFindHandle == NULL) { + pNewHandle = taosMemoryCalloc(1, sizeof(SStreamNotifyHandle)); + TSDB_CHECK_NULL(pNewHandle, code, lino, _end, terrno); + code = taosThreadMutexInit(&pNewHandle->mutex, NULL); + TSDB_CHECK_CODE(code, lino, _end); + code = taosHashPut(pMap->handleMap, url, strlen(url), &pNewHandle, POINTER_BYTES); + TSDB_CHECK_CODE(code, lino, _end); + *ppHandle = pNewHandle; + pNewHandle = NULL; + } else { + *ppHandle = *ppFindHandle; + } + + code = taosThreadMutexLock(&(*ppHandle)->mutex); + TSDB_CHECK_CODE(code, lino, _end); + + (void)taosThreadMutexUnlock(&pMap->gMutex); + gLocked = false; + + if ((*ppHandle)->url == NULL) { + (*ppHandle)->url = taosStrdup(url); + TSDB_CHECK_NULL((*ppHandle)->url, code, lino, _end, terrno); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + if (*ppHandle) { + releaseStreamNotifyHandle(ppHandle); + } + } + if (pNewHandle) { + destroyStreamNotifyHandle(&pNewHandle); + } + if (gLocked) { + (void)taosThreadMutexUnlock(&pMap->gMutex); + } + return code; +} + +int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamNotifyHandleMap* pMap = NULL; + + TSDB_CHECK_NULL(ppMap, code, lino, _end, TSDB_CODE_INVALID_PARA); + + *ppMap = NULL; + pMap = taosMemoryCalloc(1, sizeof(SStreamNotifyHandleMap)); + TSDB_CHECK_NULL(pMap, code, lino, _end, terrno); + code = taosThreadMutexInit(&pMap->gMutex, NULL); + TSDB_CHECK_CODE(code, lino, _end); + pMap->handleMap = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + TSDB_CHECK_NULL(pMap->handleMap, code, lino, _end, terrno); + taosHashSetFreeFp(pMap->handleMap, destroyStreamNotifyHandle); + *ppMap = pMap; + pMap = NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (pMap != NULL) { + tqDestroyNotifyHandleMap(&pMap); + } + return code; +} + +void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + if (*ppMap == NULL) { + return; + } + taosHashCleanup((*ppMap)->handleMap); + code = taosThreadMutexDestroy(&(*ppMap)->gMutex); + taosMemoryFreeClear((*ppMap)); +} + +#define JSON_CHECK_ADD_ITEM(obj, str, item) \ + TSDB_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY) + +static int32_t getStreamNotifyEventHeader(const char* streamName, char** pHeader) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + cJSON* obj = NULL; + cJSON* streams = NULL; + cJSON* stream = NULL; + char msgId[37]; + + TSDB_CHECK_NULL(streamName, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pHeader, code, lino, _end, TSDB_CODE_INVALID_PARA); + + *pHeader = NULL; + + code = taosGetSystemUUIDLimit36(msgId, sizeof(msgId)); + TSDB_CHECK_CODE(code, lino, _end); + + stream = cJSON_CreateObject(); + TSDB_CHECK_NULL(stream, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + JSON_CHECK_ADD_ITEM(stream, "streamName", cJSON_CreateStringReference(streamName)); + JSON_CHECK_ADD_ITEM(stream, "events", cJSON_CreateArray()); + + streams = cJSON_CreateArray(); + TSDB_CHECK_CONDITION(cJSON_AddItemToArray(streams, stream), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY) + stream = NULL; + + obj = cJSON_CreateObject(); + TSDB_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + JSON_CHECK_ADD_ITEM(obj, "messageId", cJSON_CreateStringReference(msgId)); + JSON_CHECK_ADD_ITEM(obj, "timestamp", cJSON_CreateNumber(taosGetTimestampMs())); + JSON_CHECK_ADD_ITEM(obj, "streams", streams); + streams = NULL; + + *pHeader = cJSON_PrintUnformatted(obj); + TSDB_CHECK_NULL(*pHeader, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (stream != NULL) { + cJSON_Delete(stream); + } + if (streams != NULL) { + cJSON_Delete(streams); + } + if (obj != NULL) { + cJSON_Delete(obj); + } + return code; +} + +static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBlocks, char** pMsg) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t numOfBlocks = 0; + int32_t msgHeaderLen = 0; + int32_t msgTailLen = 0; + int32_t msgLen = 0; + char* msgHeader = NULL; + const char* msgTail = "]}]}"; + char* msg = NULL; + + TSDB_CHECK_NULL(pMsg, code, lino, _end, TSDB_CODE_INVALID_PARA); + + *pMsg = NULL; + numOfBlocks = taosArrayGetSize(pBlocks); + + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) { + continue; + } + + SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX); + for (int32_t j = 0; j < pDataBlock->info.rows; ++j) { + char* val = colDataGetVarData(pEventStrCol, j); + msgLen += varDataLen(val) + 1; + } + } + + if (msgLen == 0) { + // skip since no notification events found + goto _end; + } + + code = getStreamNotifyEventHeader(streamName, &msgHeader); + TSDB_CHECK_CODE(code, lino, _end); + msgHeaderLen = strlen(msgHeader); + msgTailLen = strlen(msgTail); + msgLen += msgHeaderLen; + + msg = taosMemoryMalloc(msgLen); + TSDB_CHECK_NULL(msg, code, lino, _end, terrno); + char* p = msg; + TAOS_STRNCPY(p, msgHeader, msgHeaderLen); + p += msgHeaderLen - msgTailLen; + + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) { + continue; + } + + SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX); + for (int32_t j = 0; j < pDataBlock->info.rows; ++j) { + char* val = colDataGetVarData(pEventStrCol, j); + TAOS_STRNCPY(p, varDataVal(val), varDataLen(val)); + p += varDataLen(val); + *(p++) = ','; + } + } + + p -= 1; + TAOS_STRNCPY(p, msgTail, msgTailLen); + *(p + msgTailLen) = '\0'; + + *pMsg = msg; + msg = NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (msgHeader != NULL) { + cJSON_free(msgHeader); + } + if (msg != NULL) { + taosMemoryFreeClear(msg); + } + return code; +} + +static int32_t sendSingleStreamNotify(SStreamNotifyHandle *pHandle, const char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + TSDB_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA); + + tqWarn("%s <= %s", pHandle->url, msg); + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + char* msg = NULL; + int32_t nNotifyAddr = 0; + SStreamNotifyHandle* pHandle = NULL; + + TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pVnode, code, lino, _end, TSDB_CODE_INVALID_PARA); + + nNotifyAddr = taosArrayGetSize(pTask->notifyInfo.pNotifyAddrUrls); + if (nNotifyAddr == 0) { + goto _end; + } + + code = packupStreamNotifyEvent(pTask->notifyInfo.streamName, pBlocks, &msg); + TSDB_CHECK_CODE(code, lino, _end); + if (msg == NULL) { + goto _end; + } + + for (int32_t i = 0; i < nNotifyAddr; ++i) { + const char * url = taosArrayGetP(pTask->notifyInfo.pNotifyAddrUrls, i); + code = acquireStreamNotifyHandle(pVnode->pNotifyHandleMap, url, &pHandle); + if (code != TSDB_CODE_SUCCESS) { + tqError("failed to get stream notify handle of %s", url); + if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) { + // retry for event message sending in PAUSE error handling mode + --i; + continue; + } else { + // simply ignore the failure in DROP error handling mode + code = TSDB_CODE_SUCCESS; + continue; + } + } + code = sendSingleStreamNotify(pHandle, msg); + if (code != TSDB_CODE_SUCCESS) { + tqError("failed to send stream notify handle to %s since %s", url, tstrerror(code)); + if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) { + // retry for event message sending in PAUSE error handling mode + --i; + } else { + // simply ignore the failure in DROP error handling mode + code = TSDB_CODE_SUCCESS; + } + } + releaseStreamNotifyHandle(&pHandle); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (msg) { + taosMemoryFreeClear(msg); + } + return code; +} diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 6de5298728ca..280ee527f751 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -15,6 +15,7 @@ #include "sync.h" #include "tcs.h" +#include "tq.h" #include "tsdb.h" #include "vnd.h" @@ -483,6 +484,14 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC ret = taosRealPath(tdir, NULL, sizeof(tdir)); TAOS_UNUSED(ret); + // init handle map for stream event notification + ret = tqInitNotifyHandleMap(&pVnode->pNotifyHandleMap); + if (ret != TSDB_CODE_SUCCESS) { + vError("vgId:%d, failed to init StreamNotifyHandleMap", TD_VID(pVnode)); + terrno = ret; + goto _err; + } + // open query vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode)); if (vnodeQueryOpen(pVnode)) { @@ -555,6 +564,7 @@ void vnodeClose(SVnode *pVnode) { vnodeAWait(&pVnode->commitTask); vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); + tqDestroyNotifyHandleMap(&pVnode->pNotifyHandleMap); tqClose(pVnode->pTq); walClose(pVnode->pWal); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c index 85cbdb08014b..f50e10b357eb 100644 --- a/source/libs/executor/src/streamexecutorInt.c +++ b/source/libs/executor/src/streamexecutorInt.c @@ -45,7 +45,7 @@ static void destroyStreamWindowEvent(void* ptr) { cJSON_free(pEvent->content); } -static void destroyStreamWindowEventSupp(SStreamNotifyEventSupp* sup) { +static void destroyStreamNotifyEventSupp(SStreamNotifyEventSupp* sup) { if (sup == NULL) return; taosArrayDestroyEx(sup->pWindowEvents, destroyStreamWindowEvent); taosArrayDestroyEx(sup->pWindowResults, destroyStreamWindowEvent); @@ -53,7 +53,7 @@ static void destroyStreamWindowEventSupp(SStreamNotifyEventSupp* sup) { *sup = (SStreamNotifyEventSupp){0}; } -static int32_t initStreamWindowEventSupp(SStreamNotifyEventSupp *sup) { +static int32_t initStreamNotifyEventSupp(SStreamNotifyEventSupp *sup) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SSDataBlock* pBlock = NULL; @@ -85,7 +85,7 @@ static int32_t initStreamWindowEventSupp(SStreamNotifyEventSupp *sup) { if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); if (sup) { - destroyStreamWindowEventSupp(sup); + destroyStreamNotifyEventSupp(sup); } } if (pBlock != NULL) { @@ -97,11 +97,11 @@ static int32_t initStreamWindowEventSupp(SStreamNotifyEventSupp *sup) { int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) { pBasicInfo->primaryPkIndex = -1; pBasicInfo->updateOperatorInfo = false; - return initStreamWindowEventSupp(&pBasicInfo->windowEventSup); + return initStreamNotifyEventSupp(&pBasicInfo->windowEventSup); } void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) { - destroyStreamWindowEventSupp(&pBasicInfo->windowEventSup); + destroyStreamNotifyEventSupp(&pBasicInfo->windowEventSup); } // Fast uint64_t to string conversion, equivalent to sprintf(buf, "%lu", val) but with 10x better performance. @@ -123,7 +123,7 @@ static char* u64toaFastLut(uint64_t val, char* buf) { strncpy(p, lut + val * 2, 2); p += 2; } else if (val > 0 || p == temp) { - *p++ = val + '0'; + *(p++) = val + '0'; } while (p != temp) { @@ -141,7 +141,7 @@ static void streamNotifyGetEventWindowId(const SSessionKey* pSessionKey, char *b // The windowId is generated by computing a hash value using the concatenation of group ID and window start time, in // the format: "_". p = u64toaFastLut(pSessionKey->groupId, p); - *p++ = '_'; + *(p++) = '_'; p = u64toaFastLut(pSessionKey->win.skey, p); hash = MurmurHash3_64(buf, p - buf); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4c493dfdffa4..bde1fd45fa53 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -323,6 +323,7 @@ void tFreeStreamTask(void* pParam) { pTask->chkInfo.pActiveInfo = NULL; taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL); + taosMemoryFreeClear(pTask->notifyInfo.streamName); taosMemoryFree(pTask); stDebug("s-task:0x%x free task completed", taskId); @@ -1323,6 +1324,7 @@ static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* in } TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes)); TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName)); _exit: if (code != TSDB_CODE_SUCCESS) { @@ -1354,6 +1356,10 @@ static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) { } TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes)); TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle)); + char *streamName = NULL; + TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &streamName)); + info->streamName = taosStrndup(streamName, TSDB_STREAM_FNAME_LEN + 1); + QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno); _exit: if (code != TSDB_CODE_SUCCESS) { @@ -1431,7 +1437,9 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); - TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo)); + if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) { + TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo)); + } tEndEncode(pEncoder); _exit: @@ -1531,7 +1539,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); - TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo)); + if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) { + TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo)); + } tEndDecode(pDecoder);