Complex Event Processing
The Key Concepts of Complex Event Processing
Multi-chapter guide | Chapter 1
Complex event processing applies advanced techniques to real-time data processing. It aggregates and analyzes data from multiple sources and takes action in real-time, enabling businesses to benefit from new insights immediately.
Event processing emerged as a solution to real-time data integration. For decades, organizations integrated data in batches, until LinkedIn open-sourced Apache Kafka in 2011, enabling low-latency processing of large data volumes. Soon after Kafka's arrival, Twitter open-sourced Apache Storm, which became one of the first widely used event stream processing frameworks. It included many of the core concepts that modern frameworks have today but lacked critical features like state management.
As stream processing became more popular, data ingestion and processing demands increased. Complex event processing techniques became necessary to detect advanced patterns in event sequences over multiple sources. While frameworks like Apache Samza made some improvements, those features didn’t mature until recently.
Today, modern frameworks like Apache Flink, Apache Spark, and DataForge provide solutions for complex event processing without sacrificing critical features like scalability, fault tolerance, and observability. These frameworks offer greater visibility, easier parsing of structurally complex event data, and stateful operations to implement complex processing and transformation logic.
This article explores complex event processing and discusses how modern data frameworks provide performance at scale.
Summary of key complex event processing concepts
| Concept | Description |
| --- | --- |
| Complex vs. non-complex events | Utilizing structurally complex event data safely and consistently requires strong schemas and good tooling. |
| Stream processing | Low-latency processing of a continuous event sequence for real-time analysis. |
| Stateless vs. stateful operations | Streaming operations and how they use state to enable significantly more complex processing logic. |
| Watermarking | Defines a threshold for late-arriving events so that stateful operations know when to finalize their processing and clean up their state. |
| Detecting patterns | Use state to detect complex patterns in event sequences in real time. |
| Correlating multiple events | Connect different event types from multiple data streams with stateful operations. |
| Scalability | Support growing data volumes without performance degradation or significantly changing your processing logic. |
| Fault-tolerance | Ensure data safety and recover quickly from issues like hardware failures. |
| Delivery guarantees | A set of standard delivery guarantees helps developers determine pipeline behavior under failure scenarios. |
| Observability | Full visibility into system processes and state is paramount to guarantee smooth operation. |
Complex vs. non-complex events
Events often represent important business occurrences and contain data from various sources, such as API responses, application logs, and telemetry. For example, consider an event that tracks user purchases across different store websites. This data could be used in a data pipeline in many ways, such as:
Aggregating popular items for future recommendations
Monitoring site activity to detect buggy releases
Identifying fraudulent purchases, etc.
For example, the event below is a collection of simple string value fields. It contains identifiers for the user making the purchase, the item purchased, and the store website where it was purchased.
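{
  "eventName": "purchase",
  "eventTimestamp": "2019-09-05T11:28:00.000Z",
  "eventId": "8015b946",
  "userId": "c006138a",
  "itemId": "4c1b70d3",
  "storeId": "663fb949"
}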
Event data is mostly represented as semi-structured JSON. Adding more information to the purchase event introduces complexity. Consider the following scenarios.
Nested schemas
Events can often have multiple schemas nested within a main parent schema, with nesting continuing for multiple layers. Below is an example of our purchase event, in which item, price, and store information now use nested schemas.
{
"eventName": "purchase",
"eventTimestamp": "2019-09-05T11:28:00.000Z",
"eventId": "8015b946",
"userId": "c006138a",
"item": {
"itemId": "4c1b70d3",
"quantity": 1,
"price": {
"amount": 5,
"currencyCode": "USD"
}
},
"store": {
"storeId": "663fb949",
"version": "1.3.422",
"page": "/sports/baseball/2"
}
}
Nested arrays
Events can also contain nested arrays containing schema elements. Below is another example of our purchase event, which contains item arrays to track multiple purchases.
{
"eventName": "purchase",
"eventTimestamp": "2019-09-05T11:28:00.000Z",
"eventId": "8015b946",
"userId": "c006138a",
"items": [
{
"itemId": "4c1b70d3",
"quantity": 1,
"price": {
"amount": 5,
"currencyCode": "USD"
}
},
{
"itemId": "3bba7aba",
"quantity": 1,
"price": {
"amount": 9,
"currencyCode": "USD"
}
}
]
}
Variable schemas
Parts of an event’s schema may constantly change, evolve, or simply be unknown. For example, consider our purchase event, which tracks purchases from a constantly changing store list. The store schema in our event could change significantly depending on the store type and provider.
{
"eventName": "purchase",
"eventId": "8015b946",
...
"store": {
"storeId": "663fb949",
"version": "1.3.422",
"page": "/sports/baseball/2"
}
}
{
"eventName": "purchase",
"eventId": "1a42ccb4",
...
"store": {
"name": "Sporting Goods Store",
"cartId": 32544,
"userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
}
}
Handling structural complexity
Nested schemas and arrays are often handled by defining schemas with complex types that enable modern frameworks to parse these events. In Spark and DataForge, the struct and array types are used, while in Flink, the row and array types are used.
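As a rough sketch, the nested purchase event shown earlier could be described in PySpark using Spark's complex types (field names follow the JSON examples above; adjust them to your own events):

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, ArrayType
)

# price and item schemas mirror the nested JSON shown above
price_schema = StructType([
    StructField("amount", IntegerType()),
    StructField("currencyCode", StringType()),
])

item_schema = StructType([
    StructField("itemId", StringType()),
    StructField("quantity", IntegerType()),
    StructField("price", price_schema),
])

purchase_schema = StructType([
    StructField("eventName", StringType()),
    StructField("eventTimestamp", StringType()),
    StructField("eventId", StringType()),
    StructField("userId", StringType()),
    StructField("items", ArrayType(item_schema)),  # nested array of structs
])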
Variable schemas have historically been more difficult to handle, forcing teams to choose between flexibility and performance/safety. Teams that prioritize flexibility store the JSON data as raw strings, while teams that need performance and safety must maintain strict schemas that require frequent changes, occasionally resulting in data loss when events no longer match. The variant data type, available in Spark 4.0, offers a solution to this trade-off.
Once you have a strong set of schemas defined for your event data, modern frameworks provide tools to safely query and process the data in a highly performant manner. For example, DataForge can capture your schemas and give you an easy way to query complex structures.
DataForge Cloud 8.0 also makes nested semi-structured data (such as ARRAY<STRUCT<..>>) easy to handle by introducing sub-sources. Sub-sources can be read and transformed in the same way as regular data sources, and because they inherit mapping context from their parent sources, they eliminate the need for developers to build normalization pipelines to standardize complex structured formats.
Stateless vs. stateful operations
Modern frameworks support stateful operations that are critical to complex event processing.
Stateless operations
Stateless operations do not store or aggregate information between events. Each input is processed independently of all other inputs. For example, a stateless map operation applies a user-defined function to each input value and outputs the result. Similarly, a filter operation compares each input with a specific condition and only outputs values that meet that condition.
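As a minimal sketch in Spark Structured Streaming (assuming an events DataFrame shaped like the purchase event above), stateless operations look like this:

from pyspark.sql.functions import col

# filter: each event is checked against the condition independently
purchases = events.filter(col("eventName") == "purchase")

# map-style transformation: derive a new column from each event on its own
with_totals = purchases.withColumn(
    "totalAmount", col("item.price.amount") * col("item.quantity")
)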
Stateful operations
Stateful operations retain state or data over time. For example, a window operation enables us to group inputs by time and run other operations over those groups. It can group input events by the minute they were received and then apply count aggregation over the group to get the final result.
Inputs in this operation are no longer processed independently. For each window group, you store and aggregate state (in this example, our event counts) across multiple inputs.
Stateless operations process each input independently, while stateful operations store and aggregate state across multiple inputs.
Watermarking
When processing events by their event time, there is no guarantee that events will arrive in order, so late-arriving events pose a challenge. For example, the windowing operation introduced in the previous section must know when each time window is complete so it can finalize its processing and clean up its state. Otherwise, state volume keeps growing and causes performance bottlenecks.
A watermark is a threshold for late-arriving events. It tells stateful operations how long they must wait before finalizing and cleaning up their state. Below is an example of setting a watermark in Apache Spark Structured Streaming:
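from pyspark.sql.functions import window

# `events` is a streaming DataFrame with an eventTimestamp column
windowedCounts = events \
    .withWatermark("eventTimestamp", "5 minutes") \
    .groupBy(window("eventTimestamp", "10 minutes")) \
    .count()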
This example groups events into 10-minute time windows. The watermark is set to 5 minutes, meaning that the windowing operation waits until it sees an event timestamp greater than a window's ending timestamp plus 5 minutes before finalizing that window. Events that arrive later than the watermark allows may be dropped.
Detecting patterns
Detecting patterns in event sequences is one of the main techniques in complex event processing. Let’s see how stateful operations can help us detect patterns by looking at a simple example.
The example code below uses Apache Spark Structured Streaming to detect over 100 error events in a 2-minute sliding window.
from pyspark.sql.functions import window

# filter down to only error events
errorEvents = events.filter("eventName == 'error'")
# count error events in 2-minute windows (with a 1-minute slide)
errorCounts = errorEvents \
.withWatermark("eventTimestamp", "1 minute") \
.groupBy(
window("eventTimestamp", "2 minutes", "1 minute"),
"eventName") \
.count()
# filter down to counts that are over 100
largeCounts = errorCounts.filter("count > 100")
The windowing operation groups error events and aggregates their counts. The output error counts are filtered by a threshold set to 100.
When counts cross the threshold, your system can notify the engineering team of an abnormal error rate within minutes of the errors occurring. This is the real power of complex event processing—the ability to take action on something you have recognized in real time.
Correlating multiple events
Data you need to analyze can often be spread over multiple event types. Correlating different events is another important technique in complex event processing. Let’s look at another example to see how stateful operations can help here, too.
The example code below uses Apache Flink to join view and purchase events from two different streams.
// stream of view events (ViewEvent and PurchaseEvent are the application's event classes)
DataStream<ViewEvent> viewEvents = ...
// stream of purchase events
DataStream<PurchaseEvent> purchaseEvents = ...
// join view and purchase event streams
DataStream<ViewPurchase> joined = viewEvents
.join(purchaseEvents)
// join on view and purchase itemId fields
.where(viewEvent -> viewEvent.itemId)
.equalTo(purchaseEvent -> purchaseEvent.itemId)
// use a 5-minute window (with 1-minute slide) to
// search for eligible events to join
.window(
SlidingEventTimeWindows.of(
Duration.ofMinutes(5), Duration.ofMinutes(1)
)
)
// finally return joined events as a ViewPurchase object
.apply(ViewPurchase::new);
The join / where / equalTo operators join events from two separate streams, view events and purchase events, using their respective itemId fields. A stateful windowing operation defines which events are eligible to be joined together. The join is completed with the apply operation, which runs for each joined event pair.
Modern frameworks support stateful streaming joins, making it easy to correlate events across many different sources. Combining this with other techniques allows you to build significantly more complex event processing pipelines.
Scalability
Scalability is the ability of a system to remain performant as the data volume increases. Complex event processing systems employ several techniques to ensure the scalability of the entire system.
Data partitioning
Data is always separated into a configurable number of partitions, and each record is mapped to a partition based on a user-defined key. If no key is present, data is assigned to partitions at random to spread it evenly over the available partitions.
Parallel processing
Once data has been separated, processing occurs independently within each partition. Data partitions are distributed over a server cluster and processed in parallel. As data volume increases, more partitions and servers can be added to handle the load.
Keyed state
State storage in modern frameworks is a key-value store where the state's key matches the data partition key. This setup has two important advantages. First, the state can be partitioned and scaled in the same way as data processing. Second, it enables data locality, meaning the state needed to process data can always be stored locally alongside that data. This significantly improves scalability by eliminating the need to make network calls to external state storage.
Fault-tolerance
Stream processing systems must be resilient to inevitable issues (like hardware failures) during operation. You want to guarantee data safety and recover quickly. Modern frameworks have several ways of providing such fault tolerance.
Data durability
Modern frameworks often use features of the data source itself to guarantee data safety. For example, when using an Apache Kafka source, your data is stored durably inside Kafka itself, so as long as the system recovers within Kafka's retention period, there is no risk of data loss. Kafka also allows you to rewind and replay data in a stream, guaranteeing that all data is processed even during failure recovery.
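For example, a replayable Kafka source in Spark Structured Streaming might be set up as follows (the broker address and topic name are placeholders):

# offsets are tracked per partition, so the stream can be rewound
# and replayed from a known position during failure recovery
events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "purchases") \
    .option("startingOffsets", "earliest") \
    .load()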
Checkpoints
Modern frameworks periodically perform checkpoints to recover quickly after failures, creating snapshots of the distributed data stream and operator state. In case of failure, the framework uses the latest available snapshot to quickly reinitialize your entire pipeline with the snapshotted state and rewind your source data stream to the correct position. Then, the entire system can continue processing data exactly where it left off.
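In Spark Structured Streaming, for instance, checkpointing is enabled by pointing each query at a checkpoint location (the path and sink below are placeholders chosen for illustration):

# stream positions and operator state are snapshotted to the checkpoint
# location, so the query can resume where it left off after a failure
query = largeCounts.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/checkpoints/error-alerts") \
    .start()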
Delivery guarantees
Streaming systems, in general, operate within one of several standard levels of delivery guarantees. These levels help us reason about the results we can receive from streaming pipelines under failure scenarios. The following sections cover the three standard levels.
At most once
At this level, the results of your stream processing can be lost under certain failure scenarios. For example, if a failure occurs before the next checkpoint, all data processed between the previous checkpoint and the point of failure is lost. Modern frameworks do not operate at this level because they always checkpoint stream positions and results (i.e., state) together.
At least once
In this case, the results of your stream processing may be delivered more than once under certain failure scenarios. This can often happen when a source stream is rewound and events are replayed. Modern frameworks combined with the right source/sinks (see below) can avoid duplicate results. However, if your application can tolerate duplicates, operating at this level and gaining significant performance improvements may be advantageous.
Exactly once
This is the ideal case where the results of your stream processing are delivered exactly once under all failure scenarios. Modern frameworks can operate at this level if combined with a source that supports replayability (like Apache Kafka) and a transactional or idempotent sink.
Observability
Constant monitoring of an event streaming system’s processes, state, and hardware is important for providing stable operations. Modern frameworks provide various metrics and methods for system introspection to help monitor overall health.
Apache Flink makes metrics available via both its web interface and REST API. It also exposes its metrics system to user-defined functions so developers can instrument their code with custom metrics. Apache Spark also provides metrics in its web interface and REST API, making it easy to export them to external monitoring systems like Prometheus. DataForge, in addition to the metrics provided by Apache Spark, also makes all of your compiled functional code, orchestration details, operational metrics, and other metadata available in easy-to-query tables.
Common pitfalls
Underutilizing complex types
When events become more structurally complex, it can be easy to become overly focused on quick wins and skip creating a strict event schema. This often manifests as storing fields with deeply nested schemas as raw JSON strings.
Using JSON strings degrades the performance of your event processing pipeline and can be a source of future bugs as you lose type safety when querying into those fields. Discoverability also suffers since frameworks will no longer be able to accurately display the content of those fields.
The best practice is to take the time upfront to define a strict schema for all your events, using complex types (e.g., the struct and array types in Spark and DataForge) as necessary.
Bloated state size
The power of stateful operations makes them easy to overuse, and the settings that keep state cleaned up are easy to forget. Unfortunately, large state sizes significantly affect the performance of your event processing pipeline. They also slow down the snapshotting process and make recovery from failures much harder.
Make sure stateful operations are used only when needed and the data structures used to store state are space-optimized. Watermarks should always be set up to maintain only the minimum necessary state size.
Unbalanced data partitioning
Data must be balanced across partitions to scale processing and handle large data volumes. Choosing good partition keys is the most important factor in ensuring optimal data partitioning. Lack of visibility into and changes in data distribution are common challenges when selecting a partition key. Make sure to perform distribution analysis on your data and set up data monitoring for imbalances in production.
Inadequate monitoring
Complex event processing pipelines have many moving parts that must be monitored to ensure stable operations. Without visibility into your pipeline, you cannot debug issues that arise in production. Issues also go undetected longer, giving them more time to cause serious damage.
Modern frameworks' observability features provide a large collection of metrics. Some basic metrics you should monitor in your pipelines are Kafka consumer lag, end-to-end pipeline latency, state snapshot successes/failures, state size, and data throughput.
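For instance, each running query in Spark Structured Streaming exposes a progress report that can be polled and forwarded to an external monitoring system (query here refers to a started streaming query, as in the checkpoint example above):

# latest progress report for a running streaming query
progress = query.lastProgress

print(progress["numInputRows"])        # rows processed in the last trigger
print(progress["inputRowsPerSecond"])  # ingestion rate
print(progress["stateOperators"])      # state size and row counts per stateful operator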
Recommendations
Create a strong schema for input events to fully utilize modern frameworks’ safety guarantees and querying features.
Apply stream processing techniques to analyze and act on data in real time.
Utilize stateful streaming operations when implementing more complex logic that requires storing and aggregating information across multiple events.
Define appropriate watermarks so that stateful operations can finalize processing and clean up state on time.
Ensure your data is partitioned and balanced to fully utilize modern frameworks’ scalable architectures.
Utilize checkpoints to recover from failures quickly.
Understand which standard level of delivery guarantees your system operates under to make reasoning about behavior during failure scenarios easier.
Make full use of modern frameworks’ built-in observability features to monitor the health of your system at all times.
Conclusion
Complex event processing emerged from businesses' need to process more structurally complex event data. Modern frameworks provide comprehensive solutions for these needs, specifically by allowing greater visibility and easier parsing of structurally complex event data. They also support various stateful operations without sacrificing other critical functionality, such as scalability, fault tolerance, and observability.