Tuesday, January 31, 2012

Matching upwards of 50k events per second in close to real time?

We have a use case in an R&D project to match user-defined queries against an event stream running at about 50k transactions per second. The results will be used to make real-time marketing recommendations. Following is a summary of the use case.

  1. The system receives about 50k messages/sec generated by transaction servers. 
  2. Each message contains name-value properties.
  3. The goal is to run user-defined, long-running temporal queries on that data. For example, if a user has sent more than 100 SMS messages in the last month, give him a promotion. Marketing specialists provide those queries through a UI wizard that deploys them into the system. (Queries are limited: they basically match properties or perform aggregations over a given time window; see the sketch after this list.)
  4. The goal is to match queries against events and respond with a promotion within 5s of receiving the last event that completes the condition. 
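
To make the data model concrete, here is a minimal Java sketch of an event carrying name-value properties and a user-defined aggregation query over a time window. The class and field names are my own illustrations, not part of the actual system.

```java
import java.util.Map;

// Minimal sketch of the data model described above; names are illustrative only.
public class EventModel {

    /** A single message from the transaction servers: a timestamp plus name-value properties. */
    public static class Event {
        public final long timestampMillis;
        public final Map<String, String> properties;

        public Event(long timestampMillis, Map<String, String> properties) {
            this.timestampMillis = timestampMillis;
            this.properties = properties;
        }
    }

    /** A user-defined query: match a property and aggregate over a time window up to a threshold. */
    public static class AggregationQuery {
        public final String propertyName;   // e.g. "messageType"
        public final String propertyValue;  // e.g. "SMS"
        public final long windowMillis;     // e.g. 30 days
        public final long threshold;        // e.g. 100 messages

        public AggregationQuery(String propertyName, String propertyValue,
                                long windowMillis, long threshold) {
            this.propertyName = propertyName;
            this.propertyValue = propertyValue;
            this.windowMillis = windowMillis;
            this.threshold = threshold;
        }

        /** True if this event should be counted towards the aggregation. */
        public boolean matches(Event event) {
            return propertyValue.equals(event.properties.get(propertyName));
        }
    }
}
```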

There are several ways we could try to solve the problem.
  1. SQL - the traditional answer is to dump the data into an RDBMS, query the database periodically (say every 3s), and send promotions for all the matches. However, the database is unlikely to keep up with either writing that much data or querying it. 
  2. CEP - Complex Event Processing is the ideal answer to the problem. CEP engines can handle events at this scale and support user-defined queries. However, current CEP engines do not support persistence, so if the server fails while running a query with a one-month window, there is no way to recover the state. 
  3. Hadoop - the query is an easy one for Hadoop, but Hadoop works in batch mode and is likely to take more than 5s. 
  4. Stream processing - stream processing could scale to this use case, but how to handle temporal queries is unclear. 
  5. Cassandra - the idea of building indexes while writing data to Cassandra might work. Then we can periodically query and send promotions to matching users. However, Cassandra handles incrementing a value poorly because its writes are idempotent, and that will be a problem with parallel writes. 
Following is the answer we are thinking about. It uses a combination of Cassandra + ZooKeeper + caching. 

First, users define the queries they need to monitor. Those queries are either simple conditions (which are easy to implement) or aggregations computed over a period of time. Then the query compiler generates a Matcher to detect the query condition and deploys the code to the processing servers. 
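
As a rough illustration, the generated Matcher might look something like the following sketch; the interface, the class, and the "userId" property are my assumptions, not the real generated code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a generated Matcher; the interface and class names are assumptions.
interface Matcher {
    /** Process one incoming event, updating the server's in-memory aggregation state. */
    void onEvent(EventModel.Event event);
}

// Example of what the compiler might emit for "more than 100 SMS messages in the last month":
// it filters matching events and accumulates per-user counts in the local in-memory cache.
class SmsCountMatcher implements Matcher {

    private final EventModel.AggregationQuery query;
    private final ConcurrentHashMap<String, AtomicLong> localCounts = new ConcurrentHashMap<>();

    SmsCountMatcher(EventModel.AggregationQuery query) {
        this.query = query;
    }

    @Override
    public void onEvent(EventModel.Event event) {
        if (!query.matches(event)) {
            return;                                      // not an SMS event, ignore it
        }
        String userId = event.properties.get("userId");  // assumed property name
        localCounts.computeIfAbsent(userId, id -> new AtomicLong()).incrementAndGet();
    }

    /** The flush thread described below drains these counts into Cassandra. */
    ConcurrentHashMap<String, AtomicLong> localCounts() {
        return localCounts;
    }
}
```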

Each processing server processes incoming events using the Matcher and updates an in-memory cache with the detected conditions (e.g. the running counts of the aggregations). A thread runs periodically, takes a lock, and updates the values stored in Cassandra to also include the new local counts. Processing servers use ZooKeeper to take the lock before updating Cassandra. 
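
Here is a minimal sketch of that flush thread, assuming Apache Curator for the ZooKeeper lock; the key scheme and the addToCount() Cassandra helper are placeholders, not the actual schema.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the per-server flush thread; the Curator lock is real, the Cassandra helper is hypothetical.
public class CounterFlusher implements Runnable {

    // Local deltas accumulated by the Matchers since the last flush, keyed by (query, user).
    private final ConcurrentHashMap<String, AtomicLong> localDeltas = new ConcurrentHashMap<>();

    private final CuratorFramework zkClient;
    private final InterProcessMutex lock;

    public CounterFlusher(String zkConnectString) {
        this.zkClient = CuratorFrameworkFactory.newClient(zkConnectString,
                new ExponentialBackoffRetry(1000, 3));
        this.zkClient.start();
        this.lock = new InterProcessMutex(zkClient, "/locks/aggregation-counters");
    }

    /** Called by a Matcher when it counts an event for a user. */
    public void increment(String key) {
        localDeltas.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
    }

    @Override
    public void run() {
        try {
            lock.acquire();                 // ZooKeeper lock so only one server updates at a time
            try {
                for (Map.Entry<String, AtomicLong> e : localDeltas.entrySet()) {
                    long delta = e.getValue().getAndSet(0);
                    if (delta > 0) {
                        addToCount(e.getKey(), delta);
                    }
                }
            } finally {
                lock.release();
            }
        } catch (Exception ex) {
            // If the write fails after the delta was reset, that delta is lost; a real
            // implementation would re-add it or only reset after a successful write.
            throw new RuntimeException(ex);
        }
    }

    private void addToCount(String key, long delta) {
        // Placeholder: read the stored count for this key from Cassandra, add delta, write it back.
    }
}
```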

There is also a thread that runs periodically (once a day) and updates the counts maintained in Cassandra to reflect the time windows, by removing the elapsed day's count from the Cassandra count. 
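
For that subtraction to work, the counts presumably have to be bucketed by day. The following sketch illustrates the idea with a made-up key scheme and a hypothetical CounterStore standing in for the Cassandra access layer.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Sketch of day-bucketed counter keys, so an elapsed day can be subtracted from the window total.
// The key scheme and the storage interface are assumptions, not the actual schema.
public class WindowedCounters {

    private static final DateTimeFormatter DAY = DateTimeFormatter.ISO_LOCAL_DATE;

    /** Key for the per-day bucket, e.g. "smsCount:user42:2012-01-31". */
    public static String dayKey(String query, String userId, LocalDate day) {
        return query + ":" + userId + ":" + DAY.format(day);
    }

    /** Key for the running window total, e.g. "smsCount:user42:total". */
    public static String totalKey(String query, String userId) {
        return query + ":" + userId + ":total";
    }

    /**
     * Daily maintenance: subtract the bucket that just fell out of the window
     * from the running total and delete the bucket.
     */
    public static void expireDay(String query, String userId, LocalDate today, int windowDays,
                                 CounterStore store) {
        LocalDate elapsed = today.minusDays(windowDays);
        long expiredCount = store.get(dayKey(query, userId, elapsed));
        store.add(totalKey(query, userId), -expiredCount);
        store.delete(dayKey(query, userId, elapsed));
    }

    /** Hypothetical storage interface standing in for the Cassandra access layer. */
    public interface CounterStore {
        long get(String key);
        void add(String key, long delta);
        void delete(String key);
    }
}
```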

Finally, a promotion server periodically searches the indexes and sends promotions to matching users. 
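
A minimal sketch of that periodic scan, with the index query and the delivery channel left as hypothetical hooks:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the promotion server's periodic scan; findUsersMatching() and sendPromotion()
// are hypothetical hooks for the index query and the delivery channel.
public class PromotionServer {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // Run every 3 seconds to stay inside the 5-second response target.
        scheduler.scheduleAtFixedRate(this::scanAndPromote, 0, 3, TimeUnit.SECONDS);
    }

    private void scanAndPromote() {
        for (String userId : findUsersMatching()) {
            sendPromotion(userId);
        }
    }

    private List<String> findUsersMatching() {
        return Collections.emptyList(); // placeholder: query the indexes for users over the threshold
    }

    private void sendPromotion(String userId) {
        // placeholder: push the promotion to the user
    }
}
```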
