createReactiveStateEngine

Syntax

createReactiveStateEngine(name, metrics, dummyTable, outputTable, keyColumn, [filter], [snapshotDir], [snapshotIntervalInMsgCount], [keepOrder], [keyPurgeFilter], [keyPurgeFreqInSecond=0], [raftGroup], [outputElapsedMicroseconds=false])

Details

This function creates a reactive state engine and returns a table object. Writing to the table means that data is ingested into the reactive state engine for calculation.

A set of state functions is optimized for the DolphinDB reactive state engine. Unoptimized state functions are currently not supported by this engine, and aggregate functions should be avoided in metrics.

Note: If function talib is used as a state function, the first parameter func must be a state function.
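For instance, a metric that wraps an optimized state function with talib might look like the following minimal sketch (the column name price is an assumption):

$ // mavg is a state function, so it is a valid first argument of talib;
$ // talib makes the result NULL-padded in TA-lib style
$ metrics = <talib(mavg, price, 5)>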

For more application scenarios, see Streaming Engines.

Calculation Rules

The reactive state engine outputs a result for each input. If multiple records are ingested into the reactive state engine at the same time, the data is calculated in batches. The number of records in each batch is determined by the system.

  • To output only the results that met the specified conditions, set the parameter filter;

  • To perform calculations by group, set the parameter keyColumn;

  • To preserve the insertion order of the records in the output table, set the parameter keepOrder.

Features

  • Data/state cleanup: States in the engine are maintained by group. A large number of groups may lead to high memory overhead, so you can set a cleanup rule to clear data that is no longer needed. (See parameters keyPurgeFilter and keyPurgeFreqInSecond)

  • Snapshot: Snapshot mechanism is used to restore the streaming engine to the latest snapshot after system interruption. (See parameters snapshotDir and snapshotIntervalInMsgCount)

  • High availability of streaming engines: To enable high availability of streaming engines, specify the parameter raftGroup on the leader node of the raft group on the subscriber. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table.

Arguments

name is a string indicating the engine name. It is the unique identifier of a reactive state engine on a data/compute node. It can contain letters, numbers, and underscores ("_"), and must start with a letter.

metrics is metacode specifying the formulas for calculation. The formulas should use vector functions instead of aggregate functions. For more information about metacode, refer to Metaprogramming. To use a user-defined function in the reactive state engine,

(1) declare the function with “@state” before its definition. State functions support only assignment statements and return statements. Starting from 1.30.21/2.00.9, you can also use the if...else statement with scalar expressions.

(2) if the rvalue of an assignment statement is a function that returns multiple values, the values must be assigned to variables at the same time. In the following example, the user-defined state function references linearTimeTrend, which returns two values.

$ @state
$ def forcast2(S, N){
$       linearregIntercept, linearregSlope = linearTimeTrend(S, N)
$       return (N - 1) * linearregSlope + linearregIntercept
$ }
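The state function declared above can then be referenced in metrics like a built-in state function. A minimal sketch (the table and column names are assumptions):

$ share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as ticks
$ out = table(1000:0, `sym`factor, [STRING,DOUBLE])
$ // forcast2 is the user-defined @state function defined above
$ engine = createReactiveStateEngine(name="demoUDF", metrics=<forcast2(price, 10)>, dummyTable=ticks, outputTable=out, keyColumn="sym")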

Note: The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.

dummyTable is a table object whose schema must be the same as that of the subscribed stream table. It does not matter whether dummyTable contains data.

outputTable is the output table for the results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function.

The output columns are in the following order:

(1) The first few columns must be in the same order as the columns specified in keyColumn.

(2) If outputElapsedMicroseconds is set to true, specify two more columns: a LONG column and an INT column. See the outputElapsedMicroseconds parameter for details.

(3) These are followed by one or more result columns.
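For example, with keyColumn=["date","sym"] and outputElapsedMicroseconds=true, the output table could be declared as follows (the column names are illustrative):

$ // order: key columns first, then the LONG (elapsed time) and INT (batch size)
$ // columns, then the result column(s)
$ outputTable = table(100:0, `date`sym`elapsed`batchSize`factor, [DATE, STRING, LONG, INT, DOUBLE])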

keyColumn is a STRING scalar or vector indicating the grouping column(s). The calculation is conducted within each group.

filter (optional) is metacode indicating the filtering conditions in the form of an expression. Only the results that satisfy the filter conditions are ingested into the output table. You can specify multiple conditions with logical operators (and, or).
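For instance, multiple conditions can be combined with logical operators (a sketch; the columns sym and price are assumptions):

$ // keep results for two symbols, or any result whose price exceeds 100
$ filter = <sym in ["000001.SH", "000002.SH"] or price > 100.0>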

To enable snapshot in the streaming engines, specify parameters snapshotDir and snapshotIntervalInMsgCount.

snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved.

  • The directory must already exist, otherwise an exception is thrown.

  • If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot will be loaded to restore the engine state.

  • Multiple streaming engines can share a directory where the snapshot files are named as the engine names.

  • The file extension of a snapshot can be:

    • <engineName>.tmp: temporary snapshot

    • <engineName>.snapshot: a snapshot that is generated and flushed to disk

    • <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.

snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.
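A sketch of enabling snapshots (the directory path is an assumption and must already exist):

$ share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as snapStream
$ out = table(1000:0, `sym`avgPrice, [STRING,DOUBLE])
$ // a snapshot named snapDemo.snapshot is flushed to snapshotDir every 1000 messages
$ engine = createReactiveStateEngine(name="snapDemo", metrics=<mavg(price, 5)>, dummyTable=snapStream, outputTable=out, keyColumn="sym", snapshotDir="/home/dolphindb/snapshots", snapshotIntervalInMsgCount=1000)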

keepOrder (optional) specifies whether to preserve the insertion order of the records in the output table. If keyColumn contains a time column, the default value is true; otherwise, it is false.

To clean up the data that is no longer needed after calculation, specify parameters keyPurgeFilter and keyPurgeFreqInSecond.

keyPurgeFilter (optional) indicates the filter conditions that identify the data to be purged from the cache. It is metacode composed of conditional expressions, and these expressions must refer to the columns in the outputTable.

keyPurgeFreqInSecond (optional) is a positive integer indicating the time interval (in seconds) to trigger a purge.

For each data ingestion, the engine starts a purge if all of the following conditions are satisfied:

(1) The time elapsed since the last purge is equal to or greater than keyPurgeFreqInSecond (for the first check, the time elapsed since the engine was created is used);

(2) Applying keyPurgeFilter to the cached data identifies data to be purged;

(3) The number of groups containing data to be purged is equal to or greater than 10% of the total number of groups in the engine.

To check the engine status before and after the purge, call getStreamEngineStat().ReactiveStreamEngine (see getStreamEngineStat) where the numGroups field indicates the number of groups in the reactive state streaming engine.
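A sketch of configuring the purge rule (the table, column names, and thresholds are assumptions; note that keyPurgeFilter references columns of the output table):

$ share streamTable(1:0, `sym`time`price, [STRING,DATETIME,DOUBLE]) as purgeStream
$ out = table(1000:0, `sym`time`factor, [STRING,DATETIME,DOUBLE])
$ // purge groups whose latest output is more than one hour old,
$ // checking at most once every 60 seconds
$ engine = createReactiveStateEngine(name="purgeDemo", metrics=[<time>, <cumsum(price)>], dummyTable=purgeStream, outputTable=out, keyColumn="sym", keyPurgeFilter=<time < datetime(now()) - 3600>, keyPurgeFreqInSecond=60)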

raftGroup (optional) is an integer greater than 1, indicating the ID of the raft group on the high-availability streaming subscriber specified by the configuration parameter streamingRaftGroups. Specify raftGroup to enable high availability on the streaming engine. When an engine is created on the leader, it is also created on each follower, and the engine snapshot is synchronized to the followers. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table. Note that snapshotDir must also be specified when specifying a raft group.
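A sketch of enabling high availability (it assumes raft group 2 has been configured via streamingRaftGroups and the snapshot directory exists; the code runs on the leader node):

$ share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as haStream
$ out = table(1000:0, `sym`avgPrice, [STRING,DOUBLE])
$ // snapshotDir is required when raftGroup is specified
$ engine = createReactiveStateEngine(name="haDemo", metrics=<mavg(price, 5)>, dummyTable=haStream, outputTable=out, keyColumn="sym", snapshotDir="/home/dolphindb/snapshots", raftGroup=2)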

outputElapsedMicroseconds (optional) is a BOOLEAN value. The default value is false. It determines whether to output:

  • the elapsed time (in microseconds) from the ingestion of data to the output of the result for each batch;

  • the total number of records in each batch.

Examples

$ def sum_diff(x, y){
$  return (x-y)/(x+y)
$ }
$ factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>
$ share streamTable(1:0, `sym`time`price, [STRING,DATETIME,DOUBLE]) as tickStream
$ result = table(1000:0, `sym`time`factor1, [STRING,DATETIME,DOUBLE])
$ rse = createReactiveStateEngine(name="reactiveDemo", metrics =[<time>, factor1], dummyTable=tickStream, outputTable=result, keyColumn="sym", filter=<sym in ["000001.SH", "000002.SH"]>)
$ subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert{rse})

$ data1 = table(take("000001.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 10+cumsum(rand(0.1, 100)-0.05) as price)
$ data2 = table(take("000002.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 20+cumsum(rand(0.2, 100)-0.1) as price)
$ data3 = table(take("000003.SH", 100) as sym, 2021.02.08T09:30:00 + 1..100 *3 as time, 30+cumsum(rand(0.3, 100)-0.15) as price)
$ data = data1.unionAll(data2).unionAll(data3).sortBy!(`time)

$ replay(inputTables=data, outputTables=tickStream, timeColumn=`time)

// Execute the following code before re-running the above code.
$ unsubscribeTable(tableName=`tickStream, actionName="factors")
$ dropStreamEngine(`reactiveDemo)
$ undef(`tickStream, SHARED)

The result only contains the stocks “000001.SH” and “000002.SH” that are specified in the filtering condition.

Calculate in groups by the date and sym columns, then output the results whose date is between 2012.01.01 and 2012.01.03.

$ share streamTable(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT]) as trades
$ outputTable = table(100:0, `date`sym`factor1, [DATE, STRING, DOUBLE])
$ engine = createReactiveStateEngine(name="test", metrics=<mavg(price, 3)>, dummyTable=trades, outputTable=outputTable, keyColumn=["date","sym"], filter=<date between 2012.01.01 : 2012.01.03>, keepOrder=true)
$ subscribeTable(tableName=`trades, actionName="test", msgAsTable=true, handler=tableInsert{engine})

$ n=100
$ tmp = table(rand(2012.01.01..2012.01.10, n) as date, rand(09:00:00.000..15:59:59.999, n) as time, rand("A"+string(1..10), n) as sym, rand(['B', 'S'], n) as market, rand(100.0, n) as price, rand(1000..2000, n) as qty)
$ trades.append!(tmp)
$ select * from outputTable

// Execute the following code before re-running the above code.
$ unsubscribeTable(tableName=`trades, actionName="test")
$ dropStreamEngine(`test)
$ undef(`trades, SHARED)