Skip to content

Commit

Permalink
new writedown mode partbyenum and new intraday process (#656)
Browse files Browse the repository at this point in the history
* added partbyenum writedown mode

* new idb process

* added idb process

* changing EOL to unix

* changed idb logic

* idb notification logging refined

* createalias refined and added to os.q

* addressed comments, removed gmttime, refactored

* correcting indentation, removing reload function from idbstandard.q

* removing unused server types from wdb

* added back comment in processes/wdb.q

* added hsym to folder parameters

* added testing wdb partbyenum

* added test for idb process, added hsym to hdbdir and wdbdir parmters for idb

* added documentation for partbyenum and idb

* added graphics for idb docs

* EOL changed to LF

* IDB is accessible via Gateway

* IDB doesn't fail on restart or empty database

* IDB reworked - connects to WDB and registers itself with WDB. IDB has no significant downtime now. WDB now initialises the DB after EOD rollover.

* WDB refactored to support new writedown mode partbyenum more

* IDB intraday reload and logging improved. Fixed some comments.
  • Loading branch information
Papamaci444 authored Sep 24, 2024
1 parent 1595084 commit b74ad0e
Show file tree
Hide file tree
Showing 27 changed files with 754 additions and 305 deletions.
10 changes: 8 additions & 2 deletions code/common/merge.q
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ getextrapartitiontype:{[tablename]
tabparts
};

/- function to check each partiton type specified in sort.csv is actually present in specified table
/- function to check each partition type specified in sort.csv is actually present in specified table
checkpartitiontype:{[tablename;extrapartitiontype]
$[count colsnotintab:extrapartitiontype where not extrapartitiontype in cols get tablename;
.lg.e[`checkpart;"parted columns ",(", " sv string colsnotintab)," are defined in sort.csv but not present in ",(string tablename)," table"];
.lg.o[`checkpart;"all parted columns defined in sort.csv are present in ",(string tablename)," table"]];
};

/- function to check if the extra partition column has a symbol type
checksymboltype:{[tablename;extrapartitiontype]
$[all extrapartitiontype in exec c from meta[tablename] where t="s";
.lg.o[`checksymbol;"all columns do have a symbol type in ",(string tablename)," table"];
.lg.e[`checksymbol;"not all columns ",string[extrapartitiontype]," do have a symbol type in ",(string tablename)," table"]];
};


/- function to get list of distinct combiniations for partition directories
Expand Down Expand Up @@ -66,7 +72,7 @@ mergebypart:{[tablename;dest;partchunks]
.lg.o[`merge;"upserting ",(string count chunks)," rows to ",string dest];
/-merge columns to permanent storage
.[upsert;(dest;chunks);
{.lg.e[`merge;"failed to merge to ", sting[dest], " from segments ", (", " sv string chunks)];}];
{.lg.e[`merge;"failed to merge to ", string[dest], " from segments ", (", " sv string chunks)];}];
};

/-merge data from partition in temporary storage to permanent storage, column by column rather than by entire partition
Expand Down
2 changes: 1 addition & 1 deletion code/common/os.q
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ df:{(`$("/";"\\")[NT]sv -1_v;`$-1#v:("/";"\\")[NT]vs pth(string x;x)[10h=type x]
run:{system"q ",x}
kill:{[p]@[(`::p);"\\\\";1];}
sleep:{x:string x; system("sleep ",x;"timeout /t ",x," >nul")[NT]}
pthq:{[x] $[10h=type x;ssr [x;"\\";"/"];`$ -1 _ ssr [string (` sv x,`);"\\";"/"]]}
pthq:{[x] $[10h=type x;ssr [x;"\\";"/"];`$ -1 _ ssr [string (` sv x,`);"\\";"/"]]}
95 changes: 95 additions & 0 deletions code/processes/idb.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/-default parameters
\d .idb

wdbtypes:@[value;wdbtypes;`wdb];

/-these parameters are only used once their value has been set with values retrieved from the WBD.
writedownmode:idbdir:savedir:currentpartition:symfilepath:`;
symsize:partitionsize:0;

/-force loads sym file
loadsym:{[]
.lg.o[`load;"loading the sym file"];
@[load;symfilepath; {.lg.e[`load;"failed to load sym file: ",string[symfilepath]," error: ",x]}];
symsize::hcount symfilepath;
};

/-force loads IDB
loadidb:{[]
.lg.o[`load;"loading the db"];
@[system; "l ", 1_string idbdir; {.lg.e[`load;"failed to load IDB: ",string[idbdir]," error: ",x]}];
partitionsize::count key idbdir;
};

/- force loads the idb and the sym file
loaddb:{[]
starttime:.proc.ct[];
loadsym[];
loadidb[];
.lg.o[`load;"IDB load has been finished for partition: ",string[currentpartition],". Time taken(ms): ",string .proc.ct[]-starttime];
};

/- sets current partition and force loads the idb and the sym file. Called by the WDB after EOD.
rollover:{[pt]
currentpartition::pt;
idbdir::.Q.dd[savedir; currentpartition];
loaddb[];
};

/- reloads the db. Called by wdb process midday/eod.
intradayreload:{[]
starttime:.proc.ct[];
if[symfilehaschanged[];loadsym[]];
if[partitioncounthaschanged[];loadidb[]];
.lg.o[`intradayreload;"IDB reload has been finished for partition: ",string[savedir],". Time taken(ms): ",string .proc.ct[]-starttime];
};

/- checks if sym file has changed since last reload of the IDB. Records new sym size if changed.
symfilehaschanged:{[]
$[symsize<>c:hcount symfilepath;[symsize::c; 1b];0b]
};

/- checks if count of partitions has changed since last reload of the IDB. Records new partition count if changed.
/- the default writedown method doesn't need db reloading as no new directory is being created there.
partitioncounthaschanged:{[]
if[writedownmode~`default;:0b];
$[partitionsize<>c:count key idbdir;[partitionsize::c; 1b];0b]
};

setparametersfromwdb:{[wdbHandle]
.lg.o[`init;"querying WDB, HDB locations, current partition and writedown mode from WDB"];
params:@[wdbHandle; (each;value;`.wdb.savedir`.wdb.hdbdir`.wdb.currentpartition`.wdb.writedownmode); {.lg.e[`connection; "Failed to retrieve values from WDB."]; 'x}];
savedir::hsym params[0];
currentpartition::params[2];
symfilepath::.Q.dd[hsym params[1]; `sym];
writedownmode::params[3];
idbdir::.Q.dd[savedir; $[writedownmode~`default;`;currentpartition]];
.lg.o[`init;"Current settings: db folder: ",string[idbdir],", sym file: ",string[symfilepath],", writedownmode: ", string writedownmode];
};

init:{[]
.lg.o[`init; "searching for servers"];
.servers.startup[];
.lg.o[`init;"getting connection handle to the WDB"];
w:.servers.gethandlebytype[wdbtypes;`any];
/-exit if no valid handle
if[0=count w; .lg.e[`connection;"no connection to the WDB could be established... failed to initialise."];:()];
.lg.o[`init;"found a WDB process"];
/-setting parameters in .idb namespace from WDB
setparametersfromwdb[w];
.lg.o[`init;"loading the db and the sym file first time"];
loaddb[];
.lg.o[`init;"registering IDBs on WDB process..."];
/-send sync message to WDB to register the existing IDBs.
@[w;(`.servers.registerfromdiscovery;`idb;0b);{.lg.e[`connection;"Failed to register IDB with WDB."];'x}];
.lg.o[`init; "Initialisation of the IDB is done."];
}

\d .

.idb.init[];

/- helper function to support queries against the sym column
maptoint:{[symbol]
sym?symbol
};
Loading

0 comments on commit b74ad0e

Please sign in to comment.