Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 logsearch #89

Merged
merged 14 commits into from
Oct 8, 2024
5 changes: 5 additions & 0 deletions jobs/ingestor_logsearch/monit
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
check process ingestor_logsearch
with pidfile /var/vcap/sys/run/bpm/ingestor_logsearch/ingestor_logsearch.pid
start program "/var/vcap/jobs/bpm/bin/bpm start ingestor_logsearch"
stop program "/var/vcap/jobs/bpm/bin/bpm stop ingestor_logsearch"
group vcap
204 changes: 204 additions & 0 deletions jobs/ingestor_logsearch/spec
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
---
name: ingestor_logsearch

description: |
This job runs Logstash process which ingests data by standard Syslog protocol, make some processing,
then forward it to opensearch cluster

packages:
- logstash
- logsearch-config
- logsearch-filters
- openjdk-17

templates:
bin/ingestor_logsearch: bin/ingestor_logsearch.sh
bin/pre-start: bin/pre-start
config/bpm.yml.erb: config/bpm.yml
config/input_and_output.conf.erb: config/input_and_output.conf
config/filters_pre.conf.erb: config/filters_pre.conf
config/filters_post.conf.erb: config/filters_post.conf
config/filters_override.conf.erb: config/filters_override.conf
config/syslog_tls.crt.erb: config/syslog_tls.crt
config/syslog_tls.key.erb: config/syslog_tls.key
config/logstash.yml.erb: config/logstash.yml
config/jvm.options.erb: config/jvm.options
config/ingestor-crt.erb: config/ssl/ingestor.crt
config/ingestor-pem.erb: config/ssl/ingestor.pem
config/ca.erb: config/ssl/opensearch.ca

provides:
- name: ingestor_logsearch
type: ingestor_logsearch
properties:
- logstash_ingestor.syslog.port
- logstash_ingestor.syslog.transport
- logstash_ingestor.syslog_tls.port
- logstash_ingestor.relp.port
consumes:
- name: opensearch
type: opensearch
optional: true

properties:
logstash.ssl_client_authentication:
description: Controls the servers behavior in regard to requesting a certificate from client connections
default: required
logstash.heap_size:
description: sets jvm heap sized
logstash.heap_percentage:
description: The percentage value used in the calculation to set the heap size.
default: 46
logstash.jvm_options:
description: additional jvm options
default: []
logstash.metadata_level:
description: "Whether to include additional metadata throughout the event lifecycle. NONE = disabled, DEBUG = fully enabled"
default: "NONE"
logstash.log_level:
description: The default logging level (e.g. WARN, DEBUG, INFO)
default: info
logstash.plugins:
description: "Array of hashes describing logstash plugins to install"
example:
- {name: logstash-output-cloudwatchlogs, version: 2.0.0}
default: []

logstash.ecs_compatibility:
description: Whether to enable ECS compatibility for geoip filters. See https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-ecs_compatibility
default: "disabled"

logstash.env:
description: "a list of arbitrary key-value pairs to be passed on as process environment variables. eg: FOO: 123"
default: []

logstash.queue.type:
description: Internal queuing model, "memory" for legacy in-memory based queuing and "persisted" for disk-based acked queueing.
default: persisted
logstash.queue.page_capacity:
description: The page data files size. The queue data consists of append-only data files separated into pages.
default: 250mb
logstash.queue.max_events:
description: The maximum number of unread events in the queue.
default: 0
logstash.queue.max_bytes:
description: The total capacity of the queue in number of bytes.
default: 1024mb
logstash.queue.checkpoint.acks:
description: The maximum number of acked events before forcing a checkpoint.
default: 1024
logstash.queue.checkpoint.writes:
description: The maximum number of written events before forcing a checkpoint.
default: 1024
logstash.queue.checkpoint.interval:
description: The interval in milliseconds when a checkpoint is forced on the head page.
default: 1000

logstash_ingestor.filters:
description: Filters to execute on the ingestors
default: ""

logstash_ingestor.syslog.port:
description: Port to listen for syslog messages
logstash_ingestor.syslog.transport:
description: Transport protocol to use
default: "tcp"
logstash_ingestor.syslog.use_keepalive:
description: Instruct the socket to use TCP keep alives

logstash_ingestor.health.disable_post_start:
description: Skip post-start health checks?
default: false
logstash_ingestor.health.interval:
description: Logstash syslog health check interval (seconds)
default: 5
logstash_ingestor.health.timeout:
description: Logstash syslog health check number of attempts (seconds)
default: 300

logstash_ingestor.syslog_tls.port:
description: Port to listen for syslog-TLS messages (omit to disable)
logstash_ingestor.syslog_tls.ssl_cert:
description: Syslog-TLS SSL certificate (file contents, not a path) - required if logstash_ingestor.syslog_tls.port set
logstash_ingestor.syslog_tls.ssl_key:
description: Syslog-TLS SSL key (file contents, not a path) - required if logstash_ingestor.syslog_tls.port set
logstash_ingestor.syslog_tls.skip_ssl_validation:
description: Verify the identity of the other end of the SSL connection against the CA.
default: false
logstash_ingestor.syslog_tls.use_keepalive:
description: Instruct the socket to use TCP keep alives

logstash_ingestor.relp.port:
description: Port to listen for RELP messages
default: 2514

logstash_parser.debug:
description: Debug level logging
default: false
logstash_parser.message_max_size:
description: "Maximum log message length. Anything larger is truncated (TODO: move this to ingestor?)"
default: 1048576
logstash_parser.filters:
description: "The configuration to embed into the logstash filters section. Can either be a set of parsing rules as a string or a list of hashes in the form of [{name: path_to_parsing_rules.conf}]"
default: ''
logstash_parser.deployment_dictionary:
description: "A list of files concatenated into one deployment dictionary file. Each file contains a hash of job name-deployment name keypairs for @source.deployment lookup."
default: [ "/var/vcap/packages/logsearch-config/deployment_lookup.yml" ]
logstash_parser.inputs:
description: |
A list of input plugins, with a hash of options for each of them. Please refer to example below.
example:
inputs:
- plugin: rabbitmq
options:
host: 192.168.1.1
user: logsearch
password: c1oudbunny
logstash_parser.outputs:
description: |
A list of output plugins, with a hash of options for each of them. Please refer to example below.
example:
inputs:
- plugin: mongodb
options:
uri: 192.168.1.1
database: logsearch
collection: logs
default: [ { plugin: "opensearch", options: {} } ]
logstash_parser.workers:
description: "The number of worker threads that logstash should use (default: auto = one per CPU)"
default: auto
logstash_parser.opensearch.idle_flush_time:
description: "How frequently to flush events if the output queue is not full."
logstash_parser.opensearch.document_id:
description: "Use a specific, dynamic ID rather than an auto-generated identifier."
default: ~
logstash_parser.opensearch.index:
description: "The specific, dynamic index name to write events to."
default: "logstash-%{+YYYY.MM.dd}"
logstash_parser.opensearch.index_type:
description: "The specific, dynamic index type name to write events to."
default: "%{@type}"
logstash_parser.opensearch.routing:
description: "The routing to be used when indexing a document."
logstash_parser.opensearch.ssl.certificate:
description: Node certificate for communication between logstash_parser and Opensearch
logstash_parser.opensearch.ssl.private_key:
description: Private key for communication between logstash_parser and Opensearch
logstash_parser.opensearch.data_hosts:
description: The list of opensearch data node IPs
logstash_parser.opensearch.verification_mode:
description: the verification mode, can be full or none
default: "full"
logstash_parser.timecop.reject_greater_than_hours:
description: "Logs with timestamps greater than this many hours in the future won't be parsed and will get tagged with fail/timecop"
default: 1
logstash_parser.timecop.reject_less_than_hours:
description: "Logs with timestamps less than this many hours in the past won't be parsed and will get tagged with fail/timecop"
default: 24
logstash_parser.enable_json_filter:
description: "Toggles the if_it_looks_like_json.conf filter rule"
default: false
logstash_parser.wait_for_templates:
description: "A list of index templates that need to be present in opensearch before the process starts"
default: ["index_template"]
111 changes: 111 additions & 0 deletions jobs/ingestor_logsearch/templates/bin/ingestor_logsearch
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/bin/bash

set -e # exit immediately if a simple command exits with a non-zero status
set -u # report the usage of uninitialized variables

# Setup env vars and folders for the webapp_ctl script
JOB_NAME=ingestor_logsearch
export LOG_DIR=/var/vcap/sys/log/$JOB_NAME
export STORE_DIR=/var/vcap/store/$JOB_NAME
export JOB_DIR=/var/vcap/jobs/$JOB_NAME
source /var/vcap/packages/openjdk-17/bosh/runtime.env

<%
es_host = nil
if_p("logstash_parser.opensearch.data_hosts") { |hosts| es_host = hosts.first }
unless es_host
es_host = link("opensearch").instances.first.address
end
%>

function wait_for_template {
local template_name="$1"
local MASTER_URL="<%= es_host %>:9200"

set +e
while true; do
echo "Waiting for index template to be uploaded: $template_name"
curl \
--key ${JOB_DIR}/ssl/ingestor.key \
--cert ${JOB_DIR}/ssl/ingestor.crt \
--cacert ${JOB_DIR}/ssl/opensearch.ca \
-I -f -i "$MASTER_URL"/_template/$template_name > /dev/null 2>&1
[ $? ] && break
sleep 5
done
set -e
echo "Found $template_name"
}

export PORT=${PORT:-5000}
export LANG=en_US.UTF-8
<% if 'auto' == p('logstash_parser.workers') %>
# 1 logstash worker / CPU core
export LOGSTASH_WORKERS=`grep -c ^processor /proc/cpuinfo`
<% else %>
export LOGSTASH_WORKERS=<%= p('logstash_parser.workers') %>
<% end %>
#export TIMECOP_REJECT_GREATER_THAN_HOURS=<%= p('logstash_parser.timecop.reject_greater_than_hours') %>
#export TIMECOP_REJECT_LESS_THAN_HOURS=<%= p('logstash_parser.timecop.reject_less_than_hours') %>
export HEAP_SIZE=$((( $( cat /proc/meminfo | grep MemTotal | awk '{ print $2 }' ) * <%= p("logstash.heap_percentage") %> ) / 100 ))K
<% if_p('logstash.heap_size') do |heap_size| %>
HEAP_SIZE=<%= heap_size %>
<% end %>
<% p("logstash.env").each do |env| %>
export <%= env.keys[0] %>="<%= env.values[0] %>"
<% end %>

<% p("logstash_parser.wait_for_templates").each do |template| %>
wait_for_template "<%= template %>"
<% end %>

export LS_JAVA_OPTS="-Xms$HEAP_SIZE -Xmx$HEAP_SIZE -DPID=$$"

# construct a complete config file from all the fragments
cat ${JOB_DIR}/config/input_and_output.conf > ${JOB_DIR}/config/logstash.conf

# append deployment dictionary files
<% p('logstash_parser.deployment_dictionary').each do |dictionary_path| %>
cat "<%= dictionary_path %>" >> ${JOB_DIR}/config/deployment_lookup.yml
<% end %>

echo "filter {" >> ${JOB_DIR}/config/logstash.conf

cat ${JOB_DIR}/config/filters_pre.conf >> ${JOB_DIR}/config/logstash.conf
cat /var/vcap/packages/logsearch-config/logstash-filters-default.conf >> ${JOB_DIR}/config/logstash.conf

<% if p('logstash_parser.filters').is_a? Array %>
<% p('logstash_parser.filters').each do |filter| %>
<% if filter.key? 'path' %>
cat "<%= filter['path'] %>" >> ${JOB_DIR}/config/logstash.conf
<% elsif !filter.key? 'content' %>
<% _, path = filter.first %>
cat "<%= path %>" >> ${JOB_DIR}/config/logstash.conf
<% end %>
<% end %>
<% end %>
cat ${JOB_DIR}/config/filters_override.conf >> ${JOB_DIR}/config/logstash.conf

cat ${JOB_DIR}/config/filters_post.conf >> ${JOB_DIR}/config/logstash.conf
<% if p('logstash_parser.enable_json_filter') %>
cat /var/vcap/packages/logsearch-config/if_it_looks_like_json.conf >> ${JOB_DIR}/config/logstash.conf
<% end %>
#cat /var/vcap/packages/logsearch-config/timecop.conf >> ${JOB_DIR}/config/logstash.conf
cat /var/vcap/packages/logsearch-config/deployment.conf >> ${JOB_DIR}/config/logstash.conf

echo "} #close filters" >> ${JOB_DIR}/config/logstash.conf

# clear persistent queue if the upgrade failed last run
if cat $LOG_DIR/$JOB_NAME.stdout.log | grep -a 'QueueUpgrade - Logstash was unable to upgrade your persistent queue data' >/dev/null ; then
mkdir ${STORE_DIR}/oldqueue.$$
mv ${STORE_DIR}/queue ${STORE_DIR}/.lock ${STORE_DIR}/dead_letter_queue ${STORE_DIR}/uuid ${STORE_DIR}/oldqueue.$$
mv $LOG_DIR/$JOB_NAME.stdout.log $LOG_DIR/$JOB_NAME.stdout.log.old
fi

/var/vcap/packages/logstash/bin/logstash \
--path.data ${STORE_DIR} \
--path.config ${JOB_DIR}/config/logstash.conf \
--path.settings ${JOB_DIR}/config \
--pipeline.ecs_compatibility <%= p("logstash.ecs_compatibility") %> \
--pipeline.workers ${LOGSTASH_WORKERS} \
--log.format=json --log.level=<%= p("logstash.log_level") %>
22 changes: 22 additions & 0 deletions jobs/ingestor_logsearch/templates/bin/pre-start
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash
source /var/vcap/packages/openjdk-17/bosh/runtime.env

export JOB_NAME=ingestor_logsearch
export JOB_DIR=/var/vcap/jobs/$JOB_NAME

<% p("logstash.plugins").each do |plugin| %>
/var/vcap/packages/logstash/bin/logstash-plugin install \
<%= plugin.except("name").map { |key, value| "--#{key}=#{value}" }.join(" ") %> \
<%= plugin["name"] %>
<% end %>

<% if_link('opensearch') do |ingestor_logsearch| %>
openssl pkcs8 -v1 "PBE-SHA1-3DES" \
-in "${JOB_DIR}/config/ssl/ingestor.pem" -topk8 \
-out "${JOB_DIR}/config/ssl/ingestor.key" -nocrypt
chmod 600 ${JOB_DIR}/config/ssl/ingestor.key
<% end %>

if [ -d ${JOB_DIR}/config/ssl ]; then
chown -R vcap:vcap ${JOB_DIR}/config/ssl
fi
14 changes: 14 additions & 0 deletions jobs/ingestor_logsearch/templates/config/bpm.yml.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
processes:
- name: ingestor_logsearch
hooks:
pre_start: /var/vcap/jobs/ingestor_logsearch/bin/pre-start
executable: /var/vcap/jobs/ingestor_logsearch/bin/ingestor_logsearch.sh
ephemeral_disk: true
persistent_disk: true
additional_volumes:
- path: /var/vcap/sys/tmp/ingestor_logsearch
writable: true
allow_executions: true
- path: /var/vcap/jobs/ingestor_logsearch/config
writable: true
- path: /var/vcap/jobs/deployment_lookup_config/config
5 changes: 5 additions & 0 deletions jobs/ingestor_logsearch/templates/config/ca.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<% if_link("opensearch") do |opensearch_config| %>
<% opensearch_config.if_p('opensearch.node.ssl.ca') do %>
<%= opensearch_config.p('opensearch.node.ssl.ca', '') %>
<% end %>
<% end %>
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<% if p('logstash_parser.filters').is_a? Array %>
<% p('logstash_parser.filters').each do |filter| %>
<% if filter.key? 'content' %>
<%= filter['content'] %>
<% end %>
<% end %>
<% elsif p('logstash_parser.filters') != '' %>
<%= p('logstash_parser.filters') %>
<% end %>
Loading