handle nodes being down better (arkime#1524)
* handle nodes being down better

if a remote node is down, skip the session and go back through the skipped sessions later
consolidate redundant UI components for status and hunt details
don't display skipped sessions in the progress bar
indicate the number of skipped sessions in hunt details

* add console logs to diagnose potential problems

rearrange function
uncomment total sessions update

* remove setTimeout used for local testing

* fix searching for failed sessions when a node is down

update the progress bar correctly
show percentage correctly
save if searching for failed sessions and don't increment total sessions
return out of running hunt job if there are failed sessions to search
pass proper path to makeRequest when hunting failed sessions

* use hunt options to store searchingFailedSessions

* fix text for hunt job info

also make sure hunt job kicks off when failedSessionIds is only partially updated

* add limit to the length of the failed sessions list

* remove todos and add comment

* use Db.getSession and error out of huntFailedSessions instead of nested if
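
Stripped of the viewer.js bookkeeping, the control flow this commit describes reduces to the pattern below. This is a minimal sketch, assuming the same `async` library the diff itself uses; `searchSession`, `runPass`, and `huntWithRetries` are illustrative stand-ins, not Arkime APIs.

```js
const async = require('async');

// One pass: search every id, collecting the ones whose node didn't answer.
function runPass (sessionIds, searchSession, done) {
  const failed = [];
  async.forEachLimit(sessionIds, 3, (id, cb) => {
    searchSession(id, (err) => {
      if (err) { failed.push(id); } // node down: skip for now, retry later
      cb(); // never abort the whole pass for one bad session
    });
  }, () => done(failed));
}

// Retry skipped sessions while each pass makes progress; error out when a
// whole pass completes without shrinking the failed list.
function huntWithRetries (sessionIds, searchSession, finished) {
  runPass(sessionIds, searchSession, function retry (failed) {
    if (failed.length === 0) { return finished(null); } // fully searched
    runPass(failed, searchSession, (stillFailed) => {
      if (stillFailed.length < failed.length) { return retry(stillFailed); }
      finished(new Error('sessions still unreachable; a node is likely down'));
    });
  });
}
```

The key design choice is that a dead node degrades the hunt instead of killing it: individual failures are deferred, and only a full pass with zero progress pauses the job.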
31453 authored Sep 25, 2020
1 parent 52883f9 commit e2fce2f
Showing 4 changed files with 392 additions and 365 deletions.
138 changes: 127 additions & 11 deletions viewer/viewer.js
100644 → 100755
@@ -7640,7 +7640,7 @@ function updateHuntStats (hunt, huntId, session, searchedSessions, cb) {
 
     hunt.status = huntHit._source.status;
     hunt.lastUpdated = now;
-    hunt.searchedSessions = searchedSessions;
+    hunt.searchedSessions = searchedSessions || hunt.searchedSessions;
     hunt.lastPacketTime = lastPacketTime;
 
     Db.setHunt(huntId, hunt, () => {});
@@ -7686,24 +7686,131 @@ function buildHuntOptions (huntId, hunt) {
   return options;
 }
 
+// if we couldn't retrieve the session, skip it but add it to failedSessionIds
+// so that we can go back and search for it at the end of the hunt
+function continueHuntSkipSession (hunt, huntId, session, sessionId, searchedSessions, cb) {
+  if (!hunt.failedSessionIds) {
+    hunt.failedSessionIds = [ sessionId ];
+  } else {
+    // pause the hunt if there are more than 10k failed sessions
+    if (hunt.failedSessionIds.length > 10000) {
+      return pauseHuntJobWithError(huntId, hunt, {
+        value: 'Error hunting: Too many sessions are unreachable. Please contact your administrator.'
+      });
+    }
+    // make sure the session id is not already in the array
+    // if it's not the first pass and a node is still down, this could be a duplicate
+    if (hunt.failedSessionIds.indexOf(sessionId) < 0) {
+      hunt.failedSessionIds.push(sessionId);
+    }
+  }
+
+  return updateHuntStats(hunt, huntId, session, searchedSessions, cb);
+}
+
+// if there are failed sessions, go through them one by one and do a packet search
+// if there are no failed sessions left at the end then the hunt is done
+// if there are still failed sessions, but some sessions were searched during the last pass, try again
+// if there are still failed sessions, but no new sessions could be searched, pause the job with an error
+function huntFailedSessions (hunt, huntId, options, searchedSessions, user) {
+  if (!hunt.failedSessionIds || !hunt.failedSessionIds.length) { return; }
+
+  let changesSearchingFailedSessions = false;
+
+  options.searchingFailedSessions = true;
+  // copy the failed session ids so we can remove them from the hunt
+  // but still loop through them iteratively
+  let failedSessions = JSON.parse(JSON.stringify(hunt.failedSessionIds));
+  // we don't need to search the db for sessions, we just need to search each session in failedSessionIds
+  async.forEachLimit(failedSessions, 3, function (sessionId, cb) {
+    Db.getSession(sessionId, { _source: 'node,huntName,huntId,lastPacket,field' }, (err, session) => {
+      if (err) {
+        return continueHuntSkipSession(hunt, huntId, session, sessionId, searchedSessions, cb);
+      }
+
+      session = session._source;
+
+      const path = `${session.node}/hunt/${huntId}/remote/${sessionId}`;
+
+      makeRequest(session.node, path, user, (err, response) => {
+        if (err) {
+          return continueHuntSkipSession(hunt, huntId, session, sessionId, searchedSessions, cb);
+        }
+
+        let json = JSON.parse(response);
+
+        if (json.error) {
+          console.log(`Error hunting on remote viewer: ${json.error} - ${path}`);
+          return continueHuntSkipSession(hunt, huntId, session, sessionId, searchedSessions, cb);
+        }
+
+        // remove from failedSessionIds if it was found
+        hunt.failedSessionIds.splice(hunt.failedSessionIds.indexOf(sessionId), 1);
+        // there were changes to this hunt; we're making progress
+        changesSearchingFailedSessions = true;
+
+        if (json.matched) { hunt.matchedSessions++; }
+        return updateHuntStats(hunt, huntId, session, searchedSessions, cb);
+      });
+    });
+  }, function (err) { // done running a pass of the failed sessions
+    function continueProcess () {
+      Db.setHunt(huntId, hunt, (err, info) => {
+        internals.runningHuntJob = undefined;
+        processHuntJobs(); // Start new hunt
+      });
+    }
+
+    if (hunt.failedSessionIds && hunt.failedSessionIds.length === 0) {
+      options.searchingFailedSessions = false; // no longer searching failed sessions
+      // we had failed sessions but we're done searching through them
+      // so we're completely done with this hunt (initial search and failed sessions)
+      hunt.status = 'finished';
+
+      if (hunt.notifier) {
+        let message = `*${hunt.name}* hunt job finished:\n*${hunt.matchedSessions}* matched sessions out of *${hunt.searchedSessions}* searched sessions`;
+        issueAlert(hunt.notifier, message, continueProcess);
+      } else {
+        return continueProcess();
+      }
+    } else if (hunt.failedSessionIds && hunt.failedSessionIds.length > 0 && changesSearchingFailedSessions) {
+      // there are still failed sessions, but there were also changes,
+      // so keep going
+      // uninitialize hunts so that the running job with failed sessions will kick off again
+      internals.proccessHuntJobsInitialized = false;
+      return continueProcess();
+    } else if (!changesSearchingFailedSessions) {
+      options.searchingFailedSessions = false; // no longer searching failed sessions
+      // there were no changes, we're still struggling to connect to one or
+      // more remote nodes, so error out
+      return pauseHuntJobWithError(huntId, hunt, {
+        value: 'Error hunting previously unreachable sessions. There is likely a node down. Please contact your administrator.'
+      });
+    }
+  });
+}
+
 // Actually do the search against ES and process the results.
 function runHuntJob (huntId, hunt, query, user) {
   let options = buildHuntOptions(huntId, hunt);
   let searchedSessions;
 
+  // look for failed sessions if we're done searching sessions normally
+  if (!options.searchingFailedSessions && hunt.searchedSessions === hunt.totalSessions && hunt.failedSessionIds && hunt.failedSessionIds.length) {
+    options.searchingFailedSessions = true;
+    return huntFailedSessions(hunt, huntId, options, searchedSessions, user);
+  }
+
   Db.search('sessions2-*', 'session', query, { scroll: internals.esScrollTimeout }, function getMoreUntilDone (err, result) {
     if (err || result.error) {
       pauseHuntJobWithError(huntId, hunt, { value: `Hunt error searching sessions: ${err}` });
       return;
     }
 
     let hits = result.hits.hits;
 
     if (searchedSessions === undefined) {
       searchedSessions = hunt.searchedSessions || 0;
       // if the session query results length is not equal to the total sessions that the hunt
       // job is searching, update the hunt total sessions so that the percent works correctly
-      if (hunt.totalSessions !== (result.hits.total + searchedSessions)) {
+      if (!options.searchingFailedSessions && hunt.totalSessions !== (result.hits.total + searchedSessions)) {
         hunt.totalSessions = result.hits.total + searchedSessions;
       }
     }
@@ -7714,7 +7821,7 @@ function runHuntJob (huntId, hunt, query, user) {
       let sessionId = Db.session2Sid(hit);
       let node = session.node;
 
-      // There is no files, this is a fake session, don't hunt it
+      // There are no files, this is a fake session, don't hunt it
       if (session.fileId === undefined || session.fileId.length === 0) {
         return updateHuntStats(hunt, huntId, session, searchedSessions, cb);
       }
@@ -7738,7 +7845,7 @@ function runHuntJob (huntId, hunt, query, user) {
 
       makeRequest(node, path, user, (err, response) => {
         if (err) {
-          return pauseHuntJobWithError(huntId, hunt, { value: `Error hunting on remote viewer: ${err}` }, node);
+          return continueHuntSkipSession(hunt, huntId, session, sessionId, searchedSessions, cb);
         }
         let json = JSON.parse(response);
         if (json.error) {
@@ -7766,17 +7873,25 @@ function runHuntJob (huntId, hunt, query, user) {
 
     Db.clearScroll({ body: { scroll_id: result._scroll_id } });
 
-    // We are totally done with this hunt
-    hunt.status = 'finished';
     hunt.searchedSessions = hunt.totalSessions;
 
     function continueProcess () {
       Db.setHunt(huntId, hunt, (err, info) => {
         internals.runningHuntJob = undefined;
-        processHuntJobs(); // Start new hunt
+        processHuntJobs(); // start new hunt or go back over failedSessionIds
       });
     }
 
+    // the hunt is not actually finished, need to go through the failed session ids
+    if (hunt.failedSessionIds && hunt.failedSessionIds.length) {
+      // uninitialize hunts so that the running job with failed sessions will kick off again
+      internals.proccessHuntJobsInitialized = false;
+      return continueProcess();
+    }
+
+    // We are totally done with this hunt
+    hunt.status = 'finished';
+
     if (hunt.notifier) {
       let message = `*${hunt.name}* hunt job finished:\n*${hunt.matchedSessions}* matched sessions out of *${hunt.searchedSessions}* searched sessions`;
       issueAlert(hunt.notifier, message, continueProcess);
@@ -7891,11 +8006,12 @@ function processHuntJobs (cb) {
       var hunt = hit._source;
       let id = hit._id;
 
-      if (hunt.status === 'running') { // there is a job already running
+      // there is a job already running
+      if (hunt.status === 'running') {
         internals.runningHuntJob = hunt;
         if (!internals.proccessHuntJobsInitialized) {
           internals.proccessHuntJobsInitialized = true;
-          // restart the abandoned hunt
+          // restart the abandoned or incomplete hunt
           processHuntJob(id, hunt);
         }
         return (cb ? cb() : null);
@@ -8030,7 +8146,7 @@ app.get('/hunt/list', [noCacheJson, recordResponseTime, checkPermissions(['packe
        }
      });
    }
-  } else { // get queued, paused, and running jobs
+  } else { // get queued, paused, running jobs
     query.from = 0;
     query.size = 1000;
     query.query.bool.must.push({ terms: { status: ['queued', 'paused', 'running'] } });
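
The diff also bounds how much skip-and-retry state a hunt may accumulate. A condensed, standalone version of that bookkeeping from `continueHuntSkipSession` might look like the sketch below; the `recordFailure` name is illustrative, while the 10000 cap and the dedupe check come from the diff.

```js
// Sketch of the failedSessionIds bookkeeping shown in the diff above.
// Returns false when the hunt should be paused instead of continuing.
function recordFailure (hunt, sessionId) {
  if (!hunt.failedSessionIds) {
    hunt.failedSessionIds = [sessionId];
    return true;
  }
  if (hunt.failedSessionIds.length > 10000) {
    return false; // too many unreachable sessions; pause the hunt
  }
  // dedupe: a retry pass over a still-down node reports the same id again
  if (hunt.failedSessionIds.indexOf(sessionId) < 0) {
    hunt.failedSessionIds.push(sessionId);
  }
  return true;
}
```

Deduplication matters here because retry passes can keep failing against the same node; without it, the 10k cap would fill with duplicates of the same few sessions rather than measuring genuinely unreachable work.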