Skip to content

Commit

Permalink
#672 make sort process call initmissingtables and notifyidbs instead … (
Browse files Browse the repository at this point in the history
#681)

* #672 make sort process call initmissingtables and notifyidbs instead of wdb

* change initmissingtables to work with new set up

* #672 clean up cpde and add some comments

* #672 move idbreload to endofdaysort

* #672 change the way initmissingtables works

* #672 make use of wdbtypes for sort procs to connect to

* #672 remove unnecessary whitespace

* #672 make the initmissingtables command get sent to all wdbs

---------

Co-authored-by: [email protected] <[email protected]>
ThomasGillespie and [email protected] authored Nov 28, 2024
1 parent a4aff2e commit b882e10
Showing 4 changed files with 31 additions and 17 deletions.
40 changes: 26 additions & 14 deletions code/processes/wdb.q
Original file line number Diff line number Diff line change
@@ -54,6 +54,8 @@ tpcheckcycles:@[value;`tpcheckcycles;0W]; /-num
sorttypes:@[value;`sorttypes;`sort]; /-list of sort types to look for upon a sort
sortworkertypes:@[value;`sortworkertypes;`sortworker]; /-list of sort types to look for upon a sort being called with worker process

wdbtypes:@[value;`wdbtypes;`wdb]; /-list of wdb types for sort processes to look for on initmissingtables

subtabs:@[value;`subtabs;`]; /-list of tables to subscribe for
subsyms:@[value;`subsyms;`]; /-list of syms to subscription to
upd:@[value;`upd;{insert}]; /-value of the upd function
@@ -206,12 +208,6 @@ endofday:{[pt;processdata]
if[.finspace.enabled;.finspace.notifyhdb[;changeset] each .finspace.hdbclusters];
currentpartition::pt+1;
/- in case of default/partbyenum writedown mode we want to initialise the new partition with all the table schemas
/- then notify idb processes of the new db
if[writedownmode in `partbyenum`default;
.lg.o[`eod;"initialising wdbhdb for partition: ",string[currentpartition]];
initmissingtables[];
.lg.o[`eod;"notifying idbs for newly created partition"];
notifyidbs[`.idb.rollover;currentpartition]];
.lg.o[`eod;"end of day is now complete"];
if[.finspace.enabled;.os.hdeldir[getenv[`KDBSCRATCH];0b]];
};
@@ -424,6 +420,8 @@ endofdaysort:{[dir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod]
endofdaymerge[dir;pt;tablist;mergelimits;hdbsettings;mergemethod;writedownmode];
endofdaysortdate[dir;pt;key tablist;hdbsettings]
];
/- run steps to rollover idb
idbreload[currentpartition+1];
/- reset compression level (.z.zd)
resetcompression[16 0 0]
};
@@ -527,20 +525,21 @@ fixpartition:{[subto]

/- for writedown modes partbyenum/default we make sure that partition 0/currentpartition has all the tables.
/- In that case we can use .Q.chk later to fill the db making it useable for intraday processes
initmissingtables:{[]
.lg.o[`fixpartition;"Adding missing tables(empty) to partition ",string currentpartition];
inittable each tablelist[];
filldb[];
/- pt - date; partition for which the function should initialise
initmissingtables:{[pt]
.lg.o[`fixpartition;"Adding missing tables(empty) to partition ",string pt];
inittable[;pt] each tablelist[];
filldb[pt];
}

filldb:{[]
filldb:{[pt]
/- for all enumerated partitions we want to make sure that all tables are present
.Q.chk[.Q.par[hsym savedir; currentpartition; `]];
.Q.chk[.Q.par[hsym savedir; pt; `]];
}

/- initialises table t in db with its schema in part
inittable:{[t]
tabledir:` sv $[writedownmode~`partbyenum; .Q.par[.Q.dd[hsym savedir;currentpartition];0;t]; .Q.par[hsym savedir;currentpartition;t]],`;
inittable:{[t;pt]
tabledir:` sv $[writedownmode~`partbyenum; .Q.par[.Q.dd[hsym savedir;pt];0;t]; .Q.par[hsym savedir;pt;t]],`;
if[() ~ key tabledir;tabledir set .Q.en[hsym hdbdir;0#value t]];
}

@@ -594,6 +593,19 @@ getsortparams:{[]
];
};

/- Function to trigger idb reload steps, The initmissingtables function needs to be ran on wdb process, however this function can be ran on the sort processes
/- If the function is ran on sort process send initmissingtables command to wdbs
idbreload:{[pt]
.lg.o[`idb;"starting idb reload"];
if[writedownmode in `partbyenum`default;
.lg.o[`eod;"initialising wdbhdb for partition: ",string[pt]];
$[.proc.proctype~`sort;{[pt]ws:exec w from .servers.getservers[`proctype;wdbtypes;()!();1b;0b];{[ws;pt]ws(`.wdb.initmissingtables;[pt])}[;pt] each ws}[pt];initmissingtables[pt]];
.lg.o[`eod;"notifying idbs for newly created partition"];
notifyidbs[`.idb.rollover;pt]
];
.lg.o[`idb;"idb reload complete"];
};;

\d .

/- get the sort attributes for each table
4 changes: 2 additions & 2 deletions code/wdb/origstartup.q
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ startup:{[]
];
subscribe[];
/- add missing tables to partitions in case an IDB process wants to connect. Only applicable for partbyenum writedown mode
if[.wdb.writedownmode in `default`partbyenum;initmissingtables[]];
if[.wdb.writedownmode in `default`partbyenum;initmissingtables[currentpartition]];
];
@[`.; `upd; :; .wdb.upd];
}
}
3 changes: 2 additions & 1 deletion config/settings/sort.q
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ ignorelist:`heartbeat`logmsg // list of tables to ignore
hdbtypes:`hdb // list of hdb types to look for and call in hdb reload
rdbtypes:`rdb // list of rdb types to look for and call in rdb reload
tickerplanttypes:`tickerplant // list of tickerplant types to try and make a connection to
wdbtypes:`wdb // list of wdb types to look for and call in wdb init tables
subtabs:` // list of tables to subscribe for (` for all)
subsyms:` // list of syms to subscribe for (` for all)
savedir:hsym`$getenv[`TORQHOME],"/wdbhdb" // location to save wdb data
@@ -46,5 +47,5 @@ eodwaittime:0D00:00:10.000 // time to wait for async calls to compl

// Server connection details
\d .servers
CONNECTIONS:`hdb`tickerplant`rdb`gateway // list of connections to make at start up
CONNECTIONS:`wdb`hdb`tickerplant`rdb`gateway // list of connections to make at start up
STARTUP:1b // create connections
1 change: 1 addition & 0 deletions config/settings/wdb.q
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ ignorelist:`heartbeat`logmsg
hdbtypes:`hdb // list of hdb types to look for and call in hdb reload
rdbtypes:`rdb // list of rdb types to look for and call in rdb reload
idbtypes:`idb // list of idb types to look for and call in rdb reload
wdbtypes:() // wdb does not need to connect to itself
gatewaytypes:`gateway // list of gateway types to inform at reload
tickerplanttypes:`segmentedtickerplant // list of tickerplant types to try and make a connection to
subtabs:` // list of tables to subscribe for (` for all)

0 comments on commit b882e10

Please sign in to comment.