-
Notifications
You must be signed in to change notification settings - Fork 76
/
Copy path01.dataProcess.txt
73 lines (69 loc) · 3.62 KB
/
01.dataProcess.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
dataProcess.txt
Script to process data
DolphinDB Inc.
DolphinDB server version: 1.30.18 2022.05.09
Last modification time: 2022.05.10
*/
/**
Attention:
1. The developer need to import level2 snapshot data into the database in advance
2. There are two places in the script that need to be modified according to the environment
*/
//login account and clean up the environment
login("admin", "123456")
clearAllCache()
undef(all)
go
/**
part1: create database and table to store the processed results
modified location 1: storeDBName and storeTBName
*/
storeDBName = "dfs://sz50VolatilityDataSet"
storeTBName = "sz50VolatilityDataSet"
if(existsDatabase(storeDBName)){
dropDatabase(storeDBName)
}
db = database(storeDBName, RANGE, sort(distinct(yearBegin(2000.01.01..2040.01.01))))
name = `SecurityID`TradeTime`BAS`DI0`DI1`DI2`DI3`DI4`DI5`DI6`DI7`DI8`DI9`Press`RV`targetRV
type = `SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
tbSchema = table(1:0, name, type)
db.createPartitionedTable(table=tbSchema, tableName=storeTBName, partitionColumns=`TradeTime, compressMethods={TradeTime:"delta"})
go
/**
part2: process level2 snapshot data
modified location 2: dbName and tbName
*/
dbName = "dfs://snapshot_SH_L2_OLAP"
tableName = "snapshot_SH_L2_OLAP"
snapshot = loadTable(dbName, tableName)
stockList=`601318`600519`600036`600276`601166`600030`600887`600016`601328`601288`600000`600585`601398`600031`601668`600048`601888`600837`601601`601012`603259`601688`600309`601988`601211`600009`600104`600690`601818`600703`600028`601088`600050`601628`601857`601186`600547`601989`601336`600196`603993`601138`601066`601236`601319`603160`600588`601816`601658`600745
//define function to process data with matrix operation
defg featureEngine(bidPrice,bidQty,offerPrice,offerQty){
bas = offerPrice[0]\bidPrice[0]-1
wap = (bidPrice[0]*offerQty[0] + offerPrice[0]*bidQty[0])\(bidQty[0]+offerQty[0])
di = (bidQty-offerQty)\(bidQty+offerQty)
bidw=(1.0\(bidPrice-wap))
bidw=bidw\(bidw.rowSum())
offerw=(1.0\(offerPrice-wap))
offerw=offerw\(offerw.rowSum())
press=log((bidQty*bidw).rowSum())-log((offerQty*offerw).rowSum())
rv=std(log(wap)-log(prev(wap)))*sqrt(24*252*size(wap))
return avg(bas),avg(di[0]),avg(di[1]),avg(di[2]),avg(di[3]),avg(di[4]),avg(di[5]),avg(di[6]),avg(di[7]),avg(di[8]),avg(di[9]),avg(press),rv
}
//SQL for level2 snapshot data processing
result = select
featureEngine(
matrix(BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9),
matrix(BidOrderQty0,BidOrderQty1,BidOrderQty2,BidOrderQty3,BidOrderQty4,BidOrderQty5,BidOrderQty6,BidOrderQty7,BidOrderQty8,BidOrderQty9),
matrix(OfferPrice0,OfferPrice1,OfferPrice2,OfferPrice3,OfferPrice4,OfferPrice5,OfferPrice6,OfferPrice7,OfferPrice8,OfferPrice9),
matrix(OfferOrderQty0,OfferOrderQty1,OfferOrderQty2,OfferOrderQty3,OfferOrderQty4,OfferOrderQty5,OfferOrderQty6,OfferOrderQty7,OfferOrderQty8,OfferOrderQty9)) as `BAS`DI0`DI1`DI2`DI3`DI4`DI5`DI6`DI7`DI8`DI9`Press`RV
from snapshot
where date(TradeTime) between 2020.01.01 : 2020.12.31, SecurityID in stockList, (time(TradeTime) between 09:30:00.000 : 11:29:59.999) || (time(TradeTime) between 13:00:00.000 : 14:56:59.999)
group by SecurityID, interval( TradeTime, 10m, "none" ) as TradeTime map
result = select *, next(RV) as targetRV from result context by date(TradeTime), SecurityID
result = result[each(isValid, result.values()).rowAnd()]
result = select * from (select *, count(*) from result context by date(TradeTime), SecurityID) where count=23
dropColumns!(result, `count)
//store the processing results
loadTable(storeDBName, storeTBName).append!(result)