-
Notifications
You must be signed in to change notification settings - Fork 76
/
Copy pathquant5_3.dos
73 lines (69 loc) · 3.33 KB
/
quant5_3.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Clean up objects left over from a previous run so the script can be re-executed.
// Every step is attempted on its own: a step that fails (typically because the
// object does not exist yet, e.g. on the first run) must not prevent the remaining
// steps from running. Failures are deliberately ignored - this is best-effort cleanup.
// Order matters: engines are dropped first, then subscriptions are removed, and only
// then are the shared tables undefined.
def clearEnv(){
    // streaming aggregators
    for(engineName in ["tsAggr1", "tsAggr2"]){
        try{
            dropAggregator(engineName)
        }catch(ex){}
    }
    // subscriptions on the local stream table level2
    for(action in ["act_tsAggr1", "act_tsAggr2", "act_factor", "newestLevel2data"]){
        try{
            unsubscribeTable(,`level2,action)
        }catch(ex){}
    }
    // shared tables (input stream table and result tables)
    for(sharedName in ["level2", "OHLC1", "OHLC2", "FACTOR"]){
        try{
            undef(sharedName,SHARED)
        }catch(ex){}
    }
}
// Remove leftovers of a previous run before any object is (re)created.
clearEnv()
// Define the published stream table: an empty stream table (initial capacity 100)
// shared under the name level2, with columns
// symbol, datetime, last, askPrice1, bidPrice1, askVolume1, bidVolume1, volume.
share streamTable(100:0, `symbol`datetime`last`askPrice1`bidPrice1`askVolume1`bidVolume1`volume, [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT]) as level2
// 5.3 Generate real-time high-frequency factors with stream computing
// calcNetAmountRatio: aggregate function over a vector x.
// Returns sum(last n elements of x) divided by sum(last 2*n elements of x).
// When x holds fewer than 2*n elements the result is 0.
defg calcNetAmountRatio(x,n){
    len = size(x)
    // not enough observations for the 2*n window
    if(len < 2*n){
        return 0
    }
    shortWindowSum = sum(subarray(x, (len - n):len))
    longWindowSum = sum(subarray(x, (len - 2*n):len))
    return shortWindowSum \ longWindowSum
}
// Factor calculation handler for a batch of streamed rows.
//   factorTable : table that receives the factor rows (timestamp, symbol, value, factorName)
//   d           : dictionary keyed by symbol holding each symbol's net_amount series; updated in place
//   facName     : value written to the factorName column of every produced row
//   msg         : incoming batch as a table; the columns symbol, volume, bidPrice1 and askPrice1 are read
def factorHandler(mutable factorTable,mutable d, facName,msg){
    syms = distinct(msg.symbol)
    nSyms = size(syms)
    // net inflow (net_amount) = volume * iif(bidPrice1>=askPrice1, 1, -1)
    batchFlow = select symbol, volume * iif(bidPrice1>=askPrice1, 1, -1) as net_amount from msg
    // append this batch's net_amount values to the per-symbol series in the dictionary
    dictUpdate!(d, append!, batchFlow.symbol, batchFlow.net_amount)
    // compute the factor once per distinct symbol of the batch, using n = 100
    facValues = array(DOUBLE, nSyms)
    for(idx in 0:nSyms){
        facValues[idx] = calcNetAmountRatio(d[syms[idx]], 100)
    }
    // stamp every row with the current time and write to the factor result table
    stamps = take(now(), nSyms)
    facNames = take(facName, nSyms)
    factorTable.append!(table(stamps as timestamp, syms as symbol, facValues as value, facNames as factorName))
}
// Create the factor result table: an empty stream table shared under the name FACTOR
// with columns timestamp, symbol, value, factorName.
print("创建因子结果表")
share(streamTable(100:0, `timestamp`symbol`value`factorName,[TIMESTAMP,SYMBOL,DOUBLE,SYMBOL]),"FACTOR")
// Run everything above before the rest of the script is parsed - presumably so that the
// shared name FACTOR can be resolved by the statements below (confirm against the `go` docs).
go
// Build the history dictionary: symbol -> vector of net_amount values.
print("正在创建历史数据字典")
d = dict(STRING, ANY)
// Take the last 200 rows per symbol (context by + limit -200). 200 matches the 2*n
// elements that calcNetAmountRatio needs when factorHandler calls it with n = 100.
// NOTE(review): net_amount here is derived from the `volume` column of dfs://level2/quotes,
// while the replay section maps `curVol as volume` - confirm both are the same measure.
his = select symbol,volume * iif(bidPrice1>=askPrice1, 1, -1) as net_amount from loadTable("dfs://level2","quotes") context by symbol limit -200
// One dictionary entry per distinct symbol, holding that symbol's net_amount values.
for(id in his[`symbol].distinct())
d[id]= exec net_amount from his where symbol == id
print("创建历史数据字典完成")
// Subscribe to the stream table level2 from offset 0. The handler is factorHandler with
// factorTable=FACTOR, d=d and facName="factor1" bound by partial application; each
// batch is passed to it as a table (msgAsTable=true, batchSize=4000, throttle=1).
subscribeTable(tableName="level2", actionName="act_factor", offset=0, handler=factorHandler{FACTOR,d,"factor1"}, msgAsTable=true, batchSize=4000, throttle=1)
print("创建流数据订阅关系完成")
// Replay data into the stream table to trigger the factor calculation.
dbPath = "dfs://level2Replay"
// Create and fill the replay database only if it does not exist yet.
if(!existsDatabase(dbPath)){
// Composite partitioning: VALUE on each date of 2020, then HASH on symbol into 10 buckets.
dbDate = database("", VALUE, 2020.01.01..2020.12.31)
dbSymbol=database("", HASH, [SYMBOL, 10])
db = database(dbPath, COMPO, [dbDate, dbSymbol])
// Empty template table with the same columns and types as the level2 stream table.
modal = table(1:0, `symbol`datetime`last`askPrice1`bidPrice1`askVolume1`bidVolume1`volume, [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT])
pt=db.createPartitionedTable(modal,`quotes, `datetime`symbol)
// Copy dfs://level2/quotes into the replay table, combining its date and time columns
// into a single datetime column and exposing curVol under the name volume.
data = select symbol, datetime(datetime(date(date))+second(time)) as datetime, last, askPrice1, bidPrice1, askVolume1, bidVolume1, curVol as volume from loadTable("dfs://level2","quotes")
pt.append!(data)
}
quotes = loadTable(dbPath,"quotes")
// Amount of data loaded into memory per fetch = 1 hour:
// the range 08:00:00..18:00:00 is cut into 10 intervals.
repartitionSchema = time(cutPoints(08:00:00..18:00:00,10))
// Data sources for the replay, using the datetime column as both date and time column.
inputDS = replayDS(<select * from quotes>, `datetime, `datetime, repartitionSchema)
// Run the replay into the stream table level2 as a background job.
submitJob("replay_quotes", "replay_quotes_stream", replay, [inputDS], [`level2], `datetime, `datetime, 10, false, 2)