Skip to content

Commit

Permalink
feat(stream)[TS-5469]. make sink task send notify event
Browse files Browse the repository at this point in the history
  • Loading branch information
JinqingKuang committed Jan 12, 2025
1 parent 575218e commit e484291
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 9 deletions.
1 change: 1 addition & 0 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ typedef struct SNotifyInfo {
SArray* pNotifyAddrUrls;
int32_t notifyEventTypes;
int32_t notifyErrorHandle;
char* streamName;
} SNotifyInfo;

struct SStreamTask {
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mnode/impl/src/mndStream.c
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,8 @@ static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, SStr
TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
pTask->notifyInfo.streamName = taosStrdup(createReq->name);
TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);

_end:
if (code != TSDB_CODE_SUCCESS) {
Expand Down
1 change: 1 addition & 0 deletions source/dnode/vnode/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ set(
"src/tq/tqSnapshot.c"
"src/tq/tqStreamStateSnap.c"
"src/tq/tqStreamTaskSnap.c"
"src/tq/tqStreamNotify.c"
)

aux_source_directory("src/tsdb/" TSDB_SOURCE_FILES)
Expand Down
5 changes: 5 additions & 0 deletions source/dnode/vnode/src/inc/tq.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq);
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type);

// tq send notifications
int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap);
void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap);
int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode);

#define TQ_ERR_GO_TO_END(c) \
do { \
code = c; \
Expand Down
5 changes: 5 additions & 0 deletions source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ typedef struct SCommitInfo SCommitInfo;
typedef struct SCompactInfo SCompactInfo;
typedef struct SQueryNode SQueryNode;

typedef struct SStreamNotifyHandleMap SStreamNotifyHandleMap;

#define VNODE_META_TMP_DIR "meta.tmp"
#define VNODE_META_BACKUP_DIR "meta.backup"

Expand Down Expand Up @@ -496,6 +498,9 @@ struct SVnode {
int64_t blockSeq;
SQHandle* pQuery;
SVMonitorObj monitor;

// Notification Handles
SStreamNotifyHandleMap* pNotifyHandleMap;
};

#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
Expand Down
6 changes: 6 additions & 0 deletions source/dnode/vnode/src/tq/tqSink.c
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
return;
}

code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId: %d, s-task:%s failed to send all event notifications", vgId, id);
// continue processing even if notification fails
}

bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
Expand Down
Loading

0 comments on commit e484291

Please sign in to comment.