Skip to content

Commit

Permalink
feat(stream)[TS-5469]. make agg task generate notify event
Browse files Browse the repository at this point in the history
  • Loading branch information
JinqingKuang committed Jan 9, 2025
1 parent ff86a30 commit 575218e
Show file tree
Hide file tree
Showing 15 changed files with 702 additions and 74 deletions.
5 changes: 4 additions & 1 deletion include/common/tcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ typedef enum EStreamType {
STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
STREAM_DROP_CHILD_TABLE,
STREAM_EVENT_OPEN_WINDOW,
STREAM_NOTIFY_EVENT,
} EStreamType;

#pragma pack(push, 1)
Expand Down Expand Up @@ -405,6 +405,9 @@ typedef struct STUidTagInfo {
#define UD_GROUPID_COLUMN_INDEX 1
#define UD_TAG_COLUMN_INDEX 2

// stream notify event block column
#define NOTIFY_EVENT_STR_COLUMN_INDEX 0

int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime);
int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol);

Expand Down
2 changes: 2 additions & 0 deletions include/common/tdatablock.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ bool isAutoTableName(char* ctbName);
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap);
int32_t buildCtbNameByGroupId(const char* stbName, uint64_t groupId, char** pName);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
char** dstTableName);

int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);

Expand Down
3 changes: 2 additions & 1 deletion include/libs/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);

int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);

void qSetStreamEventTypes(qTaskInfo_t tinfo, int32_t eventTypes);
int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
const char* stbFullName, bool newSubTableRule);

/**
* Set multiple input data blocks for the stream scan.
Expand Down
2 changes: 2 additions & 0 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
#define SSTREAM_TASK_APPEND_STABLE_NAME_VER 4 // Append subtable name with stableName and groupId
#define SSTREAM_TASK_ADD_NOTIFY_VER 5 // Support event notification at window open/close

#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))

extern int32_t streamMetaRefPool;
extern int32_t streamTaskRefPool;

Expand Down
27 changes: 27 additions & 0 deletions source/common/src/tdatablock.c
Original file line number Diff line number Diff line change
Expand Up @@ -3061,6 +3061,33 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
return code;
}

int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
char** dstTableName) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

if (parTbName[0]) {
if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
stbFullName) {
*dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);

tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN);
code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN);
TSDB_CHECK_CODE(code, lino, _end);
} else {
*dstTableName = taosStrdup(parTbName);
TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
}
} else {
code = buildCtbNameByGroupId(stbFullName, gid, dstTableName);
TSDB_CHECK_CODE(code, lino, _end);
}

_end:
return code;
}

// return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
int32_t code = blockDataCheck(pBlock);
Expand Down
8 changes: 6 additions & 2 deletions source/dnode/vnode/src/tq/tqSink.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
#include "tcommon.h"
#include "tq.h"

#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))

typedef struct STableSinkInfo {
uint64_t uid;
tstr name;
Expand Down Expand Up @@ -1175,6 +1173,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
continue;
} else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) {
code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
continue;
} else {
code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
}
Expand Down Expand Up @@ -1319,6 +1319,10 @@ void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVno
continue;
}

if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
continue;
}

hasSubmit = true;
pTask->execInfo.sink.numOfBlocks += 1;
uint64_t groupId = pDataBlock->info.id.groupId;
Expand Down
10 changes: 9 additions & 1 deletion source/dnode/vnode/src/tqCommon/tqCommon.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,15 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
return code;
}

qSetStreamEventTypes(&pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes);
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
code = qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
pTask->outputInfo.tbSink.pSchemaWrapper, pTask->outputInfo.tbSink.stbFullName,
IS_NEW_SUBTB_RULE(pTask));
if (code) {
tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
}
}

streamSetupScheduleTrigger(pTask);
Expand Down
15 changes: 11 additions & 4 deletions source/libs/executor/inc/executorInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,11 +449,16 @@ typedef struct STimeWindowAggSupp {
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;

typedef struct SStreamNotifyEventSupp {
SArray* pWindowEvents; // Array of SStreamNotifyEvent, storing window events and trigger values.
SArray* pWindowResults; // Array of SStreamNotifyEvent, storing window results.
SSDataBlock* pEventBlock; // The datablock contains all window events and results.
} SStreamNotifyEventSupp;

typedef struct SSteamOpBasicInfo {
int32_t primaryPkIndex;
bool updateOperatorInfo;
SSDataBlock* pEventRes;
SArray* pEventInfo;
int32_t primaryPkIndex;
bool updateOperatorInfo;
SStreamNotifyEventSupp windowEventSup;
} SSteamOpBasicInfo;

typedef struct SStreamFillSupporter {
Expand Down Expand Up @@ -769,6 +774,8 @@ typedef struct SStreamEventAggOperatorInfo {
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
struct SOperatorInfo* pOperator;
SNodeList* pStartCondCols;
SNodeList* pEndCondCols;
} SStreamEventAggOperatorInfo;

typedef struct SStreamCountAggOperatorInfo {
Expand Down
5 changes: 4 additions & 1 deletion source/libs/executor/inc/querytask.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ typedef struct {
SVersionRange fillHistoryVer;
STimeWindow fillHistoryWindow;
SStreamState* pState;
int32_t eventTypes;
int32_t eventTypes; // event types to notify
SSchemaWrapper* notifyResultSchema; // agg result to notify
char* stbFullName; // used to generate dest child table name
bool newSubTableRule; // used to generate dest child table name
} SStreamTaskInfo;

struct SExecTaskInfo {
Expand Down
10 changes: 10 additions & 0 deletions source/libs/executor/inc/streamexecutorInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
extern "C" {
#endif

#include "cJSON.h"
#include "cmdnodes.h"
#include "executorInt.h"
#include "querytask.h"
#include "tutil.h"

#define FILL_POS_INVALID 0
Expand Down Expand Up @@ -107,6 +110,13 @@ int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY
int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes);
TSKEY compareTs(void* pKey);

int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri,
SStreamNotifyEventSupp* sup);
int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SSchemaWrapper* pSchemaWrapper,
SStreamNotifyEventSupp* sup);
int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup);

#ifdef __cplusplus
}
#endif
Expand Down
23 changes: 19 additions & 4 deletions source/libs/executor/src/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,26 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
return code;
}

void qSetStreamEventTypes(qTaskInfo_t tinfo, int32_t eventTypes) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
if (pTaskInfo != NULL) {
pTaskInfo->streamInfo.eventTypes = eventTypes;
int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
const char* stbFullName, bool newSubTableRule) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamTaskInfo *pStreamInfo = NULL;

if (tinfo == 0 || eventTypes == 0 || pSchemaWrapper == NULL || stbFullName == NULL) {
goto _end;
}

pStreamInfo = &((SExecTaskInfo*)tinfo)->streamInfo;
pStreamInfo->eventTypes = eventTypes;
pStreamInfo->notifyResultSchema = tCloneSSchemaWrapper(pSchemaWrapper);
if (pStreamInfo->notifyResultSchema == NULL) {
code = terrno;
}
pStreamInfo->stbFullName = taosStrdup(stbFullName);
pStreamInfo->newSubTableRule = newSubTableRule;

_end:
return code;
}

int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
Expand Down
2 changes: 2 additions & 0 deletions source/libs/executor/src/querytask.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
tDeleteSchemaWrapper(pStreamInfo->schema);
tOffsetDestroy(&pStreamInfo->currentOffset);
tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema);
taosMemoryFree(pStreamInfo->stbFullName);
}

static void freeBlock(void* pParam) {
Expand Down
84 changes: 44 additions & 40 deletions source/libs/executor/src/streameventwindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ void destroyStreamEventOperatorInfo(void* param) {
pInfo->pEndCondInfo = NULL;
}

if (pInfo->pStartCondCols != NULL) {
nodesDestroyList(pInfo->pStartCondCols);
pInfo->pStartCondCols = NULL;
}

if (pInfo->pEndCondCols != NULL) {
nodesDestroyList(pInfo->pEndCondCols);
pInfo->pEndCondCols = NULL;
}

taosMemoryFreeClear(param);
}

Expand Down Expand Up @@ -310,14 +320,6 @@ void doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SS
removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey);
}

static int32_t setEventData(SSteamOpBasicInfo* pBasicInfo, SSessionKey* pWinKey) {
void* pRes = taosArrayPush(pBasicInfo->pEventInfo, pWinKey);
if (pRes != NULL) {
return TSDB_CODE_SUCCESS;
}
return terrno;
}

static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
SSHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
Expand Down Expand Up @@ -393,8 +395,10 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
&nextWinKey, &winCode);
QUERY_CHECK_CODE(code, lino, _end);

if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) && winCode != TSDB_CODE_SUCCESS) {
code = setEventData(&pInfo->basic, &curWin.winInfo.sessionWin);
if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) &&
*(bool*)colDataGetNumData(pColStart, i) && winCode != TSDB_CODE_SUCCESS) {
code = addEventAggNotifyEvent(SNOTIFY_EVENT_WINDOW_OPEN, &curWin.winInfo.sessionWin, pSDataBlock,
pInfo->pStartCondCols, i, &pInfo->basic.windowEventSup);
QUERY_CHECK_CODE(code, lino, _end);
}

Expand Down Expand Up @@ -464,6 +468,12 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
QUERY_CHECK_CODE(code, lino, _end);
}

if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE)) {
code = addEventAggNotifyEvent(SNOTIFY_EVENT_WINDOW_CLOSE, &curWin.winInfo.sessionWin, pSDataBlock,
pInfo->pEndCondCols, i + winRows - 1, &pInfo->basic.windowEventSup);
QUERY_CHECK_CODE(code, lino, _end);
}
}

_end:
Expand Down Expand Up @@ -582,42 +592,13 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
}
}

static void buildEventNotifyResult(SSteamOpBasicInfo* pBasicInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

blockDataCleanup(pBasicInfo->pEventRes);
int32_t size = taosArrayGetSize(pBasicInfo->pEventInfo);
code = blockDataEnsureCapacity(pBasicInfo->pEventRes, size);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t i = 0; i < size; i++) {
SSessionKey* pKey = taosArrayGet(pBasicInfo->pEventInfo, i);
uint64_t uid = 0;
code = appendDataToSpecialBlock(pBasicInfo->pEventRes, &pKey->win.skey, &pKey->win.ekey, &uid, &pKey->groupId, NULL);
QUERY_CHECK_CODE(code, lino, _end);
}
taosArrayClear(pBasicInfo->pEventInfo);

_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
}
}


static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

buildEventNotifyResult(&pInfo->basic);
if (pInfo->basic.pEventRes->info.rows > 0) {
printDataBlock(pInfo->basic.pEventRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->basic.pEventRes;
return code;
}

doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
Expand All @@ -628,10 +609,27 @@ static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE)) {
code = addAggResultNotifyEvent(pBInfo->pRes, pTaskInfo->streamInfo.notifyResultSchema, &pInfo->basic.windowEventSup);
QUERY_CHECK_CODE(code, lino, _end);
}
(*ppRes) = pBInfo->pRes;
return code;
}

code = buildNotifyEventBlock(pTaskInfo, &pInfo->basic.windowEventSup);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->basic.windowEventSup.pEventBlock->info.rows > 0) {
printDataBlock(pInfo->basic.windowEventSup.pEventBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->basic.windowEventSup.pEventBlock;
return code;
}

_end:
(*ppRes) = NULL;
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
return code;
}

Expand Down Expand Up @@ -1041,6 +1039,12 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0);
QUERY_CHECK_CODE(code, lino, _error);

code =
nodesCollectColumnsFromNode((SNode*)pEventNode->pStartCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pStartCondCols);
QUERY_CHECK_CODE(code, lino, _error);
code = nodesCollectColumnsFromNode((SNode*)pEventNode->pEndCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pEndCondCols);
QUERY_CHECK_CODE(code, lino, _error);

*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;

Expand Down
Loading

0 comments on commit 575218e

Please sign in to comment.