createReactiveStateEngine(name, metrics, dummyTable, outputTable, keyColumn, [filter], [snapshotDir], [snapshotIntervalInMsgCount], [keepOrder], [keyPurgeFilter], [keyPurgeFreqInSecond=0], [raftGroup])


name is a string indicating the name of the reactive state engine. It is the unique identifier of the engine. It can contain letters, digits, and underscores (“_”), and it must start with a letter.

metrics is metacode specifying the formulas for calculation. The formulas should use vector functions instead of aggregate functions. For more information about metacode please refer to Metaprogramming. To use a user-defined function in the reactive state engine, please add “@state” to declare it before the definition.

The following state functions have been optimized in DolphinDB’s reactive state engine. Unoptimized state functions are currently not supported by this engine, and aggregate functions should be avoided.

  • cumulative function: cumavg, cumsum, cumprod, cumcount, cummin, cummax, cumvar, cumvarp, cumstd, cumstdp, cumcorr, cumcovar, cumbeta, cumwsum, cumwavg, cumfirstNot, cumlastNot, cummed, cumpercentile

  • moving function: ema, mavg, msum, mcount, mprod, mvar, mvarp, mstd, mstdp, mskew, mkurtosis, mmin, mmax, mimin, mimax, mmed, mpercentile, mrank, mcorr, mcovar, mbeta, mwsum, mwavg, mmad, mfirst, mlast, mslr, tmove, tmfirst, tmlast, tmsum, tmavg, tmcount, tmvar, tmvarp, tmstd, tmstdp, tmprod, tmskew, tmkurtosis, tmmin, tmmax, tmmed, tmpercentile, tmrank, tmcovar, tmbeta, tmcorr, tmwavg, tmwsum, tmoving, moving, sma, wma, dema, tema, trima, linearTimeTrend, talib, t3, ma

  • order-sensitive function: deltas, ratios, ffill, move, prev, iterate, ewmMean, ewmVar, ewmStd, ewmCov, ewmCorr

  • topN function: msumTopN, mavgTopN, mstdpTopN, mstdTopN, mvarpTopN, mvarTopN, mcorrTopN, mbetaTopN, mcovarTopN, mwsumTopN

  • other: talibNull, dynamicGroupCumsum, dynamicGroupCumcount

Note: The parameter func in the state function talib can only be a state function supported by the reactive state engine.
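
As a minimal sketch of the “@state” declaration described above, the following script defines a user-defined state function and wraps a call to it in metacode. The function name priceSpread, the variable spreadMetric, and the column name price are illustrative assumptions and do not come from this reference.

```dolphindb
// Hypothetical user-defined state function. It calls the state function ema,
// so it is declared with @state before the definition.
@state
def priceSpread(price){
    return ema(price, 20) - ema(price, 40)
}

// Metacode that could be passed as (part of) the metrics parameter.
// `price` is assumed to be a column of dummyTable.
spreadMetric = <priceSpread(price)>
```

The metacode is not evaluated when it is defined, so price does not need to exist as a variable at that point.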

dummyTable is a table object whose schema must be the same as the subscribed stream table. The system uses the schema of dummyTable to determine the data type of each column in the messages. Whether dummyTable contains data does not matter.

outputTable is the output table for the results. It can be an in-memory table or a DFS table. Before calling createReactiveStateEngine, create the output table as an empty table and specify the names and data types of its columns.

keyColumn is a string scalar or vector indicating the grouping column(s). The calculation is conducted within each group defined by keyColumn.

filter is metacode indicating the filtering conditions. Only the results that satisfy the filtering conditions are output to outputTable. Each condition must be an expression. To set multiple conditions, connect them with logical operators (and, or).
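
The following sketch shows what single-condition and multi-condition filter metacode might look like. The variable names filter1 and filter2 and the symbol values are illustrative assumptions. The columns sym and date are assumed to exist in dummyTable.

```dolphindb
// Single condition: keep only the results for two symbols.
filter1 = <sym in ["A1", "A2"]>

// Two conditions joined with the logical operator `and`.
// Each condition is parenthesized so that the grouping is explicit.
filter2 = <(sym in ["A1", "A2"]) and (date >= 2012.01.01)>
```

Either variable could then be passed as the filter argument when the engine is created.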

snapshotDir is a string indicating the directory where the streaming engine snapshot is saved. The snapshot is used to restore the streaming engine after system interruption.

  • The directory must already exist, otherwise an exception is thrown.

  • When creating a streaming engine, if snapshotDir is specified, the system will check whether there is a snapshot in the directory already. If it exists, the snapshot will be loaded to restore the state of the engine.

  • Multiple streaming engines can share a directory.

  • A snapshot file may have one of three names. While the snapshot is being generated, the file is named <engineName>.tmp; after the snapshot is generated and flushed to disk, it is renamed to <engineName>.snapshot; if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.

snapshotIntervalInMsgCount is a positive integer indicating the number of messages between two consecutive streaming engine snapshots.

To enable snapshots in streaming engines, snapshotDir and snapshotIntervalInMsgCount must be specified.
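
The snapshot parameters can be sketched as follows. This is an illustration under stated assumptions, not a definitive setup: the path, the table and engine names (snapTicks, snapResult, snapEngine, snapshotDemo), and the interval of 1000 messages are hypothetical. It also assumes that exists, mkdir, getSnapshotMsgId, and appendMsg are available in your server version, and that subscribeTable accepts the handlerNeedMsgId parameter.

```dolphindb
// Assumed snapshot location; replace it with a path that is valid on your server.
snapshotPath = "/home/dolphindb/snapshots"
// The directory must already exist when the engine is created.
// mkdir is assumed to succeed here, i.e. the parent directory exists.
if(!exists(snapshotPath)){
    mkdir(snapshotPath)
}

share streamTable(1:0, `sym`time`price, [STRING,DATETIME,DOUBLE]) as snapTicks
snapResult = table(1000:0, `sym`time`avgPrice, [STRING,DATETIME,DOUBLE])

// Save a snapshot every 1000 messages (illustrative value).
snapEngine = createReactiveStateEngine(name="snapshotDemo", metrics=[<time>, <mavg(price, 5)>], dummyTable=snapTicks, outputTable=snapResult, keyColumn="sym", snapshotDir=snapshotPath, snapshotIntervalInMsgCount=1000)

// Resume from the message after the last snapshot. getSnapshotMsgId is assumed
// to return -1 when no snapshot has been loaded, which gives an offset of 0.
ofst = getSnapshotMsgId(snapEngine)
subscribeTable(tableName=`snapTicks, actionName="snapshotDemo", offset=ofst+1, handler=appendMsg{snapEngine}, msgAsTable=true, handlerNeedMsgId=true)
```

The subscription passes message IDs to the engine so that a restored engine can continue from the position recorded in its snapshot.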

keepOrder is an optional Boolean value indicating whether the output table preserves the order of the input records. If keepOrder = true, the results are output in the same order as the input. The default value is true if keyColumn contains a time column, and false otherwise.

raftGroup is the ID of the raft group on the high-availability streaming subscriber, as specified by the configuration parameter streamingRaftGroups. It is an integer greater than 1. Specify raftGroup to enable high availability for the streaming engine. When a streaming engine is created on the leader, it is also created on each follower. Once a snapshot is generated, it is also synced to all followers. When the leader goes down, the raft group automatically switches to a new leader, which resubscribes to the stream table. Note that snapshotDir must also be specified when raftGroup is specified.


Create a reactive state engine to conduct real-time calculations on streaming data.

Return a table object that subscribes to a stream table. Writing data to this table means that data enters the reactive state engine for calculation.

Generally it is used in conjunction with subscribeTable. The engine it creates can be called across sessions.

The function was released in version 1.30.2.

Note: The reactive state engine doesn’t support multi-threaded concurrent writes. We must specify setExecutorPooling = false to make the write tasks execute serially.
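
Because the engine can be called across sessions, a handle to an existing engine can be retrieved by name. The sketch below assumes that an engine named "reactiveDemo" has already been created on the node and that getStreamEngine and getStreamEngineStat are available in your server version. The variable name h is illustrative.

```dolphindb
// Retrieve a handle to an engine that was created earlier,
// possibly in another session. Assumes "reactiveDemo" exists.
h = getStreamEngine("reactiveDemo")

// Inspect the status of the streaming engines on this node.
getStreamEngineStat()
```

The handle h can be used wherever the value returned by createReactiveStateEngine would be used, for example as the target of tableInsert.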


$ def sum_diff(x, y){
$  return (x-y)/(x+y)
$ }
$ factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>
$ share streamTable(1:0, `sym`time`price, [STRING,DATETIME,DOUBLE]) as tickStream
$ result = table(1000:0, `sym`time`factor1, [STRING,DATETIME,DOUBLE])
$ rse = createReactiveStateEngine(name="reactiveDemo", metrics =[<time>, factor1], dummyTable=tickStream, outputTable=result, keyColumn="sym", filter=<sym in ["000001.SH", "000002.SH"]>)
$ subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert{rse})

$ data1 = table(take("000001.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 10+cumsum(rand(0.1, 100)-0.05) as price)
$ data2 = table(take("000002.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 20+cumsum(rand(0.2, 100)-0.1) as price)
$ data3 = table(take("000003.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 30+cumsum(rand(0.3, 100)-0.15) as price)
$ data = data1.unionAll(data2).unionAll(data3).sortBy!(`time)

$ replay(inputTables=data, outputTables=tickStream, timeColumn=`time)

The result only contains the stocks “000001.SH” and “000002.SH” that are specified in the filtering condition.
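
One way to check this is to count the output rows per symbol. The query below is a small sketch that uses the result table from the script above; the column alias rowCount is illustrative. Subscribed data is processed asynchronously, so the counts may still be growing if the query runs immediately after replay returns.

```dolphindb
// Count the output rows for each symbol in the result table.
// Only the two symbols named in the filter are expected to appear.
select count(*) as rowCount from result group by sym
```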

To rerun the script above, we need to first execute the following script:

$ unsubscribeTable(tableName=`tickStream, actionName="factors")
$ dropStreamEngine(`reactiveDemo)
$ undef(`tickStream, SHARED)

Group the calculation by the date and sym columns, and output only the results whose date is between 2012.01.01 and 2012.01.03.

$ share streamTable(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT]) as trades
$ outputTable = table(100:0, `date`sym`factor1, [DATE, STRING, DOUBLE])
$ engine = createReactiveStateEngine(name="test", metrics=<mavg(price, 3)>, dummyTable=trades, outputTable=outputTable, keyColumn=["date","sym"], filter=<date between 2012.01.01 : 2012.01.03>, keepOrder=true)
$ subscribeTable(tableName=`trades, actionName="test", msgAsTable=true, handler=tableInsert{engine})

$ n=100
$ tmp = table(rand(2012.01.01..2012.01.10, n) as date, rand(09:00:00.000..15:59:59.999, n) as time, rand("A"+string(1..10), n) as sym, rand(['B', 'S'], n) as market, rand(100.0, n) as price, rand(1000..2000, n) as qty)
$ trades.append!(tmp)
$ select * from outputTable

$ // Execute the following code before re-running the code above.
$ unsubscribeTable(tableName=`trades, actionName="test")
$ dropStreamEngine(`test)
$ undef(`trades, SHARED)