
Complex Event Processing

Table of contents

  1. Overview
  2. Pattern definitions
    1. Timed patterns
    2. Windowed patterns
  3. Pattern matching
  4. CEP for Java SDK

Overview

Complex event processing (CEP) is an approach to predicting high-level events that are likely to result from specific sets of low-level factors. CEP identifies and analyzes cause-and-effect relationships among events in real time, which makes it possible to act proactively in response to specific scenarios.

In Kio, the CEP module lets you describe a pattern as a sequence of events with a condition for each of them, and apply that pattern to an input collection. The output is a collection of complex events assembled according to the defined pattern.

Pattern definitions

The following methods define a pattern:

Method Description

startWith(name, condition)
Static method of the Pattern class that defines the beginning of a pattern.

Arguments:
· name: String - the name of the pattern element
· condition: (T) -> Boolean - a condition to check the element

then(name, condition)
Appends a new pattern element with the given name. It matches an event from the input collection that directly follows the previously matched event (strict contiguity).

Arguments:
· name: String - the name of the pattern element
· condition: (T) -> Boolean - a condition to check the element

thenFollowBy(name, condition)
Appends a new pattern element with the given name. Events that do not satisfy the condition may occur between the matched event and the previously matched event (relaxed contiguity).

Arguments:
· name: String - the name of the pattern element
· condition: (T) -> Boolean - a condition to check the element

thenFollowByAny(name, condition)
Appends a new pattern element with the given name. Other events may occur between the matched event and the previously matched event, and a separate complex event is produced for every alternative matching event (non-deterministic relaxed contiguity).

Arguments:
· name: String - the name of the pattern element
· condition: (T) -> Boolean - a condition to check the element

Here is an example of defining a pattern and applying it:

// pattern definition
val pattern: Pattern<Event> = Pattern
    .startWith<Event>("start") { it.name == "start" }
    .then("middle") { it.name == "middle" && it.value > 5.5 }
    .thenFollowByAny("end") { it.name == "end" }
    .within(Duration.standardSeconds(10))

// applying to the input
val complexCollection: PCollection<ComplexEvent<Event>> =
    input.match(pattern, allowedLateness = Duration.standardSeconds(30))

// mapping the complex events
complexCollection.map {
    val start = it["start"].elementAt(0)
    val middle = it["middle"].elementAt(0)
    val end = it["end"].elementAt(0)
    "${start.id} -> ${middle.id} -> ${end.id}"
}
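
To make the difference between then, thenFollowBy, and thenFollowByAny more concrete, the sketch below imitates the three contiguity modes on an ordinary in-memory list. It does not use Kio or Beam: SimpleEvent, matchStrict, matchRelaxed, and matchRelaxedAny are hypothetical names introduced only for this illustration, and the sketch says nothing about how Kio implements matching internally.

```kotlin
// Hypothetical stand-in for an input element; not part of Kio.
data class SimpleEvent(val id: Int, val name: String)

// Strict contiguity (like then): the second event must come immediately after the first.
fun matchStrict(events: List<SimpleEvent>, first: String, second: String): List<Pair<Int, Int>> =
    events.zipWithNext()
        .filter { (a, b) -> a.name == first && b.name == second }
        .map { (a, b) -> a.id to b.id }

// Relaxed contiguity (like thenFollowBy): non-matching events are skipped,
// and only the first later match is taken.
fun matchRelaxed(events: List<SimpleEvent>, first: String, second: String): List<Pair<Int, Int>> =
    events.withIndex()
        .filter { it.value.name == first }
        .mapNotNull { (i, a) ->
            events.drop(i + 1).firstOrNull { it.name == second }?.let { a.id to it.id }
        }

// Non-deterministic relaxed contiguity (like thenFollowByAny):
// every later match produces its own result.
fun matchRelaxedAny(events: List<SimpleEvent>, first: String, second: String): List<Pair<Int, Int>> =
    events.withIndex()
        .filter { it.value.name == first }
        .flatMap { (i, a) ->
            events.drop(i + 1).filter { it.name == second }.map { a.id to it.id }
        }

fun main() {
    val events = listOf(
        SimpleEvent(1, "a"), SimpleEvent(2, "x"), SimpleEvent(3, "b"), SimpleEvent(4, "b")
    )
    println(matchStrict(events, "a", "b"))     // prints []
    println(matchRelaxed(events, "a", "b"))    // prints [(1, 3)]
    println(matchRelaxedAny(events, "a", "b")) // prints [(1, 3), (1, 4)]
}
```

With strict contiguity the "x" event between "a" and the first "b" prevents any match, relaxed contiguity skips it, and the non-deterministic variant additionally reports the second "b" as an alternative ending.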

Timed patterns

A timed pattern sets a maximum time interval within which an event sequence must match. If an incomplete event sequence exceeds this interval, it is discarded. To define a timed pattern, use the within(duration) method.
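
The effect of a time limit can be sketched in plain Kotlin, without Kio or Beam. In this illustration each "start" is paired with the first later "end", and the pair is dropped when it took longer than the limit. TimedEvent and matchWithin are hypothetical names, and treating the limit as inclusive is an assumption of the sketch, not a documented property of within(duration).

```kotlin
// Hypothetical timestamped element; not part of Kio.
data class TimedEvent(val name: String, val timestampMs: Long)

// Pairs each "start" with the first later "end" and keeps the pair only if
// the two events are at most maxMs apart (inclusive limit is an assumption).
fun matchWithin(events: List<TimedEvent>, maxMs: Long): List<Pair<Long, Long>> {
    val sorted = events.sortedBy { it.timestampMs }
    return sorted.withIndex()
        .filter { it.value.name == "start" }
        .mapNotNull { (i, start) ->
            sorted.drop(i + 1)
                .firstOrNull { it.name == "end" }
                ?.takeIf { it.timestampMs - start.timestampMs <= maxMs }
                ?.let { start.timestampMs to it.timestampMs }
        }
}

fun main() {
    val events = listOf(
        TimedEvent("start", 0), TimedEvent("end", 4_000),
        TimedEvent("start", 20_000), TimedEvent("end", 35_000)
    )
    // The second sequence takes 15 seconds and is discarded by the 10-second limit.
    println(matchWithin(events, maxMs = 10_000)) // prints [(0, 4000)]
}
```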

Windowed patterns

A windowed pattern has no time limit, but all events must fall into the same window to be matched. Use the withinWindow() method to define one.
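
The contrast with a timed pattern can be sketched in plain Kotlin, again without Kio or Beam. The illustration assigns events to fixed-size windows by timestamp and matches only inside each window. WindowEvent and matchPerWindow are hypothetical names, and the fixed-window assignment is an assumption made for the sketch; in a real pipeline the windowing is whatever the input collection already carries.

```kotlin
// Hypothetical timestamped element; not part of Kio.
data class WindowEvent(val name: String, val timestampMs: Long)

// Groups events into fixed windows of windowMs and pairs each "start"
// with the first later "end" from the same window only.
fun matchPerWindow(events: List<WindowEvent>, windowMs: Long): List<Pair<Long, Long>> =
    events.groupBy { it.timestampMs / windowMs }
        .values
        .flatMap { window ->
            val sorted = window.sortedBy { it.timestampMs }
            sorted.withIndex()
                .filter { it.value.name == "start" }
                .mapNotNull { (i, start) ->
                    sorted.drop(i + 1)
                        .firstOrNull { it.name == "end" }
                        ?.let { start.timestampMs to it.timestampMs }
                }
        }

fun main() {
    val events = listOf(
        WindowEvent("start", 1_000), WindowEvent("end", 9_000),
        WindowEvent("start", 18_000), WindowEvent("end", 21_000)
    )
    // The last two events are only 3 seconds apart but fall into different
    // 10-second windows, so they do not form a match.
    println(matchPerWindow(events, windowMs = 10_000)) // prints [(1000, 9000)]
}
```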

Pattern matching

The following methods apply a defined pattern to an input collection:

Method Description
PCollection<T>
.match(pattern, allowedLateness)
↪️ PCollection<ComplexEvent<T>>
Applies the defined pattern to the input collection and returns a new collection of complex events.

Arguments:
· pattern: Pattern<T> - a pattern to apply
· allowedLateness: Duration - an optional duration to wait for late events

PCollection<KV<K, V>>
.matchValues(pattern, allowedLateness)
↪️ PCollection<ComplexEvent<T>>
Applies the defined pattern to the values of the input collection and returns a new collection of complex events for each key.

Arguments:
· pattern: Pattern<V> - a pattern to apply
· allowedLateness: Duration - an optional duration to wait for late events

CEP for Java SDK

The CEP object provides the following methods, which make pattern matching available through the plain Beam Java SDK:

matchValues(pattern)
↪️ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, ComplexEvent<T>>>>

Returns a transform that applies the defined pattern to the values of the input collection and produces a new collection of complex events.

Arguments:
· pattern: Pattern<V> - a pattern to apply

lateMatchValues(pattern, allowedLateness, keyClass, valueClass)
↪️ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, ComplexEvent<T>>>>

Returns a transform that applies the defined pattern to the values of the input collection, waiting the specified duration for late events, and produces a new collection of complex events.

Arguments:
· pattern: Pattern<T> - a pattern to apply
· allowedLateness: Duration - the duration to wait for late events
· keyClass: Class<K> - the class of keys in the input collection
· valueClass: Class<V> - the class of values in the input collection