Replies: 3 comments 1 reply
-
The obvious question is why not just have the publisher add the timestamp to the JSON data it is publishing?
That said, I'm not clear on what it is you are actually doing. What does your "Stream JSON sampler" actually do? Does it convert the JSON object provided in the stream subscribe callback to a metric set? Does it re-publish the data it receives? If it is re-publishing the data, it doesn't need to do so. The prdcr_subscribe command can be used at the aggregator to request that stream data received at the producer (i.e. the sampler daemon) be forwarded.
All that said, there are at least a couple of options (detailed in the full reply quoted below).
But all of that sounds a lot harder than just adding the timestamp to the JSON at the source.
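(Editor's aside for later readers: the forwarding setup mentioned above is configured on the aggregator with prdcr_subscribe. The sketch below is illustrative only; the producer name, stream name, and host/port values are invented, and option details can vary between LDMS releases.)

```
# Aggregator side (sketch): add and start the producer (the sampler
# daemon), and ask it to forward everything published on the
# hypothetical "jdump" stream.
prdcr_add name=samplerd host=login01 xprt=sock port=440 type=active interval=1000000
prdcr_subscribe regex=samplerd stream=jdump
prdcr_start name=samplerd
```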
-
Thanks Tom,
I'll try that. A little more information:
The sampler just takes an arbitrary string from some application that uses the LDMS API to publish it. The string could be anything, really: SLURM parameters, ReFrame data, module states from inside the system. What the string contains is up to the systems staff and whatever information they want.
My concern is that I can force JSON and could require a timestamp (I just used the timestamp as an easy example), but in my case this information can come from 4 different systems, and I envision that staff members may either forget to put the system name in or copy a publish script from one system to another, so the data would be tagged with the wrong system. I would like to fix, or have the ability to fix, certain fields before inserting them into ElasticSearch.
Now, ElasticSearch is a shared resource within the center, so the data there needs some way to distinguish one system from another. (I could use different ports on Logstash to add this data, but I figured that having the sampler do it would be nicer for the LDMS community, since not everyone would be able to use something like Logstash. :-)
I really just wanted to get the tagging data as close to the source
producing the data as possible.
Example:
hello_publisher -x sock -h login01 -p 440 -s jdump -m '{"keyd":"fixy this is a stringy2str"}' -t json
I would like to add "timestamp":"some date/time","system":"systemA" to that JSON string and pass it on to the aggregator. I can pass a default "system" value from the config options, but was not sure how to get it included in every stream entry.
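(Editor's aside, not from the thread: for readers wanting a concrete starting point, here is a minimal, self-contained sketch of the rewrite Cary describes, in plain C with no LDMS dependencies. The function name, the "system" field, and the "data" wrapper key are illustrative assumptions, not LDMS API.)

```c
#include <stdio.h>
#include <string.h>
#include <time.h>

/* Hypothetical helper (not LDMS API): wrap an incoming JSON object
 * string with "timestamp" and "system" attributes before forwarding.
 * The timestamp is passed in as a parameter so the output is
 * deterministic; a sampler would pass time(NULL). Returns the number
 * of bytes written, or -1 on error/truncation. */
static int tag_stream_msg(char *out, size_t out_len,
                          const char *json_in, const char *system,
                          long ts)
{
    int n = snprintf(out, out_len,
                     "{\"timestamp\":%ld,\"system\":\"%s\",\"data\":%s}",
                     ts, system, json_in);
    if (n < 0 || (size_t)n >= out_len)
        return -1; /* truncated or encoding error */
    return n;
}
```

In a real sampler, the resulting buffer would then be handed to ldmsd_stream_publish() in place of the original message.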
Maybe this helped. If there is an easier way, I'm all for it. (Well, to a point. Requiring the sender to make sure the data is all there and correct is both the easiest and the toughest. :-)
Thanks
Cary
On Tue, Jun 1, 2021 at 12:43 PM Tom Tucker wrote:
The obvious question is why not just have the publisher add the timestamp
to the JSON data it is publishing?
That said, I'm not clear on what it is you are actually doing. What does
your "Stream JSON sampler" actually do? Does it convert the JSON object
provided in the stream subscribe callback to a metric set? Does it
re-publish the data it receives? If it is re-publishing the data, it
doesn't need to do so. The prdcr_subscribe command can be used at the
aggregator to request that stream data received at the producer (i.e. the
sampler daemon) be forwarded.
All that said, there are at least a couple of options:
1. The published message text is available in the message parameter of the stream receive callback function. You could modify that text before you republish it.
2. You can modify the JSON object, adding the 'timestamp' attribute using the json_attr_add() function on the entity provided in the callback, use json_entity_dump() to convert the modified entity back into a string, and then publish the resulting string.
But all of that sounds a lot harder than just adding the timestamp to the
JSON at the source.
-
Thanks Tom,
This worked and I just want to add a couple of things for others.
1) In order to use ldmsd_stream_publish() I needed to get an ldms_t handle, so I used setup_connection() from the example code to get the upstream handle.
2) The LDMSD_STREAM_JSON and "clean-stream" arguments in the example are swapped; the stream name comes before the stream type. :-)
3) I open the connection right before the publish, and I added ldms_xprt_close(upstream) right after the publish to try not to bleed sockets.
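(Editor's aside: the three steps above might look roughly like the sketch below. This is an untested outline, not working code: setup_connection() stands in for the helper from the ldmsd_stream_publish example, and exact signatures and auth arguments vary between LDMS versions.)

```c
#include <string.h>
#include "ldms.h"
#include "ldmsd_stream.h"

/* Sketch only: open a transport, publish one message on the
 * "clean-stream" stream, and close the transport immediately so
 * sockets are not leaked. setup_connection() is assumed to come
 * from the ldmsd_stream_publish example code. */
static int publish_once(const char *host, const char *port, const char *msg)
{
    ldms_t upstream = setup_connection("sock", host, port, "munge");
    if (!upstream)
        return -1;
    /* Note the argument order: stream name before stream type. */
    int rc = ldmsd_stream_publish(upstream, "clean-stream",
                                  LDMSD_STREAM_JSON,
                                  msg, strlen(msg) + 1);
    ldms_xprt_close(upstream);
    return rc;
}
```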
Other than that, now I just have to clean up the code.
There may be better or more efficient ways to do this. I've only just started looking at the code and am basically a hack. :-) So cleanup and suggestions are always welcome.
Thanks again for the help.
Cary
On Tue, Jun 1, 2021 at 4:34 PM Tom Tucker wrote:
Ok, I get it now. If you want to modify the data before it gets forwarded,
then you could simply encapsulate the string or JSON data that you receive
at your sampler in another object. Something like this:
int my_stream_receive_cb(client, cb_arg, stream_type, data, data_len, entity)
{
    ...
    char wrapper[BIG_ENOUGH];
    const char *my_wrapper_fmt = "{ \"timestamp\" : %ld, \"content\" : \"%s\" }";
    int rc = snprintf(wrapper, BIG_ENOUGH, my_wrapper_fmt, time(NULL), data);
    ldmsd_stream_publish(upstream, LDMSD_STREAM_JSON, "clean-stream", wrapper, strlen(wrapper)+1);
}
It is also possible to modify the JSON entity that you are provided and
then dump it to a string and publish it as per the above, e.g.
jbuf_t jbuf = jbuf_new();
json_entity_t timestamp = json_entity_new(JSON_INT_VALUE, time(NULL));
json_attr_add(entity, "timestamp", timestamp);
jbuf = json_entity_dump(jbuf, entity);
ldmsd_stream_publish(upstream, LDMSD_STREAM_JSON, "clean-stream", jbuf->buf, jbuf->buf_len);
-
Hi,
I'm looking for information/suggestions/help/resources on adding information to a JSON stream in a sampler plugin.
I've created a simple Stream JSON sampler and a Stream store plugin. This works: I can pass a JSON or string entry submitted at the sampler side, the information is passed to the aggregator, and the aggregator writes it out to disk.
What I would like to do is add a 'timestamp' and maybe other information to that JSON message. I've tried at the sampler side but have not figured out how to add information into the stream. I think it would be easier to add the information at the aggregator side right before it is written out, but then I may lose some information, like the sample host.
So, how does one modify the stream before sending it to the aggregator?
Ultimately I will be sending this to ElasticSearch.
Thanks
Cary