Skip to content

JSONFilter

Jimmy Bonds edited this page Jul 17, 2020 · 2 revisions

kafkactl JSON Filter Example:

The --json-filter flag can be used to filter a stream of JSON encoded messages. Currently only used in conjunction with the --follow flag.

Examples:

  • Consider the following JSON Structure:
{
  "datacenter": "atlanta",
  "env": "dev",
  "host": {
    "name": "server01.example.com"
  },
  "log": "tomcat.log",
  "message": "INFO message here",
  "node": "server01",
  "nested": [
    {"key1": "server01 nested value 1"},
    {"key2": "server01 nested value 2"}
  ]
}

We'll create and load a topic with multiple messages with this structure for this example.

  • Create the topic and load messages:
> kafkactl admin create topic temp-topic-test --partitions 1 --replicas 1
Successfully created topic temp-topic-test

> kafkactl get topic temp-topic-test --describe
TOPIC            PART  OFFSET  LEADER  REPLICAS  ISRs  OFFLINE
temp-topic-test  0     0       3       [3]       [3]   []

echo '{"datacenter": "atlanta", "env": "prod", "host": {"name": "server01.example.com"}, "log": "nginx.log", "message": "INFO message hello", "node": "server01", "nested": [{"key1": "server01 nested value 1"}, {"key2": "server01 nested value 2"}]}' | kafkactl send temp-topic-test --partition 0
echo '{"datacenter": "chicago", "env": "dev", "host": {"name": "server02.example.com"}, "log": "apache.log", "message": "INFO message hello", "node": "server02", "nested": [{"key1": "server02 nested value 1"}, {"key2": "server02 nested value 2"}]}' | kafkactl send temp-topic-test --partition 0
echo '{"datacenter": "chicago", "env": "dev", "host": {"name": "server02.example.com"}, "log": "apache.log", "message": "WARN message bye", "node": "server02", "nested": [{"key1": "server02 another nested value 1"}, {"key2": "server02 another nested value 2"}]}' | kafkactl send temp-topic-test --partition 0
echo '{"datacenter": "new york", "env": "dev", "host": {"name": "server07.example.com"}, "log": "tomcat.log", "message": "WARN message bye", "node": "server07", "nested": [{"key1": "server07 nested value 1"}, {"key2": "server07 nested value 2"}]}' | kafkactl send temp-topic-test --partition 0
echo '{"datacenter": "miami", "env": "prod", "host": {"name": "server05.example.com"}, "log": "nginx.log", "message": "CRIT message oh no!", "node": "server05", "nested": [{"key1": "server05 nested value 1"}, {"key2": "server05 nested value 2"}]}' | kafkactl send temp-topic-test --partition 0

  • Verify the messages were delivered successfully:
> kafkactl logs --follow --tail 5 temp-topic-test --no-header
{"datacenter": "atlanta", "env": "prod", "host": {"name": "server01.example.com"}, "log": "nginx.log", "message": "INFO message hello", "node": "server01", "nested": [{"key1": "server01 nested value 1"}, {"key2": "server01 nested value 2"}]}
{"datacenter": "chicago", "env": "dev", "host": {"name": "server02.example.com"}, "log": "apache.log", "message": "INFO message hello", "node": "server02", "nested": [{"key1": "server02 nested value 1"}, {"key2": "server02 nested value 2"}]}
{"datacenter": "chicago", "env": "dev", "host": {"name": "server02.example.com"}, "log": "apache.log", "message": "WARN message bye", "node": "server02", "nested": [{"key1": "server02 another nested value 1"}, {"key2": "server02 another nested value 2"}]}
{"datacenter": "new york", "env": "dev", "host": {"name": "server07.example.com"}, "log": "tomcat.log", "message": "WARN message bye", "node": "server07", "nested": [{"key1": "server07 nested value 1"}, {"key2": "server07 nested value 2"}]}
{"datacenter": "miami", "env": "prod", "host": {"name": "server05.example.com"}, "log": "nginx.log", "message": "CRIT message oh no!", "node": "server05", "nested": [{"key1": "server05 nested value 1"}, {"key2": "server05 nested value 2"}]}

  • Extract Fields:
> kafkactl logs --follow --tail 5 temp-topic-test --json-filter 'datacenter'
[atlanta]

[chicago]

[chicago]

[new york]

[miami]



> kafkactl logs --follow --tail 5 temp-topic-test --json-filter 'datacenter,node,message'
[atlanta server01 INFO message hello]

[chicago server02 INFO message hello]

[chicago server02 WARN message bye]

[new york server07 WARN message bye]

[miami server05 CRIT message oh no!]


  • Extracting Nested Fields:
> kafkactl logs --follow --tail 5 temp-topic-test --json-filter 'host.name'
[server01.example.com]

[server02.example.com]

[server02.example.com]

[server07.example.com]

[server05.example.com]


> kafkactl logs --follow --tail 5 temp-topic-test --json-filter 'datacenter,nested'
[atlanta [{"key1": "server01 nested value 1"}, {"key2": "server01 nested value 2"}]]

[chicago [{"key1": "server02 nested value 1"}, {"key2": "server02 nested value 2"}]]

[chicago [{"key1": "server02 another nested value 1"}, {"key2": "server02 another nested value 2"}]]

[new york [{"key1": "server07 nested value 1"}, {"key2": "server07 nested value 2"}]]

[miami [{"key1": "server05 nested value 1"}, {"key2": "server05 nested value 2"}]]


> kafkactl logs --follow --tail 5 temp-topic-test --json-filter 'datacenter,nested.0.key1'
[atlanta server01 nested value 1]

[chicago server02 nested value 1]

[chicago server02 another nested value 1]

[new york server07 nested value 1]

[miami server05 nested value 1]


> kafkactl logs --follow --tail 5 temp-topic-test --json-filter 'datacenter,nested.#.key2'
[atlanta ["server01 nested value 2"]]

[chicago ["server02 nested value 2"]]

[chicago ["server02 another nested value 2"]]

[new york ["server07 nested value 2"]]

[miami ["server05 nested value 2"]]

  • Simple Querying:
## Find an Equal Match:
> kafkactl logs --follow --tail 5 temp-topic-test --json-filter '..#[datacenter==miami].message'
[]

[]

[]

[]

[CRIT message oh no!]

## Can also redirect output if desired:
> kafkactl logs --follow --tail 5 temp-topic-test --json-filter '..#[datacenter==miami].message' | grep CRIT
[CRIT message oh no!]


## Not Equal:
> kafkactl logs --follow --tail 5 temp-topic-test --json-filter '..#[datacenter!=miami].message'
[INFO message hello]

[INFO message hello]

[WARN message bye]

[WARN message bye]

[]


## % (like):
> kafkactl logs --follow --tail 5 temp-topic-test --json-filter 'nested.#[key1%"server02 another*"].key1'
[]

[]

[server02 another nested value 1]

[]

[]


## !% (not like):
> kafkactl logs --follow --tail 5 temp-topic-test --json-filter 'nested.#[key1!%"server02 another*"].key1'
[server01 nested value 1]

[server02 nested value 1]

[]

[server07 nested value 1]

[server05 nested value 1]

  • Use Commas to combine Multiple Fields:
> kafkactl logs --follow --tail 5 temp-topic-test --json-filter 'host.name,..#[datacenter==miami].message,nested.#[key1%"server02*"]'
[server01.example.com  ]

[server02.example.com  {"key1": "server02 nested value 1"}]

[server02.example.com  {"key1": "server02 another nested value 1"}]

[server07.example.com  ]

[server05.example.com CRIT message oh no! ]

Clone this wiki locally