Skip to content

Commit

Permalink
tweaks to ingest-stats API
Browse files Browse the repository at this point in the history
  • Loading branch information
mmguero committed Dec 18, 2024
1 parent f172b95 commit 002a767
Showing 1 changed file with 40 additions and 11 deletions.
51 changes: 40 additions & 11 deletions api/project/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,14 @@

if databaseMode == malcolm_utils.DatabaseMode.ElasticsearchRemote:
import elasticsearch as DatabaseImport
from elasticsearch_dsl import Search as SearchClass, A as AggregationClass
from elasticsearch_dsl import Search as SearchClass, A as AggregationClass, Q as QueryClass

DatabaseClass = DatabaseImport.Elasticsearch
if opensearchHttpAuth:
DatabaseInitArgs['basic_auth'] = opensearchHttpAuth
else:
import opensearchpy as DatabaseImport
from opensearchpy import Search as SearchClass, A as AggregationClass
from opensearchpy import Search as SearchClass, A as AggregationClass, Q as QueryClass

DatabaseClass = DatabaseImport.OpenSearch
if opensearchHttpAuth:
Expand All @@ -248,6 +248,10 @@
)


def doctype_is_host_logs(d):
    """Report whether a doctype value names one of the host-log families.

    The value is coerced with ``str()`` and lowercased, so any object is
    accepted and matching is case-insensitive. It counts as a host-log
    doctype when the resulting text begins with ``host``, ``beat`` or
    ``miscbeat``.

    Args:
        d: the doctype value to classify (typically a string).

    Returns:
        True if the normalized doctype starts with one of the prefixes
        above, False otherwise.
    """
    host_log_prefixes = ('host', 'beat', 'miscbeat')
    normalized = str(d).lower()
    # str.startswith accepts a tuple and is True if any member matches
    return normalized.startswith(host_log_prefixes)


def random_id(length=20):
    """Build a random identifier made of ASCII letters and digits.

    Characters are drawn with replacement from ``string.ascii_letters``
    followed by ``string.digits`` using the module-level ``random``
    generator.

    Args:
        length: number of characters to generate (default 20).

    Returns:
        A string of ``length`` characters; empty when ``length`` is 0.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return ''.join(picked)

Expand Down Expand Up @@ -390,7 +394,7 @@ def doctype_from_args(args):
return doctype
network|host
"""
return malcolm_utils.deep_get(args, ["doctype"], app.config["DOCTYPE_DEFAULT"])
return str(malcolm_utils.deep_get(args, ["doctype"], app.config["DOCTYPE_DEFAULT"])).lower()


def index_from_args(args):
Expand All @@ -411,8 +415,8 @@ def index_from_args(args):
app.config["MALCOLM_NETWORK_INDEX_PATTERN"],
"""
index = None
if dtype := str(doctype_from_args(args)).lower():
if dtype.startswith('host') or dtype.startswith('beat') or dtype.startswith('miscbeat'):
if dtype := doctype_from_args(args):
if doctype_is_host_logs(dtype):
index = app.config["MALCOLM_OTHER_INDEX_PATTERN"]
elif dtype.startswith('arkime') or dtype.startswith('session'):
index = app.config["ARKIME_NETWORK_INDEX_PATTERN"]
Expand All @@ -439,8 +443,8 @@ def timefield_from_args(args):
app.config["MALCOLM_NETWORK_INDEX_TIME_FIELD"],
"""
timefield = None
if dtype := str(doctype_from_args(args)).lower():
if dtype.startswith('host') or dtype.startswith('beat') or dtype.startswith('miscbeat'):
if dtype := doctype_from_args(args):
if doctype_is_host_logs(dtype):
timefield = app.config["MALCOLM_OTHER_INDEX_TIME_FIELD"]
elif dtype.startswith('arkime') or dtype.startswith('session'):
timefield = app.config["ARKIME_NETWORK_INDEX_TIME_FIELD"]
Expand Down Expand Up @@ -1185,10 +1189,35 @@ def ingest_stats():
result['latest_ingest_age_seconds'] = 0
try:
# do the aggregation bucket query for the max event.ingested value for each data source
s = SearchClass(
using=databaseClient,
index=index_from_args(get_request_arguments(request)),
).extra(size=0)
request_args = get_request_arguments(request)
s = (
SearchClass(
using=databaseClient,
index=index_from_args(request_args),
).extra(size=0)
# Exclusions:
# NGINX access and error logs: we want to exclude nginx error and
# access logs, otherwise the very act of accessing Malcolm will
# update the latest ingest time returned from this function.
# event() webhook: we want to exclude alerts written by the event()
# webhook API (see below) and limit our results to actual
# network logs ingested via PCAP, etc.
.query(
QueryClass(
'bool',
must_not=[
QueryClass(
'term',
**{
'event.module': (
'nginx' if doctype_is_host_logs(doctype_from_args(request_args)) else 'alerting'
)
},
)
],
)
)
)

hostAgg = AggregationClass('terms', field='host.name')
maxIngestAgg = AggregationClass('max', field='event.ingested')
Expand Down

0 comments on commit 002a767

Please sign in to comment.