Skip to content

Commit

Permalink
Unregister from gw finspace torQ compatibiity (#633)
Browse files Browse the repository at this point in the history
* changes required for unregister_from_gw_finspace code

* leading whitspace removal

* merge with master and ignore check for active servers on adding server results if finspace flag is true

* small syntax changes

* compatibility additions for new gateway dereg logic

* change where setdereg gets called

* small changes based on comments

* remove outdate comment
  • Loading branch information
eugenetemlock authored Feb 22, 2024
1 parent 8f8c165 commit 517fbc9
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
9 changes: 9 additions & 0 deletions code/common/finspace.q
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,12 @@ newrdbup:{[]
.lg.o[`newrdbup;"received signal from next period rdb, setting rdbready to true"];
@[`.finspace;`rdbready;:;1b];
};

deletecluster:{[clustername]
if[not any (10h;-11h)=fType:type clustername; .lg.e[`deletecluster;"clustername must be of type string or symbol: 10h -11h, got ",-3!fType]; :(::)];
if[-11h~fType; clustername:string clustername];
.lg.o[`deletecluster;"Going to delete ",$[""~clustername;"current cluster";"cluster named: ",clustername]];
.aws.delete_kx_cluster[clustername]; // calling this on an empty string deletes self
// TODO ZAN Error trap
// Test this with invalid cluster names and catch to show error messages
};
4 changes: 4 additions & 0 deletions code/gateway/gatewaylib.q
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,7 @@ getserverscross:{[req;att;besteffort]
/- return the parameters which should be queried for
(key s)!distinct each' flip each util[w]`found
}

addserversfromconnectiontable:{
{.gw.addserverattr'[x`w;x`proctype;x`attributes]}[select w,proctype,attributes from .servers.SERVERS where ((proctype in x) or x~`ALL),not w in ((0;0Ni),exec handle from .gw.servers where active)];}

24 changes: 18 additions & 6 deletions code/processes/gateway.q
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ setserverstate:{[serverh;use]
$[use;
update inuse:use,lastquery:.proc.cp[],querycount+1i from `.gw.servers where handle in serverh;
update inuse:use,usage:usage+.proc.cp[] - lastquery from `.gw.servers where handle in serverh]}
setderegserverids:{[serverh]
if[@[value;`.finspace.enabled;0b] and @[value;`.finspace.dereginprog;0b];
svrIDs:exec serverid from .gw.servers where not null handle, handle in serverh;
.finspace.checkremainingqueriesforserver'[svrIDs];
]}



// return a list of available servers
// override this function for different routing algorithms e.g. maybe only send to servers in the same datacentre, country etc.
Expand Down Expand Up @@ -193,14 +200,17 @@ deleteresult:{[queryid] .gw.results : (queryid,()) _ .gw.results}

// add a result coming back from a server
addserverresult:{[queryid;results]
serverid:first exec serverid from .gw.servers where active, handle=.z.w;
serverid:$[@[value;`.finspace.enabled;0b];
first exec serverid from .gw.servers where handle=.z.w;
first exec serverid from .gw.servers where active, handle=.z.w];
if[queryid in key .gw.results;
.[`.gw.results;(queryid;1;.gw.results[queryid;1;;0]?serverid;1);:;results];
.[`.gw.results;(queryid;1;.gw.results[queryid;1;;0]?serverid;2);:;1b]
];
setserverstate[.z.w;0b];
runnextquery[];
checkresults[queryid]}
checkresults[queryid];
setderegserverids[.z.w]}
// handle an error coming back from the server
addservererror:{[queryid;error]
// propagate the error to the client
Expand All @@ -209,6 +219,7 @@ addservererror:{[queryid;error]
runnextquery[];
// finish the query
finishquery[queryid;1b;0Ni];
setderegserverids[.z.w];
}
// check if all results are in. If so, send the results to the client
checkresults:{[queryid]
Expand Down Expand Up @@ -281,6 +292,11 @@ removeserverhandle:{[serverh]

// mark the server as inactive
update handle:0Ni, active:0b, disconnecttime:.proc.cp[] from `.gw.servers where handle=serverh;
// finspace check
if[@[value;`.finspace.enabled;0b] and @[value;`.finspace.dereginprog;0b];
.finspace.deregserverids:.finspace.deregserverids _ serverid;
.finspace.dereginprog:0<count .finspace.deregserverids _ 0N;
];

runnextquery[];
}
Expand Down Expand Up @@ -523,10 +539,6 @@ while[0 = count .servers.getservers[`proctype;`discovery;()!();0b;1b];
.servers.startup[];
.servers.retrydiscovery[]]

// add servers from the standard connections table
addserversfromconnectiontable:{
{.gw.addserverattr'[x`w;x`proctype;x`attributes]}[select w,proctype,attributes from .servers.SERVERS where ((proctype in x) or x~`ALL),not w in ((0;0Ni),exec handle from .gw.servers where active)];}

// When new connections come in from the discovery service, try to reconnect
.servers.addprocscustom:{[connectiontab;procs]
// retry connections
Expand Down

0 comments on commit 517fbc9

Please sign in to comment.