Processors

Decanter Processors are optional. They receive data from the collectors, apply a processing logic on the received event, and send a new event to the appenders.

The processors are listening for incoming events on decanter/collect/ dispatcher topics and send processed events to decanter/process/ dispatcher topics. By default, the appenders are listening on decanter/collect/ topics. If you want to append processed events, you have to configure the appenders to listen on decanter/process/ topics. To do that, you just have to change appender configuration with:

event.topics=decanter/process/*

It’s possible to "chain" processors thanks to the topics. For instance, you can have the first processor listening on decanter/collect/* topic (containing events coming from the collectors), and sending processed events to decanter/process/first. Then, a second processor can listen on decanter/process/first topic and send processed data to decanter/process/second. Finally, at the end of the chain, you have to configure the appenders to listen on decanter/process/second.

Pass Through

This processor doesn’t implement any concrete logic. It’s for the example how to implement a processor.

You can install this processor using the decanter-processor-passthrough feature:

karaf@root()> feature:install decanter-processor-passthrough

Aggregate

This processor "merges" several incoming events in a single one that is sent periodically.

You can install this processor using the decanter-processor-aggregate feature:

karaf@root()> feature:install decanter-processor-aggregate

By default, the "merged" event is sent every minute. You can change this using the period configuration.

You can provision etc/org.apache.karaf.decanter.processor.aggregate.cfg configuration file with:

period=120 # this is the period in seconds
target.topics=decanter/process/aggregate # that's the default target topic

You can also decide if a known property is overwritten in the aggregator or appended.

By default, properties are not overwritten, meaning that it’s prefixed by the event index in the aggregator:

0.foo=first
0.other=bar
1.foo=second
1.other=bar

In the processor etc/org.apache.karaf.decanter.processor.aggregate.cfg configuration file, you can enable overwrite:

overwrite=true

Then, if a property already exist in the aggregator, its value will be overwritten by the new event value received in the aggregator.

GroupBy

This processor "groups" events containing same properties values during a period.

For instance, you configure the GroupBy processor to group events using foo and bar properties. Then you receive the following three events:

  1. first event containing: { "foo":"foo","bar":"bar","first":"value1" }

  2. second event containing: { "hello":"world","second":"value2" }

  3. third event containing: { "foo":"foo","bar":"bar","third":"value3"}

The groupBy processor will create (and send) one event containing:

  • if you choose to "flatten" the properties, the event will contain: { "foo":"foo", "bar":"bar", "first":"value1","third":"value3" }

  • if you chosse not to "flatten" the properties, the event will contain: { "events":[ { "foo":"foo","bar":"bar","first":"value1" }, { "foo":"foo","bar":"bar","third":"value3" } ] }

You can install this processor using the decanter-processor-groupby feature:

karaf@root()> feature:install decanter-processor-groupby

By default, the "merged" event is sent every minute. You can change this using the period configuration.

The GroupBy processor is configured via etc/org.apache.karaf.decanter.processor.groupby.cfg configuration file:

#
# Decanter GroupBy processor
#target.topics=decanter/process/groupby

#
# Aggregation period in seconds
#
#period=60

#
# List of grouping properties
#
#groupBy=first,second

#
# If true, grouped events properties are flatten (all properties in the event) aka Map<String,Object>
# If false, grouped events properties are inner grouped map aka Map<int, Map<String,Object>>
#
#flatten=true
  • The target.topics property defines the list of Decanter topics (separated by ,) where the resulting events will be sent.

  • The period property defines the retention period to accumulate the incoming events

  • The groupBy property defines the property names (separated by ,) as grouping term

  • The flatten property defines the way the resulting event will be created. If true, all events properties will be store directly (flat) in the resulting event. If false, the resulting event will contain an array of properties (from the original grouped events).

Apache Camel

It’s also possible you implement your own event processor using Apache Camel.

Decanter Camel Processor delegates event processing to your Camel route. Your route just has to callback Decanter (on a dedicated Camel endpoint) to send the processed event back in the dispatcher.

By default, Decanter Camel processor send the events to direct-vm:decanter-delegate endpoint, and expects the processed event back on direct-vm:decanter-callback.

The Camel message body is Map<String,Object> (it’s what Decanter is sending into your Camel route and expects on the callback endpoint).

You can install the Camel processor with the decanter-processor-camel feature:

karaf@root()> feature:install decanter-processor-camel

This feature also installs etc/org.apache.karaf.decanter.processor.camel.cfg configuration file:

#
# Decanter Camel processor
#

#
# Destination dispatcher topics where to send the aggregated events
#
#target.topics=decanter/process/camel

#
# This is the Camel endpoint URI where Decanter is sending the events
# (using event Map<String, Object> as body)
#
#delegate.uri=direct-vm:decanter-delegate

#
# This is the Camel endpoint URI where user Camel route should call to be "back" in Decanter
# The user Camel route is supposed to do "to uri=[CALLBACK]" with a Map<String, Object> body
# resulting of the route processing.
# Decanter uses this body to send a new Event to the dispatcher target topics.
#
#callback.uri=direct-vm:decanter-callback
  • the target.topics property is the list of Decanter dispatcher topics (separated by ,) where the processor will "forward" the processed events.

  • the delegate.uri property is the Camel endpoint URI where Decanter Camel Processor will send events (as Map<String,Object>). It’s basically the from endpoint of your route.

  • the callback.uri property is the Camel endpoint URI where Decanter Camel Processor is waiting from your processed events. Basically, it’s where your route should send processed events (to of your route).