diff --git a/code/processes/wdb.q b/code/processes/wdb.q index 60d5baf85..5cfb69c3c 100644 --- a/code/processes/wdb.q +++ b/code/processes/wdb.q @@ -54,6 +54,8 @@ tpcheckcycles:@[value;`tpcheckcycles;0W]; /-num sorttypes:@[value;`sorttypes;`sort]; /-list of sort types to look for upon a sort sortworkertypes:@[value;`sortworkertypes;`sortworker]; /-list of sort types to look for upon a sort being called with worker process +wdbtypes:@[value;`wdbtypes;`wdb]; /-list of wdb types for sort processes to look for on initmissingtables + subtabs:@[value;`subtabs;`]; /-list of tables to subscribe for subsyms:@[value;`subsyms;`]; /-list of syms to subscription to upd:@[value;`upd;{insert}]; /-value of the upd function @@ -206,12 +208,6 @@ endofday:{[pt;processdata] if[.finspace.enabled;.finspace.notifyhdb[;changeset] each .finspace.hdbclusters]; currentpartition::pt+1; /- in case of default/partbyenum writedown mode we want to initialise the new partition with all the table schemas - /- then notify idb processes of the new db - if[writedownmode in `partbyenum`default; - .lg.o[`eod;"initialising wdbhdb for partition: ",string[currentpartition]]; - initmissingtables[]; - .lg.o[`eod;"notifying idbs for newly created partition"]; - notifyidbs[`.idb.rollover;currentpartition]]; .lg.o[`eod;"end of day is now complete"]; if[.finspace.enabled;.os.hdeldir[getenv[`KDBSCRATCH];0b]]; }; @@ -424,6 +420,8 @@ endofdaysort:{[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod] endofdaymerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode]; endofdaysortdate[dir;pt;key tablist;hdbsettings] ]; + /- run steps to rollover idb + idbreload[currentpartition+1]; /- reset compression level (.z.zd) resetcompression[16 0 0] }; @@ -527,20 +525,21 @@ fixpartition:{[subto] /- for writedown modes partbyenum/default we make sure that partition 0/currentpartition has all the tables. /- In that case we can use .Q.chk later to fill the db making it useable for intraday processes -initmissingtables:{[] - .lg.o[`fixpartition;"Adding missing tables(empty) to partition ",string currentpartition]; - inittable each tablelist[]; - filldb[]; +/- pt - date; partition for which the function should initialise +initmissingtables:{[pt] + .lg.o[`fixpartition;"Adding missing tables(empty) to partition ",string pt]; + inittable[;pt] each tablelist[]; + filldb[pt]; } -filldb:{[] +filldb:{[pt] /- for all enumerated partitions we want to make sure that all tables are present - .Q.chk[.Q.par[hsym savedir; currentpartition; `]]; + .Q.chk[.Q.par[hsym savedir; pt; `]]; } /- initialises table t in db with its schema in part -inittable:{[t] - tabledir:` sv $[writedownmode~`partbyenum; .Q.par[.Q.dd[hsym savedir;currentpartition];0;t]; .Q.par[hsym savedir;currentpartition;t]],`; +inittable:{[t;pt] + tabledir:` sv $[writedownmode~`partbyenum; .Q.par[.Q.dd[hsym savedir;pt];0;t]; .Q.par[hsym savedir;pt;t]],`; if[() ~ key tabledir;tabledir set .Q.en[hsym hdbdir;0#value t]]; } @@ -594,6 +593,19 @@ getsortparams:{[] ]; }; +/- Function to trigger idb reload steps, The initmissingtables function needs to be ran on wdb process, however this function can be ran on the sort processes +/- If the function is ran on sort process send initmissingtables command to wdbs +idbreload:{[pt] + .lg.o[`idb;"starting idb reload"]; + if[writedownmode in `partbyenum`default; + .lg.o[`eod;"initialising wdbhdb for partition: ",string[pt]]; + $[.proc.proctype~`sort;{[pt]ws:exec w from .servers.getservers[`proctype;wdbtypes;()!();1b;0b];{[ws;pt]ws(`.wdb.initmissingtables;[pt])}[;pt] each ws}[pt];initmissingtables[pt]]; + .lg.o[`eod;"notifying idbs for newly created partition"]; + notifyidbs[`.idb.rollover;pt] + ]; + .lg.o[`idb;"idb reload complete"]; + };; + \d . /- get the sort attributes for each table diff --git a/code/wdb/origstartup.q b/code/wdb/origstartup.q index db884fa29..dd91c0e61 100644 --- a/code/wdb/origstartup.q +++ b/code/wdb/origstartup.q @@ -15,7 +15,7 @@ startup:{[] ]; subscribe[]; /- add missing tables to partitions in case an IDB process wants to connect. Only applicable for partbyenum writedown mode - if[.wdb.writedownmode in `default`partbyenum;initmissingtables[]]; + if[.wdb.writedownmode in `default`partbyenum;initmissingtables[currentpartition]]; ]; @[`.; `upd; :; .wdb.upd]; - } \ No newline at end of file + } diff --git a/config/settings/sort.q b/config/settings/sort.q index 8f3233700..228b3bb8a 100644 --- a/config/settings/sort.q +++ b/config/settings/sort.q @@ -5,6 +5,7 @@ ignorelist:`heartbeat`logmsg // list of tables to ignore hdbtypes:`hdb // list of hdb types to look for and call in hdb reload rdbtypes:`rdb // list of rdb types to look for and call in rdb reload tickerplanttypes:`tickerplant // list of tickerplant types to try and make a connection to +wdbtypes:`wdb // list of wdb types to look for and call in wdb init tables subtabs:` // list of tables to subscribe for (` for all) subsyms:` // list of syms to subscribe for (` for all) savedir:hsym`$getenv[`TORQHOME],"/wdbhdb" // location to save wdb data @@ -46,5 +47,5 @@ eodwaittime:0D00:00:10.000 // time to wait for async calls to compl // Server connection details \d .servers -CONNECTIONS:`hdb`tickerplant`rdb`gateway // list of connections to make at start up +CONNECTIONS:`wdb`hdb`tickerplant`rdb`gateway // list of connections to make at start up STARTUP:1b // create connections diff --git a/config/settings/wdb.q b/config/settings/wdb.q index 16250689d..95c8047e3 100644 --- a/config/settings/wdb.q +++ b/config/settings/wdb.q @@ -6,6 +6,7 @@ ignorelist:`heartbeat`logmsg hdbtypes:`hdb // list of hdb types to look for and call in hdb reload rdbtypes:`rdb // list of rdb types to look for and call in rdb reload idbtypes:`idb // list of idb types to look for and call in rdb reload +wdbtypes:() // wdb does not need to connect to itself gatewaytypes:`gateway // list of gateway types to inform at reload tickerplanttypes:`segmentedtickerplant // list of tickerplant types to try and make a connection to subtabs:` // list of tables to subscribe for (` for all)