Streaming Engines

In stream data processing, unbounded streams must be processed continuously and efficiently. DolphinDB provides a number of stream computing engines that use incremental computation for optimal performance. The results can be output to shared in-memory tables, stream tables, message-oriented middleware, databases, and API clients for further processing.

Time-series Streaming Engines

DolphinDB provides three time-series streaming engines: the time-series engine (createTimeSeriesEngine), the daily time-series engine (createDailyTimeSeriesEngine), and the session window engine (createSessionWindowEngine). All three compute over time-based windows.

Both the time-series engine and the daily time-series engine can be used for sliding-window aggregations at specified frequencies, such as calculating OHLC bars.
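To make the idea of window aggregation concrete, here is a minimal pure-Python sketch of building OHLC bars from a tick stream. It is a conceptual illustration, not the DolphinDB API: the function name `ohlc_bars`, the tuple layout, and the simplification that the window size equals the step (non-overlapping windows) are assumptions made for this example. Ticks are assumed to arrive in time order.

```python
def ohlc_bars(ticks, window_size):
    """Aggregate (timestamp, price) ticks into OHLC bars over fixed windows.

    Each tick falls into the window [start, start + window_size), where start
    is the timestamp rounded down to a multiple of window_size.
    """
    bars = {}  # window start -> running OHLC state (dicts keep insertion order)
    for ts, price in ticks:
        start = ts - ts % window_size
        bar = bars.get(start)
        if bar is None:
            # First tick of the window sets all four values.
            bars[start] = {"open": price, "high": price, "low": price, "close": price}
        else:
            # Incremental update: only the running state changes, no rescan of old ticks.
            bar["high"] = max(bar["high"], price)
            bar["low"] = min(bar["low"], price)
            bar["close"] = price
    return [(s, b["open"], b["high"], b["low"], b["close"]) for s, b in bars.items()]


# Hypothetical ticks: (seconds since start, price)
ticks = [(0, 10.0), (20, 10.5), (59, 9.8), (60, 10.1), (95, 10.4)]
bars = ohlc_bars(ticks, 60)
```

Each bar is updated from its running state rather than recomputed from all ticks in the window, which is the essence of incremental computing.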

The daily time-series engine extends the time-series engine by allowing users to specify trading sessions. For each session within a calendar day, records that arrive before the session starts and have not yet been included in any calculation can be included in the first window of that session.

In markets that operate without interruption, such as the crypto and forex markets, the time-series engine fits the need. In markets with well-defined trading sessions, such as the stock and futures markets, the daily time-series engine is often more applicable.

Related: DolphinDB Tutorial: Time-Series Streaming Engine

Session Window Engine (createSessionWindowEngine)

Session windows can be thought of as sessions of activity (periods in which data is generated). Before and after a session, there are gaps of inactivity (periods in which no data is generated).

The session window engine has the same calculation rules and triggering patterns as the time-series engine. The difference is that the windows of the time-series engine are generated at a fixed frequency with a fixed size, whereas the windows of the session window engine are not. The first session window starts at the timestamp of the first record ingested into the engine. If the window receives no further record within a specified waiting time, it closes, and its end time is the timestamp of the last received record plus the waiting time. The next session window starts when a new record is ingested.

Take an IoT scenario as an example: depending on whether a device is online, a large amount of data may be generated in some time periods and none in others. Applying sliding-window calculation to such data may cause unnecessary computational overhead because many empty windows are generated. The session window engine is designed to solve this problem.
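The session window rules described above can be sketched in a few lines of pure Python. This is a conceptual illustration rather than the engine's implementation: the function name `session_windows`, the parameter name `session_gap`, and the choice that a record arriving exactly at the window end opens a new window are all assumptions. The final flush of the last open window is a convenience for this offline example.

```python
def session_windows(timestamps, session_gap):
    """Group time-ordered timestamps into session windows.

    A window starts at its first record and ends at the timestamp of its last
    record plus session_gap. A record arriving at or after that end opens a
    new window (boundary handling is an assumption of this sketch).
    Returns a list of (start, end, record_count).
    """
    windows = []
    start = last = None
    count = 0
    for ts in timestamps:
        if start is not None and ts >= last + session_gap:
            # The gap of inactivity was long enough: close the current window.
            windows.append((start, last + session_gap, count))
            start = None
        if start is None:
            start, count = ts, 0
        last = ts
        count += 1
    if start is not None:
        # Offline convenience: flush the window that is still open.
        windows.append((start, last + session_gap, count))
    return windows


# Hypothetical device readings: bursts of activity separated by silence.
readings = [0, 2, 3, 20, 21, 40]
windows = session_windows(readings, 5)
```

Only three windows are produced for this input, whereas fixed windows of the same size covering the range 0 to 45 would include several empty ones.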

Reactive State Streaming Engine (createReactiveStateEngine)

Stream computation in DolphinDB can use stateless or stateful factors. A stateless factor takes only the latest record for calculation, while a stateful factor requires the latest record as well as states (i.e., previous records and intermediate results). States are continuously updated with each record or event and maintained in the streaming engines for subsequent calculation.

Each record ingested into the reactive state streaming engine (createReactiveStateEngine) triggers an output, so the number of output records always equals the number of input records. Only vectorized functions can be used as operators in the engine. DolphinDB optimizes the performance of stateful operators (such as moving functions, cumulative functions, order-sensitive functions, and top-N functions) for these scenarios, and only these optimized state functions can be used directly in the engine. To implement any other stateful indicator, define a user-defined function and declare it with the @state keyword before the definition.

Application Scenarios:

  • Finance: Calculating stateful factors with high-frequency trading data;

  • IoT: Detecting whether the real-time readings of temperature sensors are continuously increasing.
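The IoT scenario above can serve as a small illustration of a stateful factor. The following pure-Python sketch keeps per-sensor state and emits exactly one output per input record. The class name `IncreasingStreak` and the sample data are hypothetical, and the code does not use the DolphinDB API.

```python
class IncreasingStreak:
    """Stateful operator: per key, length of the current run of strictly increasing values."""

    def __init__(self):
        self._state = {}  # key -> (previous value, current streak length)

    def update(self, key, value):
        prev = self._state.get(key)
        if prev is not None and value > prev[0]:
            streak = prev[1] + 1
        else:
            streak = 1  # first record for this key, or the run was broken
        self._state[key] = (value, streak)
        return streak


op = IncreasingStreak()
# Hypothetical (sensor id, temperature) records, interleaved across sensors.
stream = [("s1", 20.0), ("s2", 30.0), ("s1", 20.5), ("s1", 21.0), ("s2", 29.0), ("s1", 20.8)]
outputs = [(key, value, op.update(key, value)) for key, value in stream]
```

The state (previous value and streak length) is updated with each record and kept for the next one, so no record needs to be revisited.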

Cross-Sectional Streaming Engine (createCrossSectionalEngine)

The cross-sectional streaming engine (createCrossSectionalEngine) is used for real-time computing on cross-sectional data, which is a collection of observations (behaviors) for multiple subjects (entities such as different stocks) at a single point in time.
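As a rough illustration, the sketch below assumes the cross section is formed from the latest record of each subject and recomputes a few statistics across subjects whenever a record arrives. The class name `CrossSection`, the chosen statistics, and the triggering rule are assumptions for this example, not a description of the engine's behavior.

```python
class CrossSection:
    """Keep the latest price per symbol and compute statistics across symbols."""

    def __init__(self):
        self._latest = {}  # symbol -> latest price

    def update(self, symbol, price):
        self._latest[symbol] = price
        prices = list(self._latest.values())
        return {
            "count": len(prices),
            "max": max(prices),
            "mean": sum(prices) / len(prices),
        }


cs = CrossSection()
cs.update("A", 10.0)
cs.update("B", 20.0)
snapshot = cs.update("A", 12.0)  # "A" is overwritten, so the section still has 2 symbols
```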

Anomaly Detection Streaming Engine (createAnomalyDetectionEngine)

The anomaly detection streaming engine (createAnomalyDetectionEngine) is used to detect anomalies by analyzing metric values. It outputs the records that satisfy the anomaly conditions.

Application Scenarios:

  • IoT: Monitoring device or power status;

  • Financial risk management: Filtering orders, monitoring trading volume, or setting overload alerts.
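Conceptually, anomaly detection of this kind evaluates a set of conditions against each record and outputs the records that satisfy them. The sketch below is a pure-Python illustration under that reading. The function name `detect_anomalies`, the thresholds, and the record fields are hypothetical.

```python
def detect_anomalies(records, conditions):
    """Yield (record, condition_name) for every condition a record satisfies."""
    for record in records:
        for name, predicate in conditions:
            if predicate(record):
                yield record, name


# Hypothetical anomaly conditions for a device monitor.
conditions = [
    ("overheat", lambda r: r["temp"] > 80),
    ("low_voltage", lambda r: r["volt"] < 200),
]
records = [
    {"id": 1, "temp": 65, "volt": 220},
    {"id": 2, "temp": 85, "volt": 221},
    {"id": 3, "temp": 90, "volt": 190},
]
alerts = [(r["id"], name) for r, name in detect_anomalies(records, conditions)]
```

A record that meets several conditions, like record 3 here, produces one alert per condition. Record 1 meets none and is not output.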

Joining

Standard SQL joins are designed for combining tables of historical data. To join streams with a standard SQL join statement, you would first need to save the streams as snapshots at fixed intervals, process each snapshot independently, and then combine the results. Such a query is cumbersome to write and cannot deliver results with the timeliness that real-time computing requires.

Instead, DolphinDB provides the following engines for joining stream tables.

Asof Join Engine (createAsofJoinEngine)

The asof join engine is typically used with time series data. In some cases, the left and right tables to be joined have high-precision timestamps that do not match exactly. For each record in the left table, the asof join engine finds the most recent record in the right table that satisfies the matching condition (e.g., has the same stock symbol).

For example, when processing US stock market data, you can join trades and quotes with the asof join engine to calculate transaction costs.
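The matching rule can be illustrated with a batch-style pure-Python sketch. It is not the streaming engine itself: the function name `asof_join`, the tuple layouts, and the choice that a quote with the same timestamp as the trade counts as a match are assumptions. Quotes are assumed to be sorted by time.

```python
from bisect import bisect_right


def asof_join(trades, quotes):
    """Match each trade with the most recent quote of the same symbol.

    trades: list of (time, symbol, price)
    quotes: list of (time, symbol, bid, ask), sorted by time
    Returns (time, symbol, price, (bid, ask) or None) per trade.
    """
    by_symbol = {}  # symbol -> (list of quote times, list of (bid, ask))
    for t, sym, bid, ask in quotes:
        times, rows = by_symbol.setdefault(sym, ([], []))
        times.append(t)
        rows.append((bid, ask))

    joined = []
    for t, sym, price in trades:
        times, rows = by_symbol.get(sym, ([], []))
        i = bisect_right(times, t)  # number of quotes with time <= t
        quote = rows[i - 1] if i > 0 else None
        joined.append((t, sym, price, quote))
    return joined


quotes = [(1, "A", 9.9, 10.1), (2, "B", 19.8, 20.2), (4, "A", 10.0, 10.2)]
trades = [(3, "A", 10.05), (5, "A", 10.15), (1, "B", 20.0)]
result = asof_join(trades, quotes)
```

The trade in "B" at time 1 has no earlier quote, so it is paired with `None` in this sketch.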

Equi Join Engine (createEquiJoinEngine)

The equi join engine combines two streams based on equal values in specified columns. For example, use this engine to join 1-minute market snapshots with trade data captured at one-minute intervals, so that all essential indicators are available in one table for further analysis of trading strategies.

Window Join Engine (createWindowJoinEngine)

For each record in the left table, rather than joining it with a single value from the right table, the engine computes on a specified window in the right table.

For example, the transaction cost of an individual stock is usually estimated by joining trades and quotes data. With the window join engine, we can get a more reasonable estimate by calculating the average or median of the quotes over a specified window around each trade, instead of using a single quote.
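The following pure-Python sketch illustrates the idea of aggregating over a window of right-table records for each left-table record. It is a batch illustration, not the streaming engine: the function name `window_join`, the parameters `before` and `after`, the inclusive window bounds, and the use of the mid quote are assumptions.

```python
from statistics import mean


def window_join(trades, quotes, before, after):
    """For each trade, average the mid quotes of the same symbol
    whose time lies in [t - before, t + after].

    trades: list of (time, symbol, price)
    quotes: list of (time, symbol, bid, ask)
    """
    joined = []
    for t, sym, price in trades:
        mids = [
            (bid + ask) / 2
            for qt, qsym, bid, ask in quotes
            if qsym == sym and t - before <= qt <= t + after
        ]
        # None marks a trade with no quote in its window.
        joined.append((t, sym, price, mean(mids) if mids else None))
    return joined


quotes = [(1, "A", 10.0, 12.0), (3, "A", 11.0, 13.0), (9, "A", 20.0, 22.0)]
trades = [(2, "A", 11.5), (20, "A", 21.0)]
result = window_join(trades, quotes, before=2, after=2)
```

Replacing `mean` with `statistics.median` gives the median variant mentioned above.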

Left Semi Join Engine (createLeftSemiJoinEngine)

The left semi join engine joins each record in the left table with a matching record in the right table. Unlike the lookup join engine, it produces no output for a left record until a match is found. For example, you can use the engine to join each trade with its corresponding order, or to join the snapshot of an individual stock with the snapshot of an index for correlation calculation.
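The "no output until a match is found" behavior can be sketched in pure Python by buffering unmatched left records. This is a conceptual illustration only: the class name `LeftSemiJoin`, its method names, and the choice to keep the first right record per key are assumptions, not the engine's documented behavior.

```python
class LeftSemiJoin:
    """Emit a left record only once a right record with the same key exists."""

    def __init__(self):
        self._right = {}         # key -> first right record seen (assumption)
        self._pending_left = {}  # key -> left records still waiting for a match

    def on_left(self, key, left):
        if key in self._right:
            return [(key, left, self._right[key])]
        # No match yet: hold the record back instead of emitting it.
        self._pending_left.setdefault(key, []).append(left)
        return []

    def on_right(self, key, right):
        self._right.setdefault(key, right)
        waiting = self._pending_left.pop(key, [])
        return [(key, left, self._right[key]) for left in waiting]


join = LeftSemiJoin()
out1 = join.on_left("o1", {"trade": 100})    # buffered, nothing emitted
out2 = join.on_right("o1", {"order": "buy"})  # releases the buffered trade
out3 = join.on_left("o1", {"trade": 50})     # matches immediately
out4 = join.on_left("o2", {"trade": 10})     # no order yet, buffered
```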

The four join engines introduced so far join two stream tables. To join a stream table with a table of historical data (in-memory tables and dimension tables are supported), use the lookup join engine.

Lookup Join Engine (createLookupJoinEngine)

The lookup join engine enriches a stream table with data queried from another table (usually a dimension table) through a left join. For example, you can use the lookup join engine to join the latest snapshot of a stock with its intraday trading indicators from the previous day to gain insight into real-time trading.
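Because the lookup is a left join, every stream record produces an output immediately, whether or not the other table contains a match. The pure-Python sketch below illustrates this. The function name `lookup_join` and the sample tables are hypothetical, and a plain dictionary stands in for the dimension table.

```python
def lookup_join(stream, dimension):
    """Left-join each (key, value) stream record with dimension[key].

    Records without a match are still emitted, with None in place of the
    dimension data.
    """
    for key, value in stream:
        yield key, value, dimension.get(key)


# Hypothetical previous-day indicators, keyed by symbol.
prev_day = {"A": {"prev_close": 10.0}, "B": {"prev_close": 20.0}}
snapshots = [("A", 10.3), ("C", 5.0), ("B", 19.5)]
enriched = list(lookup_join(snapshots, prev_day))
```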

Further Reading

In some cases, the metrics you define must be calculated along different dimensions by passing data through several engines in sequence. You need to decide manually which part is, for example, a cross-sectional operation and which part is a time-series operation, and implement each part with the appropriate engine. By using one engine as the output of another, you can build an engine pipeline.
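The pipeline idea can be illustrated with two chained Python generators, where the output of a per-symbol (time-series) stage feeds a cross-sectional stage. Both function names and the metric (ranking symbols by their latest return) are hypothetical, and the sketch does not use the DolphinDB API.

```python
def per_symbol_return(ticks):
    """Time-series stage: emit each tick's return relative to the
    previous tick of the same symbol."""
    last = {}
    for sym, price in ticks:
        prev = last.get(sym)
        last[sym] = price
        if prev is not None:
            yield sym, price / prev - 1


def cross_sectional_rank(returns):
    """Cross-sectional stage: after each update, rank all symbols by
    their latest return, highest first."""
    latest = {}
    for sym, ret in returns:
        latest[sym] = ret
        yield sorted(latest, key=latest.get, reverse=True)


ticks = [("A", 10.0), ("B", 20.0), ("A", 11.0), ("B", 21.0)]
# The first stage's output stream is the second stage's input stream.
rankings = list(cross_sectional_rank(per_symbol_return(ticks)))
```

Splitting the metric this way keeps each stage simple: the first stage holds only per-symbol state, and the second holds only the current cross section.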

Note: Starting from version 1.30.17/2.00.5, you can ingest the output of a non-join engine into a join engine through the built-in functions getLeftStream and getRightStream to use the engines in series.