-
Notifications
You must be signed in to change notification settings - Fork 34
/
StreamEngine.h
206 lines (175 loc) · 8.28 KB
/
StreamEngine.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
//
// Created by jccai on 19-5-31.
//
#ifndef DOLPHINDB_STREAMENGINE_H
#define DOLPHINDB_STREAMENGINE_H
#include "Util.h"
#include "ScalarImp.h"
#include "CoreConcept.h"
#include "SmartPointer.h"
#include "SysIO.h"
class AbstractStreamEngine;
typedef SmartPointer<AbstractStreamEngine> AbstractStreamEngineSP;
typedef AbstractStreamEngineSP(*StreamEngineFactory)(Heap* heap, const DataInputStreamSP& in);
class StreamEngineManager {
public:
StreamEngineManager(const StreamEngineManager &) = delete;
StreamEngineManager operator=(const StreamEngineManager &) = delete;
static StreamEngineManager& instance();
void insert(const AbstractStreamEngineSP &aggregator);
void remove(const string &name);
AbstractStreamEngineSP find(const string &name);
ConstantSP getStat();
ConstantSP getBasicStat();
void registerEngineFactory(const string& engineType, StreamEngineFactory factory);
StreamEngineFactory getEngineFactory(const string& engineType) const;
AbstractStreamEngineSP createStreamEngine(Heap* heap, const DataInputStreamSP& in) const;
private:
StreamEngineManager() = default;
mutable Mutex mutex_;
unordered_map<string, AbstractStreamEngineSP> engines_;
unordered_set<string> engineTypes_;
unordered_map<string, StreamEngineFactory> engineFactories_;
};
class AbstractStreamEngine : public Table {
public:
AbstractStreamEngine(const string &type, const string &name, const string &user, const TableSP& dummy, const string& snapshotDir = "", long long snapshotIntervalInMsgCount = LLONG_MAX, int raftGroup = -1, const string &uuid = "");
~AbstractStreamEngine() override;
/**
* (optional) implement addMetrics if your engine support it
* arguments[0]: meta code, new metrics
* arguments[1]: table, indicating the scheme of the new metrics
*/
virtual bool addMetrics(Heap *heap, vector<ConstantSP> &arguments) { return false; }
/**
* (optional) called when removed from StreamEngineManager
* should be non-blocking, e.g. set some flags and return immediately
*/
virtual void finalize() {}
/**
* (optional) use historical data (state) to warm up the stream engine.
* Warmup is same as formal messages except that warmup doesn't produce
* any message.
*/
virtual bool warmup(vector<ConstantSP>& values, string& errMsg){
errMsg = "warmupStreamEngine currently only supports the reactive state engine and time series engine. RefId:S03018";
return false;
}
/**
* (required) save the current state of the stream engine
*/
virtual IO_ERR snapshotState(const DataOutputStreamSP& out) = 0;
/**
* (required) restore the state of the stream engine
*/
virtual IO_ERR restoreState(const DataInputStreamSP& in) = 0;
virtual IO_ERR serialize(BufferSP& buffer) const {
throw RuntimeException("AbstractStreamEngine::serialize not implemented yet.");
}
/**
* (required) the scheme of the generated table must be consistent with the engineStat_ vector
* generateEngineStatTable will be called asynchronously by StreamEngineManager, should be thread safe
*/
virtual TableSP generateEngineStatTable() = 0;
/**
* (required) initialize the engineStat_ which is used as a buffer
* must be called immediately and only once after the engine's construction
*/
virtual void initEngineStat() = 0;
/**
* (required) engine statues are represented by a Table
* updateEngineStat will be called asynchronously by StreamEngineManager, should be thread safe
* updateEngineStat should only update the changed fields in the engineStat_ to reduce overhead
*/
virtual void updateEngineStat() = 0;
virtual void appendMsg(const ConstantSP& body, long long msgId);
virtual void restoreEngineState();
string getEngineType() const;
string getEngineName() const;
string getEngineCreator() const;
int getRaftGroup() const { return raftGroup_;}
vector<ConstantSP> &getEngineStatRef();
bool sizeable() const override {
return false;
}
bool update(vector<ConstantSP>& values, const ConstantSP& indexSP, vector<string>& colNames, string& errMsg) override {
errMsg = "StreamEngine doesn't support data update.";
return false;
}
bool remove(const ConstantSP& indexSP, string& errMsg) override {
errMsg = "StreamEngine doesn't support data deletion.";
return false;
}
ConstantSP getColumn(INDEX index) const override {return dummy_->getColumn(index);}
const string &getColumnName(int index) const override { return colNames_->at(index); }
DATA_TYPE getColumnType(int index) const override { return dummy_->getColumnType(index); }
int getColumnExtraParam(int index) const override { return dummy_->getColumnExtraParam(index);}
INDEX columns() const override { return colNames_->size(); }
INDEX size() const override { return 0; }
const string &getColumnQualifier(int index) const override { return name_; }
TABLE_TYPE getTableType() const override { return STREAMENGINE; }
long long int getAllocatedMemory() const override { return 0; }
ConstantSP get(INDEX col, INDEX row) const override { throw RuntimeException("get() not supported"); }
ConstantSP getColumn(const string &name) const override;
ConstantSP getColumn(const string &qualifier, const string &name) const override;
ConstantSP getColumn(const string &name, const ConstantSP &rowFilter) const override;
ConstantSP getColumn(const string& qualifier, const string& name, const ConstantSP& rowFilter) const override;
ConstantSP getColumn(INDEX index, const ConstantSP& rowFilter) const override;
void setColumnName(int index, const string& name) override;
int getColumnIndex(const string& name) const override;
bool contain(const string& name) const override;
bool contain(const string& qualifier, const string& name) const override;
bool contain(const ColumnRef* col) const override;
bool contain(const ColumnRefSP& col) const override;
bool containAll(const vector<ColumnRefSP>& cols) const override;
ConstantSP getColumnLabel() const override;
ConstantSP keys() const override;
ConstantSP values() const override;
string getString(INDEX index) const override;
string getString() const override;
bool set(INDEX index, const ConstantSP& value) override;
void setName(const string &name);
ConstantSP get(INDEX index) const override;
ConstantSP get(const ConstantSP &index) const override;
const string &getName() const override;
ConstantSP getInstance() const override;
ConstantSP getInstance(INDEX size) const override;
ConstantSP getValue() const override;
ConstantSP getValue(Heap *heap) override;
ConstantSP getValue(INDEX capacity) const override;
ConstantSP getReference(Heap *heap) override;
ConstantSP getWindow(INDEX colStart, int colLength, INDEX rowStart, int rowLength) const override;
ConstantSP getMember(const ConstantSP &key) const override;
long long getSnapshotMessageId() const { return snapshotMessageId_;}
virtual bool readPermitted(const AuthenticatedUserSP& user) const;
virtual bool writePermitted(const AuthenticatedUserSP& user) const;
virtual bool isJoinEngine() {return false;}
protected:
const string engineType_;
const string engineName_;
const string engineUser_;
/// Engine should update these fields. Basically a streaming engine should be confined to one thread
/// thus AbstractStreamEngine::append should be guarded by a mutex
/// these fields are recommended to be guarded by the same mutex
string status_ = "OK";
string lastErrMsg_ = "";
uint64_t cumMessages_ = 0;
uint64_t cumMessagesSnapshot_ = 0;
uint64_t snapshotThreshold_ = LLONG_MAX;
long long snapshotMessageId_ = -1;
long long snapshotTimestamp_ = LLONG_MIN;
string snapshotDir_;
int raftGroup_;
vector<ConstantSP> engineStat_;
string name_;
std::string uuid_;
ConstantSP getInternal(INDEX index) const;
ConstantSP getInternal(const ConstantSP& index) const;
ConstantSP getMemberInternal(const ConstantSP& key) const;
private:
void initialize();
SmartPointer<vector<string>> colNames_;
SmartPointer<unordered_map<string, int>> colMap_;
TableSP dummy_;
};
#endif //DOLPHINDB_STREAMENGINE_H