Processors
Decanter Processors are optional. They receive data from the collectors, apply a processing logic on the received event, and send a new event to the appenders.
The processors are listening for incoming events on decanter/collect/
dispatcher topics and send processed events to decanter/process/
dispatcher topics.
By default, the appenders are listening on decanter/collect/
topics. If you want to append processed events, you have to configure the appenders
to listen on decanter/process/
topics. To do that, you just have to change appender configuration with:
event.topics=decanter/process/*
It’s possible to "chain" processors thanks to the topics.
For instance, you can have the first processor listening on decanter/collect/*
topic (containing events coming from the collectors), and
sending processed events to decanter/process/first
. Then, a second processor can listen on decanter/process/first
topic and send processed
data to decanter/process/second
. Finally, at the end of the chain, you have to configure the appenders to listen on
decanter/process/second
.
Pass Through
This processor doesn’t implement any concrete logic. It’s for the example how to implement a processor.
You can install this processor using the decanter-processor-passthrough
feature:
karaf@root()> feature:install decanter-processor-passthrough
Aggregate
This processor "merges" several incoming events in a single one that is sent periodically.
You can install this processor using the decanter-processor-aggregate
feature:
karaf@root()> feature:install decanter-processor-aggregate
By default, the "merged" event is sent every minute. You can change this using the period
configuration.
You can provision etc/org.apache.karaf.decanter.processor.aggregate.cfg
configuration file with:
period=120 # this is the period in seconds target.topics=decanter/process/aggregate # that's the default target topic
You can also decide if a known property is overwritten in the aggregator or appended.
By default, properties are not overwritten, meaning that it’s prefixed by the event index in the aggregator:
0.foo=first 0.other=bar 1.foo=second 1.other=bar
In the processor etc/org.apache.karaf.decanter.processor.aggregate.cfg
configuration file, you can enable overwrite
:
overwrite=true
Then, if a property already exist in the aggregator, its value will be overwritten by the new event value received in the aggregator.
GroupBy
This processor "groups" events containing same properties values during a period.
For instance, you configure the GroupBy processor to group events using foo
and bar
properties. Then you receive
the following three events:
-
first event containing:
{ "foo":"foo","bar":"bar","first":"value1" }
-
second event containing:
{ "hello":"world","second":"value2" }
-
third event containing:
{ "foo":"foo","bar":"bar","third":"value3"}
The groupBy processor will create (and send) one event containing:
-
if you choose to "flatten" the properties, the event will contain:
{ "foo":"foo", "bar":"bar", "first":"value1","third":"value3" }
-
if you chosse not to "flatten" the properties, the event will contain:
{ "events":[ { "foo":"foo","bar":"bar","first":"value1" }, { "foo":"foo","bar":"bar","third":"value3" } ] }
You can install this processor using the decanter-processor-groupby
feature:
karaf@root()> feature:install decanter-processor-groupby
By default, the "merged" event is sent every minute. You can change this using the period
configuration.
The GroupBy processor is configured via etc/org.apache.karaf.decanter.processor.groupby.cfg
configuration file:
# # Decanter GroupBy processor #target.topics=decanter/process/groupby # # Aggregation period in seconds # #period=60 # # List of grouping properties # #groupBy=first,second # # If true, grouped events properties are flatten (all properties in the event) aka Map<String,Object> # If false, grouped events properties are inner grouped map aka Map<int, Map<String,Object>> # #flatten=true
-
The
target.topics
property defines the list of Decanter topics (separated by,
) where the resulting events will be sent. -
The
period
property defines the retention period to accumulate the incoming events -
The
groupBy
property defines the property names (separated by,
) as grouping term -
The
flatten
property defines the way the resulting event will be created. Iftrue
, all events properties will be store directly (flat) in the resulting event. Iffalse
, the resulting event will contain an array of properties (from the original grouped events).
Apache Camel
It’s also possible you implement your own event processor using Apache Camel.
Decanter Camel Processor delegates event processing to your Camel route. Your route just has to callback Decanter (on a dedicated Camel endpoint) to send the processed event back in the dispatcher.
By default, Decanter Camel processor send the events to direct-vm:decanter-delegate
endpoint, and
expects the processed event back on direct-vm:decanter-callback
.
The Camel message body is Map<String,Object>
(it’s what Decanter is sending into your Camel route and expects on
the callback endpoint).
You can install the Camel processor with the decanter-processor-camel
feature:
karaf@root()> feature:install decanter-processor-camel
This feature also installs etc/org.apache.karaf.decanter.processor.camel.cfg
configuration file:
# # Decanter Camel processor # # # Destination dispatcher topics where to send the aggregated events # #target.topics=decanter/process/camel # # This is the Camel endpoint URI where Decanter is sending the events # (using event Map<String, Object> as body) # #delegate.uri=direct-vm:decanter-delegate # # This is the Camel endpoint URI where user Camel route should call to be "back" in Decanter # The user Camel route is supposed to do "to uri=[CALLBACK]" with a Map<String, Object> body # resulting of the route processing. # Decanter uses this body to send a new Event to the dispatcher target topics. # #callback.uri=direct-vm:decanter-callback
-
the
target.topics
property is the list of Decanter dispatcher topics (separated by,
) where the processor will "forward" the processed events. -
the
delegate.uri
property is the Camel endpoint URI where Decanter Camel Processor will send events (asMap<String,Object>
). It’s basically thefrom
endpoint of your route. -
the
callback.uri
property is the Camel endpoint URI where Decanter Camel Processor is waiting from your processed events. Basically, it’s where your route should send processed events (to
of your route).