createTimeSeriesEngine

Syntax

createTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime=false], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [forceTriggerTime], [raftGroup], [keyPurgeFreqInSec], [closed='left'])

Alias: createTimeSeriesAggregator

Details

Create a time-series engine to conduct real-time time-series aggregate calculations with streaming data.

Return a table object that subscribes to a stream table. Ingesting data to this table means that data enters the engine for windowed aggregations.

If keyColumn is specified to group the records that have the same key, the window calculation will be performed in each group.

Rules for timeColumn:

1. If keyColumn is not specified, the values in timeColumn must be incremental.

2. If keyColumn is specified, the values from the same group in the timeColumn must be incremental.

Otherwise, out-of-order data will be discarded.

Generally it is used in conjunction with subscribeTable. Refer to Time-Series Stream Engine of DolphinDB for more information. The engine it creates can be called across sessions.

Arguments

name is a string indicating the name of the engine. It is the unique identifier of the engine. It can contain letters, digits and underscores ("_"), and must start with a letter.

useSystemTime is an optional BOOLEAN indicating whether the calculations are performed based on the system time when data is ingested into the engine.

  • useSystemTime = true: the engine will regularly window the streaming data at fixed time intervals for calculations according to the ingestion time (local system time with millisecond precision, independent of any temporal columns in the streaming table) of each record. As long as a window contains data, the calculation will be performed automatically when the window ends. The first column in output table indicates the timestamp when the calculation occurred.

  • useSystemTime = false (default): the engine will window the streaming data according to the timeColumn in the stream table. The calculation for a window is triggered by the first record after the previous window. Please note that the record which triggers the calculation will not participate in this calculation.

For example, consider a window that covers 10:10:10 to 10:10:19. If useSystemTime=true and the window is not empty, the calculation will be triggered at 10:10:20. If useSystemTime=false and the first record after 10:10:19 is at 10:10:25, the calculation will be triggered at 10:10:25.
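The two trigger modes can be contrasted with a small Python sketch. It only illustrates the rule described above and is not engine code; the function names `to_seconds` and `trigger_time` and the use of seconds since midnight are assumptions made for this example.

```python
from typing import Optional


def to_seconds(hms: str) -> int:
    """Convert 'HH:MM:SS' to seconds since midnight."""
    h, m, s = (int(x) for x in hms.split(":"))
    return h * 3600 + m * 60 + s


def trigger_time(window_end: int, use_system_time: bool,
                 next_record_time: Optional[int] = None) -> Optional[int]:
    """Return when a non-empty window ending (exclusive) at window_end is calculated.

    use_system_time=True : the window is calculated as soon as it ends.
    use_system_time=False: the window is calculated when the first record at or
                           after window_end arrives (None if no such record).
    """
    if use_system_time:
        return window_end
    if next_record_time is not None and next_record_time >= window_end:
        return next_record_time
    return None


# The window covers 10:10:10 through 10:10:19, so it ends (exclusive) at 10:10:20.
end = to_seconds("10:10:20")
print(trigger_time(end, use_system_time=True))                                      # 36620, i.e. 10:10:20
print(trigger_time(end, use_system_time=False, next_record_time=to_seconds("10:10:25")))  # 36625, i.e. 10:10:25
```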

windowSize is a scalar or vector of positive integers that specifies the length of the windows for calculation. If it is a vector, the size must be the same as the number of elements in metrics. Only the left boundary is included in the window.

step is a positive integer indicating the length between the end time of two adjacent windows. Together with the starting time of the first window, it determines the end time of each window. The value of windowSize must be a multiple of step, otherwise an exception will be thrown.

The unit of windowSize and step depends on the value of useSystemTime.

  • If useSystemTime=true, the unit of windowSize and step is millisecond.

  • If useSystemTime=false, the unit of windowSize and step is the same as the unit of timeColumn.

The engine automatically aligns the starting time of the first window to a multiple of alignmentSize (an integer), which is determined by step, the precision of timeColumn and roundTime. When the engine calculates in groups, the windows of all groups are aligned uniformly, so windows covering the same period have the same boundaries in every group.

  • If the data type of timeColumn is DATETIME(yyyy-MM-dd HH:mm:ss) or SECOND(HH:mm:ss), the value of alignmentSize is as follows:

    if roundTime=false:

    step      alignmentSize
    0~2       2
    3         3
    4~5       5
    6~10      10
    11~15     15
    16~20     20
    21~30     30
    >30       60 (1 minute)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30. If step>30, the value of alignmentSize is as follows:

    step         alignmentSize
    31~60        60 (1 minute)
    61~120       120 (2 minutes)
    121~180      180 (3 minutes)
    181~300      300 (5 minutes)
    301~600      600 (10 minutes)
    601~900      900 (15 minutes)
    901~1200     1200 (20 minutes)
    1201~1800    1800 (30 minutes)
    >1800        3600 (1 hour)

  • If the data type of timeColumn is TIMESTAMP(yyyy-MM-dd HH:mm:ss.mmm) or TIME(HH:mm:ss.mmm), the value of alignmentSize is as follows:

    if roundTime=false:

    step           alignmentSize
    0~2            2
    3~5            5
    6~10           10
    11~20          20
    21~25          25
    26~50          50
    51~100         100
    101~200        200
    201~250        250
    251~500        500
    501~1000       1000 (1 second)
    1001~2000      2000 (2 seconds)
    2001~5000      5000 (5 seconds)
    5001~10000     10000 (10 seconds)
    10001~15000    15000 (15 seconds)
    15001~20000    20000 (20 seconds)
    20001~30000    30000 (30 seconds)
    >30000         60000 (1 minute)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30000. If step>30000, the value of alignmentSize is as follows:

    step               alignmentSize
    30001~60000        60000 (1 minute)
    60001~120000       120000 (2 minutes)
    120001~300000      300000 (5 minutes)
    300001~600000      600000 (10 minutes)
    600001~900000      900000 (15 minutes)
    900001~1200000     1200000 (20 minutes)
    1200001~1800000    1800000 (30 minutes)
    >1800000           3600000 (1 hour)

  • If the data type of timeColumn is NANOTIMESTAMP(yyyy-MM-dd HH:mm:ss.nnnnnnnnn) or NANOTIME(HH:mm:ss.nnnnnnnnn), the value of alignmentSize is as follows:

    if roundTime=false:

    step           alignmentSize
    0~2ns          2ns
    3ns~5ns        5ns
    6ns~10ns       10ns
    11ns~20ns      20ns
    21ns~25ns      25ns
    26ns~50ns      50ns
    51ns~100ns     100ns
    101ns~200ns    200ns
    201ns~250ns    250ns
    251ns~500ns    500ns
    >500ns         1000ns

    if roundTime=true:

    step           alignmentSize
    1000ns~1ms     1ms
    1ms~10ms       10ms
    10ms~100ms     100ms
    100ms~1s       1s
    1s~2s          2s
    2s~3s          3s
    3s~5s          5s
    5s~10s         10s
    10s~15s        15s
    15s~20s        20s
    20s~30s        30s
    >30s           1min

If the time of the first record is x with data type of TIMESTAMP, then the starting time of the first window is adjusted to be timestamp(x/alignmentSize*alignmentSize), where / produces only the integer part after division. For example, if the time of the first record is 2018.10.08T01:01:01.365 and step=60000, then alignmentSize=60000, and the starting time of the first window is timestamp(2018.10.08T01:01:01.365/60000*60000)=2018.10.08T01:01:00.000.
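The alignment rule for millisecond-precision time columns can be sketched in Python as follows. This is an illustration under stated assumptions rather than the engine's implementation: the names `alignment_size_ms` and `first_window_start` are hypothetical, the lookup values are copied from the TIMESTAMP/TIME tables above, and timestamps are treated as milliseconds since 1970-01-01 with no time-zone shift.

```python
from datetime import datetime, timedelta, timezone

# Upper bound of each step range (ms). In the tables above, alignmentSize equals
# the upper bound of the range that contains step.
BASE_BOUNDS = [2, 5, 10, 20, 25, 50, 100, 200, 250, 500, 1000, 2000,
               5000, 10000, 15000, 20000, 30000]
# Additional ranges that apply only when roundTime=true and step > 30000.
MULTI_MINUTE_BOUNDS = [60000, 120000, 300000, 600000, 900000, 1200000, 1800000]

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def alignment_size_ms(step: int, round_time: bool = True) -> int:
    """Look up alignmentSize (ms) for a TIMESTAMP/TIME time column."""
    for bound in BASE_BOUNDS:
        if step <= bound:
            return bound
    if not round_time:
        return 60000          # one-minute rule
    for bound in MULTI_MINUTE_BOUNDS:
        if step <= bound:
            return bound
    return 3600000            # one hour


def first_window_start(first_record: datetime, step: int,
                       round_time: bool = True) -> datetime:
    """Compute x / alignmentSize * alignmentSize with integer division."""
    size = alignment_size_ms(step, round_time)
    ms = (first_record - EPOCH) // timedelta(milliseconds=1)
    return EPOCH + timedelta(milliseconds=ms // size * size)


first = datetime(2018, 10, 8, 1, 1, 1, 365000, tzinfo=timezone.utc)
print(first_window_start(first, step=60000))  # 2018-10-08 01:01:00+00:00
```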

metrics is metacode or a tuple specifying the calculation formulas. For more information about metacode, refer to Metaprogramming.

  • It can use one or more built-in or user-defined aggregate functions such as <[sum(volume), avg(price)]>, or expressions of aggregate functions such as <[avg(price1)-avg(price2)]>, or aggregate functions on operations involving multiple columns such as <[std(price1-price2)]>.

  • You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (it’s not necessary to specify the column names).

  • If windowSize is a vector, each element of windowSize can correspond to multiple formulas in metrics. For example, if windowSize=[10,20], metrics can be (<[min(volume), max(volume)]>, <sum(volume)>). metrics can also be a nested tuple, such as [[<[min(volume), max(volume)]>, <sum(volume)>],[<avg(volume)>]].

DolphinDB has optimized the following aggregate functions in the time-series engine to achieve incremental computing: corr, covar, first, last, max, med, min, percentile, quantile, std, var, sum, sum2, sum3, sum4, wavg, wsum, count, firstNot, ifirstNot, lastNot, ilastNot, imax, imin, nunique, prod, sem, mode, searchK

dummyTable is a table object whose schema must be the same as the subscribed stream table. The system uses the schema of dummyTable to determine the data type of each column in the messages. Whether dummyTable contains data does not affect the aggregation result.

outputTable is the output table for the calculation results. It can be an in-memory table or a DFS table. Before calling createTimeSeriesEngine, create outputTable and specify the names and data types of its columns. The engine inserts the results into it.

The following rules apply to the schema of outputTable:

  • The data type of the first column must be temporal. It is TIMESTAMP if useSystemTime=true. It is the same as the data type of timeColumn if useSystemTime=false.

  • If keyColumn is specified, the second column is keyColumn.

  • The remaining columns are calculation results.
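As a rough illustration of these rules, the following Python sketch assembles the column order expected for outputTable. The function name `output_schema`, the (name, type) pair representation and the column name "time" are assumptions for this example; only the ordering and the type of the first column come from the rules above.

```python
from typing import List, Optional, Sequence, Tuple

Column = Tuple[str, str]  # (column name, DolphinDB type name)


def output_schema(metric_cols: Sequence[Column],
                  use_system_time: bool,
                  time_col_type: Optional[str] = None,
                  key_cols: Sequence[Column] = ()) -> List[Column]:
    """Return the outputTable columns in the required order."""
    if use_system_time:
        first_type = "TIMESTAMP"        # always TIMESTAMP with system time
    else:
        if time_col_type is None:
            raise ValueError("time_col_type is required when use_system_time is False")
        first_type = time_col_type      # same type as timeColumn
    schema = [("time", first_type)]     # 1. temporal column
    schema.extend(key_cols)             # 2. key column(s), if any
    schema.extend(metric_cols)          # 3. calculation results
    return schema


# Schema matching a per-symbol sum(volume) on a TIMESTAMP time column.
print(output_schema([("sumVolume", "INT")], use_system_time=False,
                    time_col_type="TIMESTAMP", key_cols=[("sym", "SYMBOL")]))
```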

timeColumn is a string scalar or vector indicating the name of the temporal column(s) of the subscribed stream table when useSystemTime=false.

Please note:

  • If timeColumn is a vector, it must consist of a date column (type DATE) and a time column (type TIME, SECOND or NANOTIME). In this case, the first column of the output table must have the same data type as concatDateTime(date, time).

  • If keyColumn is not specified, timeColumn must be increasing; if keyColumn is specified, timeColumn must be increasing within each group specified by keyColumn.

keyColumn is a string scalar/vector indicating the grouping column name. The engine conducts the calculations within each group specified by keyColumn if it is specified.

garbageSize is a positive integer. It is optional and the default value is 50,000. As the subscribed data continues to accumulate in the engine, a mechanism is needed to clear data in memory that is no longer needed. When the number of rows of historical data in memory exceeds garbageSize, the system clears the historical data that is not needed for the current calculation.

If keyColumn is specified, memory cleanup is performed independently within each group. When the number of rows of historical data in memory of a group exceeds garbageSize, the historical data that is not needed in the current calculation in this group will be cleared.

updateTime is a positive integer. It can be used to trigger multiple calculations within the current window. The unit of updateTime is the same as the unit of step, and step must be an integer multiple of updateTime. To specify updateTime, useSystemTime must be set to false.

If updateTime is not specified, calculation for a window will not occur before the window ends.

If updateTime is specified, multiple calculations may happen within the current window. These calculations are triggered with the following rules:

  • Divide the current window into windowSize/updateTime small windows. Each small window has a length of updateTime. When a new record arrives after a small window finishes, if there is at least one record in the current window that is not used in a calculation (excluding the new record), a calculation is triggered. Please note that this calculation does not use the new record.

  • If a record has still not been used in a calculation max(2*updateTime, 2 seconds) after it arrives at the engine, a calculation is triggered. This calculation includes all data in the current window at that time.

If keyColumn is specified, these rules apply within each group.
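The arithmetic behind these two rules can be sketched in Python. The helper names below are hypothetical, all times are in milliseconds since midnight, and the timeout helper assumes that updateTime is expressed in milliseconds; the sketch only computes the boundaries and the timeout and does not model the engine's scheduling.

```python
from typing import Tuple


def small_window_count(window_size: int, update_time: int) -> int:
    """Number of updateTime-sized small windows in one window."""
    if window_size % update_time != 0:
        raise ValueError("window_size must be a multiple of update_time")
    return window_size // update_time


def small_window(record_time: int, window_start: int, update_time: int) -> Tuple[int, int]:
    """Return [start, end) of the small window that contains record_time."""
    start = window_start + (record_time - window_start) // update_time * update_time
    return start, start + update_time


def timeout_ms(update_time_ms: int) -> int:
    """Delay after which an unused record forces a calculation: max(2*updateTime, 2 seconds)."""
    return max(2 * update_time_ms, 2000)


window_start = (1 * 3600 + 4 * 60) * 1000      # 01:04:00.000
record = window_start + 4412                    # 01:04:04.412
print(small_window_count(60000, 1000))          # 60
print(small_window(record, window_start, 1000)) # (3844000, 3845000), i.e. [01:04:04.000, 01:04:05.000)
print(timeout_ms(1000))                         # 2000
```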

Please note that the timestamp of each calculation result within the current window is the current window starting time or starting time + windowSize (depending on parameter useWindowStartTime) instead of a timestamp inside the current window.

It is recommended to specify a keyed table for outputTable if updateTime is set. If outputTable is a standard in-memory table or stream table, it will have multiple results for each timestamp (in each group). It is not recommended to use a keyed stream table either as the records of a keyed stream table cannot be updated.

useWindowStartTime is a Boolean value indicating whether the first column in outputTable is the starting time of the windows. The default value is false, which means the timestamps in the output table are the starting time of the windows + windowSize. If windowSize is a vector, useWindowStartTime must be false.

roundTime is an optional Boolean value indicating how window boundaries are aligned when the time precision is milliseconds or seconds and step is greater than one minute. The default value true means that alignment follows the multi-minute rule (see the tables above); false means that alignment follows the one-minute rule (see the tables above).

snapshotDir is a string indicating the directory where the streaming engine snapshot is saved. The snapshot is used to restore the streaming engine after system interruption.

  • The directory must already exist, otherwise an exception is thrown.

  • When creating a streaming engine, if snapshotDir is specified, the system will check whether there is a snapshot in the directory already. If it exists, the snapshot will be loaded to restore the state of the engine.

  • Multiple streaming engines can share a directory.

  • A snapshot may use 3 names. When the snapshot is being generated, the file name is <engineName>.tmp; after the snapshot is generated and flushed to the disk, it is renamed to <engineName>.snapshot; if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.
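The naming scheme in the last item can be mimicked with a few lines of Python. This is a sketch of the file rotation only, not of how DolphinDB writes snapshots; the function `write_snapshot` and the payload bytes are hypothetical.

```python
import os
import tempfile
from pathlib import Path


def write_snapshot(snapshot_dir: Path, engine_name: str, payload: bytes) -> Path:
    """Write a snapshot using the .tmp -> .snapshot -> .old naming scheme."""
    if not snapshot_dir.is_dir():
        # Mirrors the rule that the directory must already exist.
        raise FileNotFoundError(f"snapshot directory does not exist: {snapshot_dir}")
    tmp = snapshot_dir / f"{engine_name}.tmp"
    final = snapshot_dir / f"{engine_name}.snapshot"
    old = snapshot_dir / f"{engine_name}.old"
    with open(tmp, "wb") as f:          # 1. generate the snapshot under the .tmp name
        f.write(payload)
        f.flush()
        os.fsync(f.fileno())            #    flush it to disk
    if final.exists():                  # 2. keep the previous snapshot as .old
        os.replace(final, old)
    os.replace(tmp, final)              # 3. publish the new snapshot
    return final


with tempfile.TemporaryDirectory() as d:
    folder = Path(d)
    write_snapshot(folder, "engine1", b"state-1")
    write_snapshot(folder, "engine1", b"state-2")
    print(sorted(p.name for p in folder.iterdir()))  # ['engine1.old', 'engine1.snapshot']
```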

snapshotIntervalInMsgCount is a positive integer indicating the number of messages between 2 streaming engine snapshots.

To enable snapshots in streaming engines, snapshotDir and snapshotIntervalInMsgCount must be specified.

fill is a vector/scalar indicating the filling method to deal with an empty window (in a group). It can be:

  • 'none': no result.

  • 'null': the result is NULL.

  • 'ffill': output the result of the last window.

  • a specific value: the result is the specified value, whose type should be the same as that of the corresponding metric's output.

fill can be a vector that specifies a different filling method for each metric. The elements of the vector cannot be 'none'.
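The effect of each filling method on a sequence of windows can be sketched in Python for a single metric. The function `apply_fill` and the use of None to mark an empty window are assumptions for this illustration; the behaviour of 'ffill' before any window has produced a result (None here) is also an assumption.

```python
from typing import Any, List, Optional, Tuple

WindowResult = Tuple[str, Optional[Any]]  # (window timestamp, value or None if the window is empty)


def apply_fill(window_results: List[WindowResult], fill: Any = "none") -> List[WindowResult]:
    """Apply a fill method to one metric of one group."""
    out: List[WindowResult] = []
    last: Optional[Any] = None
    for ts, value in window_results:
        if value is not None:           # non-empty window: output as is
            out.append((ts, value))
            last = value
        elif fill == "none":            # empty window produces no row
            continue
        elif fill == "null":            # empty window produces NULL
            out.append((ts, None))
        elif fill == "ffill":           # repeat the last output result
            out.append((ts, last))
        else:                           # any other value is used directly
            out.append((ts, fill))
    return out


windows = [("14:05:46", 1), ("14:05:52", None), ("14:05:58", 8)]
print(apply_fill(windows, "none"))   # [('14:05:46', 1), ('14:05:58', 8)]
print(apply_fill(windows, "ffill"))  # [('14:05:46', 1), ('14:05:52', 1), ('14:05:58', 8)]
print(apply_fill(windows, 1000))     # [('14:05:46', 1), ('14:05:52', 1000), ('14:05:58', 8)]
```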

forceTriggerTime is an optional parameter. It is a non-negative integer indicating the waiting time (in seconds) to force trigger calculation in the uncalculated windows for each group. The rules are as follows:

1. Suppose the end time of the uncalculated window is t, and an incoming record of another group arrives at t1: when t1-t>=forceTriggerTime, calculation of the window will be triggered.

2. When a window doesn’t contain any records:

  • If fill is not specified, the window returns no result.

  • If fill is specified, the result of the window will be filled using the specified filling method.
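Rule 1 amounts to a single comparison, sketched below in Python. The function name `should_force_trigger` is hypothetical, times are in seconds since midnight, and the sketch assumes that t1 is the timestamp carried by the incoming record.

```python
def should_force_trigger(window_end: int, arrival_time: int, force_trigger_time: int) -> bool:
    """Rule 1: trigger the uncalculated window once t1 - t >= forceTriggerTime."""
    return arrival_time - window_end >= force_trigger_time


def hms(h: int, m: int, s: int) -> int:
    """Seconds since midnight."""
    return h * 3600 + m * 60 + s


window_end = hms(14, 5, 46)   # an uncalculated window ends at 14:05:46
print(should_force_trigger(window_end, hms(14, 5, 52), 7))  # False: only 6 seconds have passed
print(should_force_trigger(window_end, hms(14, 5, 54), 7))  # True: 8 seconds >= 7
```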

Note: If forceTriggerTime is set, useSystemTime must be false and updateTime cannot be specified. It is recommended not to set forceTriggerTime to a small value.

Note the following points when setting forceTriggerTime or updateTime:

  • If updateTime is set, the output table must be a keyed table (keyedTable); if forceTriggerTime is set, the output table can be an in-memory table or a distributed table.

  • If data belonging to the current window still arrives after the calculation of that window has been force-triggered, the data is calculated in the next window.

raftGroup is an integer greater than 1 indicating the Raft group ID. Set this parameter to enable high availability for the streaming engine. The engine is then also created on the followers, and snapshots are synchronized to the followers as well. High availability must also be enabled on the subscriber side with function subscribeTable, whose raftGroup must be the same as that of the streaming engine. When the leader is down, the system automatically switches to a new leader and re-subscribes. Please note that snapshotDir must be specified if raftGroup is specified.

keyPurgeFreqInSec is a positive integer indicating the interval (in seconds) to remove groups with no incoming data for a long time. If a group has no incoming data for at least keyPurgeFreqInSec seconds after the last time of data purging, it will be removed.

Note: To specify this parameter, parameter forceTriggerTime must be specified and parameter fill cannot be specified.

You can check the number of groups in a time-series streaming engine based on the column numGroups returned by getStreamEngineStat.

closed is a STRING indicating whether the left or the right boundary is included.

  • closed = 'left': left-closed, right-open

  • closed = 'right': left-open, right-closed
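The two settings differ only in which boundary a record may sit on, as the following Python sketch shows. The function `in_window` is a hypothetical helper, not part of the DolphinDB API.

```python
def in_window(t: int, start: int, end: int, closed: str = "left") -> bool:
    """Return True if time t belongs to the window bounded by start and end."""
    if closed == "left":
        return start <= t < end     # left-closed, right-open: [start, end)
    if closed == "right":
        return start < t <= end     # left-open, right-closed: (start, end]
    raise ValueError("closed must be 'left' or 'right'")


# A record exactly on the left boundary belongs to the window only if closed='left'.
print(in_window(60, 60, 120, closed="left"))    # True
print(in_window(60, 60, 120, closed="right"))   # False
print(in_window(120, 60, 120, closed="right"))  # True
```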

Examples

In the following example, the time-series engine engine1 subscribes to the stream table trades and calculates sum(volume) for each stock over each 1-minute window in real time. The result is saved in table output1.

$ share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
$ output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
$ engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

$ insert into trades values(2018.10.08T01:01:01.785,`A,10)
$ insert into trades values(2018.10.08T01:01:02.125,`B,26)
$ insert into trades values(2018.10.08T01:01:10.263,`B,14)
$ insert into trades values(2018.10.08T01:01:12.457,`A,28)
$ insert into trades values(2018.10.08T01:02:10.789,`A,15)
$ insert into trades values(2018.10.08T01:02:12.005,`B,9)
$ insert into trades values(2018.10.08T01:02:30.021,`A,10)
$ insert into trades values(2018.10.08T01:04:02.236,`A,29)
$ insert into trades values(2018.10.08T01:04:04.412,`B,32)
$ insert into trades values(2018.10.08T01:04:05.152,`B,23)

$ sleep(10)

$ select * from output1;

time                     sym  sumVolume
2018.10.08T01:02:00.000  A    38
2018.10.08T01:02:00.000  B    40
2018.10.08T01:03:00.000  A    25
2018.10.08T01:03:00.000  B    9

The following paragraphs explain in detail how the time-series engine conducts the calculations. For simplicity, regarding the “time” column we ignore the part of “2018.10.08T” and only use the “hour:minute:second.millisecond” part.

First, the time-series engine adjusts the starting time of the first window to be 01:01:00.000. The first window is from 01:01:00.000 (inclusive) to 01:02:00.000 (exclusive). When the record (01:02:10.789,`A,15) arrives, it triggers the calculation of group A for the first window; the arrival of (01:02:12.005,`B,9) triggers the calculation of group B for the first window.

The second window is from 01:02:00.000 (inclusive) to 01:03:00.000 (exclusive). When the record (01:04:02.236,`A,29) arrives, it triggers the calculation of group A for the second window; the arrival of (01:04:04.412,`B,32) triggers the calculation of group B for the second window.

As no record arrives at or after 01:05:00.000, no calculation is triggered for the window [01:04:00.000, 01:05:00.000).
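The walkthrough above can be reproduced with a small Python simulation. It is a simplified model, not the engine: it handles only tumbling windows (windowSize equal to step), a single sum metric, fill='none' and useWindowStartTime=false, and the names `to_ms`, `from_ms` and `simulate` are hypothetical.

```python
from typing import Dict, List, Tuple


def to_ms(hms: str) -> int:
    """Convert 'HH:MM:SS.mmm' to milliseconds since midnight."""
    h, m, rest = hms.split(":")
    s, ms = rest.split(".")
    return ((int(h) * 60 + int(m)) * 60 + int(s)) * 1000 + int(ms)


def from_ms(value: int) -> str:
    """Convert milliseconds since midnight back to 'HH:MM:SS.mmm'."""
    s, ms = divmod(value, 1000)
    m, s = divmod(s, 60)
    h, m = divmod(m, 60)
    return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"


def simulate(records: List[Tuple[str, str, int]], step_ms: int = 60000,
             alignment_ms: int = 60000) -> List[Tuple[str, str, int]]:
    """Emit (window end, key, sum) when a later record of the same key arrives."""
    start0 = None                              # aligned start of the first window
    open_windows: Dict[str, List[int]] = {}    # key -> [window index, running sum]
    results = []
    for time_str, key, volume in records:
        t = to_ms(time_str)
        if start0 is None:
            start0 = t // alignment_ms * alignment_ms
        idx = (t - start0) // step_ms
        state = open_windows.get(key)
        if state is not None and idx < state[0]:
            continue                           # out-of-order record is discarded
        if state is not None and idx > state[0]:
            # The new record closes the key's open window but is not part of it.
            window_end = start0 + (state[0] + 1) * step_ms
            results.append((from_ms(window_end), key, state[1]))
            state = None
        if state is None:
            open_windows[key] = [idx, volume]
        else:
            state[1] += volume
    return results


trades = [("01:01:01.785", "A", 10), ("01:01:02.125", "B", 26),
          ("01:01:10.263", "B", 14), ("01:01:12.457", "A", 28),
          ("01:02:10.789", "A", 15), ("01:02:12.005", "B", 9),
          ("01:02:30.021", "A", 10), ("01:04:02.236", "A", 29),
          ("01:04:04.412", "B", 32), ("01:04:05.152", "B", 23)]
for row in simulate(trades):
    print(row)
```

Under these assumptions the printed rows are (01:02:00.000, A, 38), (01:02:00.000, B, 40), (01:03:00.000, A, 25) and (01:03:00.000, B, 9), and the window starting at 01:04:00.000 stays open because no later record arrives.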

The table output1 stores the calculation results of the time-series engine. As useWindowStartTime=false, the timestamps in the output table are the starting time of the windows + windowSize. If useWindowStartTime=true, then the timestamps in the output table are the starting time of the windows, as illustrated in the following example:

$ output2 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
$ engine2 = createTimeSeriesEngine(name="engine2", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output2, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=true)
$ subscribeTable(tableName="trades", actionName="engine2", offset=0, handler=append!{engine2}, msgAsTable=true)

$ sleep(10)
$ select * from output2;

time                     sym  sumVolume
2018.10.08T01:01:00.000  A    38
2018.10.08T01:01:00.000  B    40
2018.10.08T01:02:00.000  A    25
2018.10.08T01:02:00.000  B    9

In the following example, we specify updateTime to be 1000 (milliseconds):

$ output3 = keyedTable(`time`sym,10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
$ engine3 = createTimeSeriesEngine(name="engine3", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output3, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=1000, useWindowStartTime=false)
$ subscribeTable(tableName="trades", actionName="engine3", offset=0, handler=append!{engine3}, msgAsTable=true)

$ sleep(2001)
$ select * from output3;

time                     sym  sumVolume
2018.10.08T01:02:00.000  A    38
2018.10.08T01:02:00.000  B    40
2018.10.08T01:03:00.000  A    25
2018.10.08T01:03:00.000  B    9
2018.10.08T01:05:00.000  B    55
2018.10.08T01:05:00.000  A    29

Next, we will explain the calculations triggered in the last window from 01:04:00.000 to 01:05:00.000.

  • At 01:04:04.236, 2000 milliseconds after the first record of group A arrived, a group A calculation is triggered. The result (01:05:00.000, `A, 29) is written to the output table.

  • The record of group B at 01:04:05.152 is the first record after the small window of [01:04:04.000, 01:04:05.000) that contains the group B record at 01:04:04.412. It triggers a group B calculation. The result (01:05:00.000, `B, 32) is written to the output table.

  • 2000 milliseconds later, at 01:04:07.152, as the group B record at 01:04:05.152 has not been used in a calculation, a group B calculation is triggered. The result is (01:05:00.000, `B, 55). As the output table's keys are columns 'time' and 'sym', the record (01:05:00.000, `B, 32) in the output table is updated and becomes (01:05:00.000, `B, 55).

In the following example, the shared stream table "pubT" contains two time columns of type DATE and SECOND. When creating the time-series engine, setting timeColumn to these two columns combines them into a single DATETIME column in the output table "streamMinuteBar_1min".

$ colNames=`symbol`date`minute`price`type`volume
$ colTypes=`SYMBOL`DATE`SECOND`DOUBLE`STRING`INT
$ pubTable = streamTable(10000:0,colNames,colTypes)
$ share pubTable as pubT

$ colNames = `time`symbol`open`max`min`close`volume`amount`ret`vwap
$ colTypes = `DATETIME`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`DOUBLE`DOUBLE
$ streamMinuteBar_1min = streamTable(10000:0,colNames, colTypes)

$ tsAggrOHLC = createTimeSeriesEngine(name="subT", windowSize=60, step=60, metrics=<[first(price) as open ,max(price) as max,min(price) as min ,last(price) as close ,sum(volume) as volume ,wsum(volume, price) as amount ,(last(price)-first(price)/first(price)) as ret, (wsum(volume, price)/sum(volume)) as vwap]>, dummyTable=pubTable, outputTable=streamMinuteBar_1min, timeColumn=`date`minute, useSystemTime=false, keyColumn='symbol', fill=`none)
$ subscribeTable(tableName="pubT", actionName="subT", offset=-1, handler=append!{tsAggrOHLC}, msgAsTable=true)

$ insert into pubT values(`000001, 2021.04.05, 09:25:01, 1, 'B', 1)
$ insert into pubT values(`000001, 2021.04.05, 09:30:05, 2, 'B', 1)
$ insert into pubT values(`000001, 2021.04.05, 09:31:06, 3, 'B', 1)
$ insert into pubT values(`000001, 2021.04.05, 09:35:05, 4, 'S', 4)
$ insert into pubT values(`000001, 2021.04.05, 09:40:05, 5, 'S', 5)
$ insert into pubT values(`000001, 2021.04.06, 09:25:05, 6, 'S', 6)

symbol  date        minute    price  type  volume
000001  2021.04.05  09:25:01  1      B     1
000001  2021.04.05  09:30:05  2      B     1
000001  2021.04.05  09:31:06  3      B     1
000001  2021.04.05  09:35:05  4      S     4
000001  2021.04.05  09:40:05  5      S     5
000001  2021.04.06  09:25:05  6      S     6

$ select * from streamMinuteBar_1min

time                 symbol  open  max  min  close  volume  amount  ret  vwap
2021.04.05T09:26:00  000001  1     1    1    1      1       1       0    1
2021.04.05T09:31:00  000001  2     2    2    2      1       2       1    2
2021.04.05T09:32:00  000001  3     3    3    3      1       3       2    3
2021.04.05T09:36:00  000001  4     4    4    4      4       16      3    4
2021.04.05T09:41:00  000001  5     5    5    5      5       25      4    5

In the following example, forceTriggerTime and fill are specified:

$ share streamTable(1000:0, `time`sym`qty, [DATETIME, SYMBOL, INT]) as trades
$ output3 = table(10000:0, `time`sym`sumQty, [DATETIME, SYMBOL, INT])

$ engine = createTimeSeriesEngine(name="engine", windowSize=6, step=6, metrics=<sum(qty)>, dummyTable=trades, outputTable=output3, timeColumn=`time,keyColumn=`sym, forceTriggerTime=7,fill=1000)
$ subscribeTable(tableName="trades", actionName="engine", offset=0, handler=append!{engine}, msgAsTable=true)
$ sleep(1000)
$ insert into engine values(2018.08.01T14:05:43,`A,1)
$ insert into engine values(2018.08.01T14:05:43,`C,3)
$ sleep(10)
$ insert into engine values(2018.08.01T14:05:44,`B,1)
$ sleep(80)
$ insert into engine values(2018.08.01T14:05:52,`B,3)
$ sleep(20)
$ insert into engine values(2018.08.01T14:05:54,`A,3)
$ sleep(10)
$ insert into engine values(2018.08.01T14:05:55,`A,5)
$ sleep(20)
$ insert into engine values(2018.08.01T14:05:57,`B,5)
$ sleep(50)
$ insert into engine values(2018.08.01T14:06:12,`A,1)
$ sleep(50)
$ select * from output3 order by sym

time                 sym  sumQty
2018.08.01T14:05:46  A    1
2018.08.01T14:05:52  A    1000
2018.08.01T14:05:58  A    8
2018.08.01T14:06:04  A    1000
2018.08.01T14:06:10  A    1000
2018.08.01T14:05:46  B    1
2018.08.01T14:05:52  B    1000
2018.08.01T14:05:58  B    8
2018.08.01T14:05:46  C    3
2018.08.01T14:05:52  C    1000