Skip to content

Commit

Permalink
Merge pull request #2117 from pauljevans/pevans-merge
Browse files Browse the repository at this point in the history
pcp2elasticsearch: support dynamic mapping
  • Loading branch information
myllynen authored Jan 15, 2025
2 parents 78e1a68 + 672fb7f commit 994dab5
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 105 deletions.
24 changes: 17 additions & 7 deletions qa/1130
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# PCP QA Test No. 1130
# checks basic pcp2elasticsearch functionality
#
# Copyright (c) 2017-2019 Red Hat.
# Copyright (c) 2017-2025 Red Hat.
#
seq=`basename $0`
echo "QA output created by $seq"
Expand All @@ -13,7 +13,7 @@ $python -c "from pcp import pmapi" >/dev/null 2>&1
[ $? -eq 0 ] || _notrun "python pcp pmapi module not installed"

# From Andreas 27 Jul 2022 ...
# [pcp2elasticsearch] now uses a method='PUT' argument, but this is
# [pcp2elasticsearch] now uses a method='POST' argument, but this is
# not available in Python versions before 3.3
#
__version=`$python --version 2>&1 | sed -e 's/Python //'`
Expand Down Expand Up @@ -90,11 +90,21 @@ wait
#
grep -q 'socat.* Remote end closed connection without response' $tmp.socat.err && \
_notrun "socat on this platform is not behaving as expected [closed conn]"
grep -q 'socat.* Connection reset by peer' $tmp.socat.log && \
_notrun "socat on this platform is not behaving as expected [reset conn]"

# From Paul 13 Jan 2025 ...
# Getting reset connection messages with socat even though exporter works
# correctly meaning the test is not being run. Happens with prior upstream
# exporter version also:
# ... socat[957217] E read(6, 0x558187c97000, 8192): Connection reset by peer
#
# Commenting out the check and allowing QA test to run
#
#grep -q 'socat.* Connection reset by peer' $tmp.socat.log && \
# _notrun "socat on this platform is not behaving as expected [reset conn]"

grep -q 'socat.* Broken pipe' $tmp.socat.log && \
_notrun "socat on this platform is not behaving as expected [broken pipe]"
grep -E -q '^PUT /+pcp HTTP/' $tmp.socat.err
grep -E -q '^POST /+pcp/_doc HTTP/' $tmp.socat.err
[ $? -eq 0 ] && echo "Found correct index in output"
grep -E -q '"hinv": {"ncpu": '$ncpu'}' $tmp.socat.err
[ $? -eq 0 ] && echo "Found correct value in output"
Expand All @@ -108,7 +118,7 @@ $pcp2elasticsearch -t 1 -s 1 -X QAHOST -x INDEX hinv.ncpu >$tmp.p2e.out 2>$tmp.p
sleep 2
$signal $pid 2>/dev/null
wait
grep -E -q '^PUT /+INDEX HTTP/' $tmp.socat.err
grep -E -q '^POST /+INDEX/_doc HTTP/' $tmp.socat.err
[ $? -eq 0 ] && echo "Found correct index in output"
grep -E -q '"@host-id": "QAHOST"' $tmp.socat.err
[ $? -eq 0 ] && echo "Found proper hostid in output"
Expand All @@ -123,7 +133,7 @@ sleep 2
$signal $pid 2>/dev/null
wait
echo "--- Start of received data ---"
grep -E '(mappings|slack)' $tmp.socat.err | sed -e 's,< [1-9].*,,g' >$tmp.archive.data
grep -E '(slack)' $tmp.socat.err | sed -e 's,< [1-9].*,,g' >$tmp.archive.data
while read -r line
do
echo $line | _filter_prec | pmjson | LC_COLLATE=POSIX sort | tr -d ,
Expand Down
75 changes: 0 additions & 75 deletions qa/1130.out
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,6 @@ Found correct index in output
Found proper hostid in output
=== 3. pcp2elasticsearch full-blown archive replay session ===
--- Start of received data ---
"type": "epoch_milli"
"type": "string"
"@host-id": {
"@timestamp": {
}
}
"properties": {
}
"pcp-metric": {
}
"ignore": 400
"mappings": {
}
{
}
"@id": "000001 /usr/lib/systemd/systemd --switched-root --system --deserialize 21"
"@id": "1 minute"
"@id": "cpu0"
Expand Down Expand Up @@ -86,21 +71,6 @@ Found proper hostid in output
}
}
{
}
"type": "epoch_milli"
"type": "string"
"@host-id": {
"@timestamp": {
}
}
"properties": {
}
"pcp-metric": {
}
"ignore": 400
"mappings": {
}
{
}
"@id": "cpu0"
"user": 0.0
Expand Down Expand Up @@ -181,21 +151,6 @@ Found proper hostid in output
}
}
{
}
"type": "epoch_milli"
"type": "string"
"@host-id": {
"@timestamp": {
}
}
"properties": {
}
"pcp-metric": {
}
"ignore": 400
"mappings": {
}
{
}
"@id": "cpu0"
"user": 0.0
Expand Down Expand Up @@ -276,21 +231,6 @@ Found proper hostid in output
}
}
{
}
"type": "epoch_milli"
"type": "string"
"@host-id": {
"@timestamp": {
}
}
"properties": {
}
"pcp-metric": {
}
"ignore": 400
"mappings": {
}
{
}
"@id": "cpu0"
"user": 9.999
Expand Down Expand Up @@ -371,21 +311,6 @@ Found proper hostid in output
}
}
{
}
"type": "epoch_milli"
"type": "string"
"@host-id": {
"@timestamp": {
}
}
"properties": {
}
"pcp-metric": {
}
"ignore": 400
"mappings": {
}
{
}
"@id": "cpu0"
"user": 0.0
Expand Down
27 changes: 4 additions & 23 deletions src/pcp2elasticsearch/pcp2elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env pmpython
#
# Copyright (C) 2015-2021 Marko Myllynen <[email protected]>
# Copyright (C) 2014-2018,2022 Red Hat.
# Copyright (C) 2014-2018,2025 Red Hat.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
Expand Down Expand Up @@ -47,7 +47,7 @@
CONFVER = 1
ES_SERVER = "http://localhost:9200/"
ES_INDEX = "pcp"
ES_SEARCH_TYPE = "pcp-metric"
ES_SEARCH_TYPE = "_doc"

class pcp2elasticsearch(object):
""" PCP to Elasticsearch """
Expand Down Expand Up @@ -429,26 +429,6 @@ def write_es(self, timestamp):

ts = self.context.datetime_to_secs(self.pmfg_ts(), PM_TIME_MSEC)

try:
body = {'ignore': 400,
'mappings': {'pcp-metric':
{'properties':{'@timestamp':{'type':'epoch_milli'},
'@host-id':{'type':'string'}}}}}
data = json.dumps(body).encode('utf-8')
headers = {'content-type': 'application/json'}
url = self.es_server + '/' + self.es_index
req = httprequest.Request(url=url, data=data, headers=headers, method='PUT')
with httprequest.urlopen(req) as f:
f.close()
if self.es_failed:
sys.stderr.write("Reconnected to Elasticsearch server %s.\n" % (self.es_server))
self.es_failed = False
except Exception as put_failed:
if not self.es_failed:
sys.stderr.write("Cannot connect to Elasticsearch server %s: %s, continuing.\n" % (self.es_server, str(put_failed)))
self.es_failed = True
return

# Assemble all metrics into a single document
# Use @-prefixed keys for metadata not coming in from PCP metrics
es_doc = {'@host-id': self.es_hostid, '@timestamp': long(ts)}
Expand Down Expand Up @@ -514,8 +494,9 @@ def write_es(self, timestamp):
insts.append({inst_key: name, last_part: value})

try:
headers = {'content-type': 'application/json'}
data = json.dumps(es_doc).encode('utf-8')
url = self.es_server + '/' + self.es_index + '/' + self.es_search_type
url = self.es_server + self.es_index + '/' + self.es_search_type
req = httprequest.Request(url=url, data=data, headers=headers, method='POST')
with httprequest.urlopen(req) as f:
f.close()
Expand Down

0 comments on commit 994dab5

Please sign in to comment.