-
Notifications
You must be signed in to change notification settings - Fork 76
/
Copy pathquant5_2.dos
49 lines (44 loc) · 2.71 KB
/
quant5_2.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Clean up whatever a previous run of this script left behind, so the script
// can be executed repeatedly in the same server.
//
// Each cleanup step is attempted on its own: a step that fails (typically
// because the object does not exist yet, e.g. on the very first run) must not
// prevent the remaining steps from running. Wrapping all steps in a single
// try block would abort the whole cleanup at the first missing object and
// leave stale subscriptions / shared tables behind.
//
// Order: unsubscribe -> drop aggregators -> undefine shared tables, so that no
// subscription handler is still feeding an aggregator when it is dropped and
// no subscription is still attached to a shared table when it is undefined.
def clearEnv(){
	// 1. Remove the subscriptions on the level2 stream table.
	actionNames = `act_tsAggr1`act_tsAggr2`act_factor`newestLevel2data
	for(actionName in actionNames){
		try{
			unsubscribeTable(,`level2, actionName)
		}catch(ex){}	// not subscribed: nothing to clean up
	}

	// 2. Drop the time-series aggregators.
	aggregatorNames = `tsAggr1`tsAggr2
	for(aggregatorName in aggregatorNames){
		try{
			dropAggregator(aggregatorName)
		}catch(ex){}	// aggregator does not exist: nothing to clean up
	}

	// 3. Undefine the shared stream tables.
	sharedNames = `level2`OHLC1`OHLC2`FACTOR
	for(sharedName in sharedNames){
		try{
			undef(sharedName, SHARED)
		}catch(ex){}	// shared table does not exist: nothing to clean up
	}
}
// Run the cleanup first so that the definitions below do not collide with
// objects left over from an earlier run.
clearEnv()

// Column layout of the level-2 quote stream; used for the published stream
// table and for the aggregators' dummy (schema-only) table.
quoteColNames = `symbol`datetime`last`askPrice1`bidPrice1`askVolume1`bidVolume1`volume
quoteColTypes = [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT]

// Column layout of the K-line (OHLC) output tables.
ohlcColNames = `datetime`symbol`open`high`low`close`volume
ohlcColTypes = [DATETIME,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]

// Per-window metrics, in output column order: open, high, low, close, volume.
ohlcMetrics = <[first(last),max(last),min(last),last(last),sum(volume)]>

// Published stream table that receives the level-2 quotes.
share streamTable(100:0, quoteColNames, quoteColTypes) as level2

// 5.2 Generate K-lines with stream computing.
// Schema-only table describing the rows the aggregators will receive.
modal = table(100:0, quoteColNames, quoteColTypes)

// Five-minute K-line: window of 300 advancing by 300, grouped by symbol.
share streamTable(100:0, ohlcColNames, ohlcColTypes) as OHLC1
tsAggr1 = createTimeSeriesAggregator(name="tsAggr1", windowSize=300, step=300, metrics=ohlcMetrics, dummyTable=modal, outputTable=OHLC1, timeColumn=`datetime, keyColumn=`symbol)
subscribeTable(tableName="level2", actionName="act_tsAggr1", offset=0, handler=append!{tsAggr1}, msgAsTable=true)

// Sliding K-line: window of 300 advancing by 60, grouped by symbol.
// NOTE(review): the original comment called this a "one-minute K-line", but
// with windowSize=300 and step=60 it is a five-minute window recomputed every
// minute - confirm which one is intended.
share streamTable(100:0, ohlcColNames, ohlcColTypes) as OHLC2
tsAggr2 = createTimeSeriesAggregator(name="tsAggr2", windowSize=300, step=60, metrics=ohlcMetrics, dummyTable=modal, outputTable=OHLC2, timeColumn=`datetime, keyColumn=`symbol)
subscribeTable(tableName="level2", actionName="act_tsAggr2", offset=0, handler=append!{tsAggr2}, msgAsTable=true)
// Replay stored quotes into the level2 stream table; this is what triggers the
// K-line calculation in the subscribed aggregators.
dbPath = "dfs://level2Replay"

// Build the replay database once: it is only created (and filled from
// dfs://level2) when it does not exist yet.
if(!existsDatabase(dbPath)){
	// Composite partitioning: by date value, then by a 10-bucket hash of symbol.
	dbDate = database("", VALUE, 2020.01.01..2020.12.31)
	dbSymbol = database("", HASH, [SYMBOL, 10])
	db = database(dbPath, COMPO, [dbDate, dbSymbol])

	// Schema-only table for the partitioned "quotes" table.
	modal = table(1:0, `symbol`datetime`last`askPrice1`bidPrice1`askVolume1`bidVolume1`volume, [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT])
	pt = createPartitionedTable(db, modal, `quotes, `datetime`symbol)

	// Merge the separate date and time columns of the source table into one
	// datetime column and expose curVol as volume.
	data = select symbol, datetime(datetime(date(date))+second(time)) as datetime, last, askPrice1, bidPrice1, askVolume1, bidVolume1, curVol as volume from loadTable("dfs://level2","quotes")
	append!(pt, data)
}
quotes = loadTable(dbPath, "quotes")

// Cut 08:00:00-18:00:00 into 10 ranges (about one hour each) so that each
// data source pulled into memory covers roughly one hour of data.
repartitionSchema = time(cutPoints(08:00:00..18:00:00, 10))
inputDS = replayDS(<select * from quotes>, `datetime, `datetime, repartitionSchema)

// Run the replay as a background job. Arguments passed on to replay after the
// input/output tables: date column, time column, replay rate 10,
// absoluteRate=false, parallel level 2.
submitJob("replay_quotes", "replay_quotes_stream", replay, [inputDS], [`level2], `datetime, `datetime, 10, false, 2)