createTimeSeriesEngine

Syntax

createTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [forceTriggerTime], [raftGroup], [keyPurgeFreqInSec=-1], [closed='left'], [outputElapsedMicroseconds=false])

Alias: createTimeSeriesAggregator

Details

This function creates a time-series streaming engine to conduct real-time time-series calculations with moving windows, and returns a table object where data is ingested for window calculations.

There are two types of aggregate operators in the time-series engine: incremental operators and full operators. Incremental operators aggregate the data incrementally as it arrives, without keeping historical data. Full operators (e.g., user-defined aggregate functions, unoptimized built-in aggregate functions, or functions with nested state functions) keep all the data in a window and recompute the output as a full refresh whenever new data arrives.

The following aggregate operators in the time-series engine are optimized for incremental computations: corr, covar, first, last, max, med, min, percentile, quantile, std, var, sum, sum2, sum3, sum4, wavg, wsum, count, firstNot, ifirstNot, lastNot, ilastNot, imax, imin, nunique, prod, sem, mode, searchK.

Windowing logic

  • Window boundaries: The engine automatically adjusts the starting point of the first window (see the descriptions of parameters step and roundTime, and the alignment rules).

  • Window properties:

    • windowSize - the size of each window;

    • closed - whether the left/right boundaries of a window are inclusive/exclusive;

    • step - the duration of time between windows;

    • useSystemTime - whether values are windowed based on the time column in the data or on the system time of data ingestion.

Calculation Rules

  • If timeColumn is specified, its values must be increasing. If keyColumn is specified to group the data, the values in timeColumn must be increasing within each group specified by keyColumn. Otherwise, out-of-order data will be discarded.

  • If useSystemTime = true, the calculation of a window is triggered as soon as the window ends. If useSystemTime = false (with timeColumn specified), the calculation of a window is triggered by the arrival of the next record after the window ends. To trigger the calculation for the uncalculated windows, you can specify the parameter updateTime or forceTriggerTime. See the associated parameter description for details.

  • If fill is unspecified or 'none', only windows with calculation results are output. If fill is set to any other value, all windows are output, and the empty windows are filled using the specified filling method.

Other Features

  • Data/state cleanup

You can set a cleanup rule to clear historical data. (See parameter keyPurgeFreqInSec.)

  • Snapshot

The snapshot mechanism restores the streaming engine to its latest snapshot after a system interruption. (See parameters snapshotDir and snapshotIntervalInMsgCount.)

  • High availability of streaming engines

To enable high availability of streaming engines, specify the parameter raftGroup on the leader node of the raft group on the subscriber. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table.

Arguments

name is a string indicating the name of the engine. It is the unique identifier of an engine on a data node or compute node. It can contain letters, digits, and underscores (“_”), and must start with a letter.

windowSize is a scalar or vector with positive integers that specifies the size of the windows for calculation.

step is a positive integer indicating how far each window moves forward relative to the previous one. Note that windowSize must be a multiple of step; otherwise an exception is thrown.

The units of windowSize and step are determined by the value of useSystemTime.

  • If useSystemTime = true, the unit of windowSize and step is millisecond.

  • If useSystemTime = false, the unit of windowSize and step is the same as the unit of timeColumn.

metrics is metacode or a tuple specifying the calculation formulas. For more information about metacode, refer to Metaprogramming (/Objects/Metaprogramming.rst).

  • It can use one or more built-in or user-defined aggregate functions (which must be defined with the defg keyword) such as <[sum(volume), avg(price)]>, or expressions of aggregate functions such as <[avg(price1)-avg(price2)]>, or aggregate functions involving multiple columns such as <[std(price1-price2)]>. Note that nested aggregate functions are not allowed.

  • You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (it’s optional to specify the column names).

  • If metrics is a tuple with multiple formulas, windowSize must be a vector of the same length as metrics. Each element of windowSize corresponds to the element at the same position in metrics. For example, if windowSize=[10,20], metrics can be (<[min(volume), max(volume)]>, <sum(volume)>). metrics can also be a nested tuple, such as [[<[min(volume), max(volume)]>, <sum(volume)>],[<avg(volume)>]].

Note: The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables. However, if there are multiple identical column names specified in metrics, their cases must be consistent.

dummyTable is a table object whose schema must be the same as the subscribed stream table. Whether dummyTable contains data does not matter.

outputTable is a table to which the engine inserts calculation results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function.

The output columns are in the following order:

(1) The first column must be a time column.

  • If useSystemTime = true, it is of TIMESTAMP type; otherwise, it has the same data type as timeColumn.

  • If useWindowStartTime = true, the column displays the start time of each window; otherwise, it displays the end time of each window.

(2) If keyColumn is specified, the subsequent column(s) must be in the same order as that specified by keyColumn.

(3) If outputElapsedMicroseconds is set to true, you need to specify a column of LONG type. See the outputElapsedMicroseconds parameter for details.

(4) These are followed by one or more result columns.

timeColumn (optional) is a STRING scalar or vector specifying the time column(s) of the subscribed stream table. When useSystemTime = false, it must be specified.

Note: If timeColumn is a vector, it must have a date element (of DATE type) and a time element (of TIME, SECOND or NANOTIME type). In this case, the first column in outputTable must take the data type of concatDateTime(date, time).

useSystemTime (optional) is a BOOLEAN indicating whether the calculations are performed based on the system time when data is ingested into the engine.

  • useSystemTime = true: the engine will regularly window the streaming data at fixed time intervals for calculations according to the ingestion time (local system time with millisecond precision, independent of any temporal columns in the streaming table) of each record. As long as a window contains data, the calculation will be performed automatically when the window ends. The first column in output table indicates the timestamp when the calculation occurred.

  • useSystemTime = false (default): the engine will window the streaming data according to the timeColumn in the stream table. The calculation for a window is triggered by the first record that arrives after the window ends. Note that the record that triggers the calculation does not participate in it.

For example, suppose a window ranges from 10:10:10 to 10:10:19. If useSystemTime = true and the window is not empty, the calculation will be triggered at 10:10:20. If useSystemTime = false and the first record after 10:10:19 is at 10:10:25, the calculation will be triggered at 10:10:25.

keyColumn (optional) is a STRING scalar/vector indicating the name of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, group the data by stock symbol and apply moving aggregation functions to each stock.

garbageSize (optional) is a positive integer. The default value is 50,000 (rows). The subscribed data continues to accumulate in the engine. When the number of rows of historical data in the memory exceeds garbageSize, the system will clear the historical data that is no longer needed.

Note: For incremental operators, the data no longer needed will be automatically removed from memory. However, for full operators, this parameter must be specified to enable garbage collection.

updateTime (optional) is a positive integer which takes the same time precision as timeColumn. It is used to trigger window calculations at an interval shorter than step. step must be a multiple of updateTime. To specify updateTime, useSystemTime must be set to false.

If updateTime is not specified, the calculation for a window does not occur before the window ends. By specifying updateTime, you can trigger calculations several times within a window whose calculation has not been triggered for a long time.

The calculations within a window are triggered with the following rules:

  • Starting from the left boundary of the window, the window is checked at intervals of updateTime: when a new record arrives after such an interval ends, all data in the current window before this record is calculated. If the window still has unprocessed data after 2 * updateTime (at least 2 seconds), all data in this window is calculated.

  • If keyColumn is specified, these rules apply within each group.

It is recommended to specify a keyed table for outputTable if updateTime is set. If outputTable is a standard in-memory table or stream table, it will have multiple results for each timestamp (in each group). It is not recommended to use a keyed stream table either as the records of a keyed stream table cannot be updated.

useWindowStartTime (optional) is a Boolean value indicating whether the time column in outputTable is the starting time of the windows. The default value is false, which means the timestamps in the output table are the end time of the windows. If the windowSize is a vector, useWindowStartTime must be false.

roundTime (optional) is a Boolean value indicating the method to align the window boundary if the time precision is milliseconds or seconds and step is bigger than one minute. The default value is true indicating the alignment is based on the multi-minute rule (see the alignment rules). False means alignment is based on the one-minute rule.

snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved.

  • The directory must already exist, otherwise an exception is thrown.

  • If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot will be loaded to restore the engine state.

  • Multiple streaming engines can share a directory where the snapshot files are named as the engine names.

  • The file extension of a snapshot can be:

    • <engineName>.tmp: temporary snapshot

    • <engineName>.snapshot: a snapshot that is generated and flushed to disk

    • <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.

snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.

fill (optional) is a vector/scalar indicating the filling method to deal with an empty window (in a group). It could be:

  • ‘none’: no result

  • ‘null’: output a NULL value.

  • ‘ffill’: output the result in the last window.

  • ‘specific value’: output a specified value. Its type should be the same as metrics output’s type.

fill can be a vector that specifies a different filling method for each metric. The size of the vector must match the number of elements specified in metrics. No element of the vector can be 'none'.
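The filling methods above can be sketched outside the engine. The following is a minimal illustration in plain Python (not DolphinDB script); the function name fill_windows and the use of None to mark an empty window are assumptions made for this sketch, and it covers a single metric only.

```python
def fill_windows(results, fill="none"):
    """Apply a fill method to one group's window results.

    results: list of (window_end, value); value is None for an empty window.
    fill:    'none', 'null', 'ffill', or a specific value (hypothetical API).
    """
    out = []
    last = None  # most recent computed result, used by 'ffill'
    for end, value in results:
        if value is not None:
            out.append((end, value))
            last = value
        elif fill == "none":
            continue                   # empty windows produce no row
        elif fill == "null":
            out.append((end, None))    # output a NULL value
        elif fill == "ffill":
            out.append((end, last))    # repeat the previous result
        else:
            out.append((end, fill))    # output the specified value
    return out

windows = [(46, 1), (52, None), (58, 8)]   # middle window is empty
print(fill_windows(windows))               # [(46, 1), (58, 8)]
print(fill_windows(windows, "ffill"))      # [(46, 1), (52, 1), (58, 8)]
print(fill_windows(windows, 1000))         # [(46, 1), (52, 1000), (58, 8)]
```

With 'ffill', an empty window that precedes any computed result has nothing to repeat, so this sketch emits None for it; how the engine itself handles that case is not stated here.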

forceTriggerTime (optional) is a non-negative integer which takes the same time precision as timeColumn, indicating the waiting time to force trigger calculation in the uncalculated windows for each group. If forceTriggerTime is set, useSystemTime must be false and updateTime cannot be specified.

The rules are as follows:

(1) Suppose the end time of the uncalculated window is t, and an incoming record of another group arrives at t1: when t1-t>=forceTriggerTime, calculation of the window will be triggered.

(2) If no data is ingested into a group after the last window is calculated, and new data continues to ingest into other groups, the specified fill parameter can be used to fill results for empty windows of that group. The group’s windows will still be output at the latest time point. If parameter fill is not specified, no new windows will be generated for that group after the last window has been triggered for computation.

Note the following points when setting forceTriggerTime or updateTime:

  • If updateTime is specified, the result of the current window calculation will be updated again when data belonging to the current window still arrives after the calculation is triggered.

  • If forceTriggerTime is specified, the incoming data with a timestamp within the current window will be discarded after the calculation is forced to be triggered.
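Rule (1) for forceTriggerTime reduces to a single comparison. The snippet below is a minimal sketch in plain Python (not DolphinDB script); the function name and the sample values are illustrative assumptions, with all times expressed in the unit of timeColumn.

```python
def force_triggered(window_end, other_group_time, force_trigger_time):
    """Rule (1): an uncalculated window ending at t is force-triggered by a
    record of another group arriving at t1 once t1 - t >= forceTriggerTime."""
    return other_group_time - window_end >= force_trigger_time

# Illustrative values in seconds: a window ending at t=46, forceTriggerTime=7.
print(force_triggered(46, 52, 7))  # False: 52 - 46 = 6
print(force_triggered(46, 54, 7))  # True:  54 - 46 = 8
```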

raftGroup (optional) is an integer greater than 1, indicating the ID of the raft group on the high-availability streaming subscriber specified by the configuration parameter streamingRaftGroups. Specify raftGroup to enable high availability on the streaming engine. When an engine is created on the leader, it is also created on each follower and the engine snapshot is synchronized to the followers. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table. Note that snapshotDir must also be specified when specifying a raft group.

keyPurgeFreqInSec (optional) is a positive integer indicating the interval (in seconds) to remove groups with no incoming data for a long time. If a group has no incoming data for at least keyPurgeFreqInSec seconds after the last time of data purging, it will be removed.

Note: To specify this parameter, parameter forceTriggerTime must be specified and parameter fill cannot be specified.

You can check the number of groups in a time-series streaming engine based on the column “numGroups” returned by getStreamEngineStat.

closed (optional) is a STRING indicating whether the left or the right boundary is included.

  • closed = ‘left’: left-closed, right-open

  • closed = ‘right’: left-open, right-closed
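The two settings differ only for records stamped exactly on a window boundary. The following minimal sketch in plain Python (not DolphinDB script) shows the membership test; the function name in_window is an assumption made for illustration.

```python
def in_window(t, start, window_size, closed="left"):
    """Return True if time t belongs to the window beginning at `start`."""
    end = start + window_size
    if closed == "left":
        return start <= t < end    # left-closed, right-open: [start, end)
    if closed == "right":
        return start < t <= end    # left-open, right-closed: (start, end]
    raise ValueError("closed must be 'left' or 'right'")

# A record stamped exactly at 60000 with windows of size 60000:
print(in_window(60000, 60000, 60000, closed="left"))   # True
print(in_window(60000, 60000, 60000, closed="right"))  # False
print(in_window(60000, 0, 60000, closed="right"))      # True
```

With closed='right', the boundary record is counted in the window that ends at 60000 rather than the one that starts there.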

outputElapsedMicroseconds (optional) is a BOOLEAN value. The default value is false. It determines whether to output the elapsed time (in microseconds) from the ingestion of data to the output of result for each window.

Alignment Rules

To facilitate observation and comparison of calculation results, the engine automatically adjusts the starting point of the first window. The alignment size (an integer) is determined by the parameters step and roundTime and by the precision of timeColumn. When the time-series engine calculates within groups, the windows of all groups are aligned uniformly, so the window boundaries are the same for every group.

  • If the data type of timeColumn is MINUTE (HH:mm), the value of alignmentSize is as follows:

    if roundTime=false:

    step         alignmentSize
    0~2          2
    3            3
    4~5          5
    6~10         10
    11~15        15
    16~20        20
    21~30        30
    >30          60 (1 hour)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30. If step>30, the value of alignmentSize is as follows:

    step         alignmentSize
    31~60        60 (1 hour)
    61~120       120 (2 hours)
    121~180      180 (3 hours)
    181~300      300 (5 hours)
    301~600      600 (10 hours)
    601~900      900 (15 hours)
    901~1200     1200 (20 hours)
    1201~1800    1800 (30 hours)
    >1800        3600 (60 hours)

  • If the data type of timeColumn is DATETIME (yyyy-MM-dd HH:mm:ss) or SECOND (HH:mm:ss), the value of alignmentSize is as follows:

    if roundTime=false:

    step         alignmentSize
    0~2          2
    3            3
    4~5          5
    6~10         10
    11~15        15
    16~20        20
    21~30        30
    >30          60 (1 minute)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30. If step>30, the value of alignmentSize is as follows:

    step         alignmentSize
    31~60        60 (1 minute)
    61~120       120 (2 minutes)
    121~180      180 (3 minutes)
    181~300      300 (5 minutes)
    301~600      600 (10 minutes)
    601~900      900 (15 minutes)
    901~1200     1200 (20 minutes)
    1201~1800    1800 (30 minutes)
    >1800        3600 (1 hour)

  • If the data type of timeColumn is TIMESTAMP (yyyy-MM-dd HH:mm:ss.mmm) or TIME (HH:mm:ss.mmm), the value of alignmentSize is as follows:

    if roundTime=false:

    step               alignmentSize
    0~2                2
    3~5                5
    6~10               10
    11~20              20
    21~25              25
    26~50              50
    51~100             100
    101~200            200
    201~250            250
    251~500            500
    501~1000           1000 (1 second)
    1001~2000          2000 (2 seconds)
    2001~5000          5000 (5 seconds)
    5001~10000         10000 (10 seconds)
    10001~15000        15000 (15 seconds)
    15001~20000        20000 (20 seconds)
    20001~30000        30000 (30 seconds)
    >30000             60000 (1 minute)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30000. If step>30000, the value of alignmentSize is as follows:

    step               alignmentSize
    30001~60000        60000 (1 minute)
    60001~120000       120000 (2 minutes)
    120001~300000      300000 (5 minutes)
    300001~600000      600000 (10 minutes)
    600001~900000      900000 (15 minutes)
    900001~1200000     1200000 (20 minutes)
    1200001~1800000    1800000 (30 minutes)
    >1800000           3600000 (1 hour)

  • If the data type of timeColumn is NANOTIMESTAMP (yyyy-MM-dd HH:mm:ss.nnnnnnnnn) or NANOTIME (HH:mm:ss.nnnnnnnnn), the value of alignmentSize is as follows:

    if roundTime=false:

    step           alignmentSize
    0~2ns          2ns
    3ns~5ns        5ns
    6ns~10ns       10ns
    11ns~20ns      20ns
    21ns~25ns      25ns
    26ns~50ns      50ns
    51ns~100ns     100ns
    101ns~200ns    200ns
    201ns~250ns    250ns
    251ns~500ns    500ns
    >500ns         1000ns

    if roundTime=true:

    step           alignmentSize
    1000ns~1ms     1ms
    1ms~10ms       10ms
    10ms~100ms     100ms
    100ms~1s       1s
    1s~2s          2s
    2s~3s          3s
    3s~5s          5s
    5s~10s         10s
    10s~15s        15s
    15s~20s        20s
    20s~30s        30s
    >30s           1min

If the time of the first record is x with data type TIMESTAMP, the starting time of the first window is adjusted to timeType_cast(x/alignmentSize*alignmentSize+step-windowSize), where “/” is integer division (the fractional part is discarded). For example, if the time of the first record is 2018.10.08T01:01:01.365, windowSize = 120000, and step = 60000, then alignmentSize = 60000, and the starting time of the first window is timestamp(2018.10.08T01:01:01.365/60000*60000+60000-120000)=2018.10.08T01:00:00.000.
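The lookup and the formula can be checked with a short calculation. The following is a minimal sketch in plain Python (not DolphinDB script) for millisecond-precision time columns only; the function names are assumptions, timestamps are treated as milliseconds since 1970-01-01 UTC, and the step ranges are copied from the TIMESTAMP/TIME tables above.

```python
from bisect import bisect_left
from datetime import datetime, timedelta, timezone

# Upper bound (ms) of each step range in the TIMESTAMP/TIME tables.
# In every row, alignmentSize equals the upper bound of its range.
STEP_UPPER_MS = [2, 5, 10, 20, 25, 50, 100, 200, 250, 500, 1000, 2000, 5000,
                 10000, 15000, 20000, 30000,             # both roundTime modes
                 60000, 120000, 300000, 600000, 900000,  # roundTime=true only
                 1200000, 1800000]

def alignment_size_ms(step, round_time=True):
    """Look up alignmentSize for a millisecond-precision time column."""
    if step > 30000 and not round_time:
        return 60000                      # one-minute rule
    i = bisect_left(STEP_UPPER_MS, step)  # first range whose upper bound >= step
    return STEP_UPPER_MS[i] if i < len(STEP_UPPER_MS) else 3600000

def first_window_start(x_ms, window_size, step, round_time=True):
    """x/alignmentSize*alignmentSize + step - windowSize, integer division."""
    a = alignment_size_ms(step, round_time)
    return x_ms // a * a + step - window_size

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_ms(dt):
    return (dt - EPOCH) // timedelta(milliseconds=1)

def from_ms(ms):
    return EPOCH + timedelta(milliseconds=ms)

# First record 2018.10.08T01:01:01.785 with windowSize = step = 60000
x = to_ms(datetime(2018, 10, 8, 1, 1, 1, 785000, tzinfo=timezone.utc))
start = first_window_start(x, window_size=60000, step=60000)
print(from_ms(start).strftime("%Y.%m.%dT%H:%M:%S.%f")[:-3])
# 2018.10.08T01:01:00.000
```

These are the settings of the engine1 example in the Examples section, where the first window likewise starts at 01:01:00.000.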

Examples

In the following example, the time-series engine1 subscribes to the stream table “trades” and calculates sum(volume) for each stock in the last minute in real time. The result is saved in table output1.

$ share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
$ output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
$ engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

$ insert into trades values(2018.10.08T01:01:01.785,`A,10)
$ insert into trades values(2018.10.08T01:01:02.125,`B,26)
$ insert into trades values(2018.10.08T01:01:10.263,`B,14)
$ insert into trades values(2018.10.08T01:01:12.457,`A,28)
$ insert into trades values(2018.10.08T01:02:10.789,`A,15)
$ insert into trades values(2018.10.08T01:02:12.005,`B,9)
$ insert into trades values(2018.10.08T01:02:30.021,`A,10)
$ insert into trades values(2018.10.08T01:04:02.236,`A,29)
$ insert into trades values(2018.10.08T01:04:04.412,`B,32)
$ insert into trades values(2018.10.08T01:04:05.152,`B,23)

$ sleep(10)

$ select * from output1;

time                    sym sumVolume
2018.10.08T01:02:00.000 A   38
2018.10.08T01:02:00.000 B   40
2018.10.08T01:03:00.000 A   25
2018.10.08T01:03:00.000 B   9

The following paragraphs explain in detail how the time-series engine conducts the calculations. For simplicity, we omit the “2018.10.08T” part of the “time” column and use only the “hour:minute:second.millisecond” part.

First, the time-series engine adjusts the starting time of the first window to be 01:01:00.000. The first window is from 01:01:00.000 (inclusive) to 01:02:00.000 (exclusive). When the record (01:02:10.789,`A,15) arrives, it triggers the calculation of group A for the first window; the arrival of (01:02:12.005,`B,9) triggers the calculation of group B for the first window.

The second window is from 01:02:00.000 (inclusive) to 01:03:00.000 (exclusive). When the record (01:04:02.236,`A,29) arrives, it triggers the calculation of group A for the second window; the arrival of (01:04:04.412,`B,32) triggers the calculation of group B for the second window.

As no record with a timestamp of 01:05:00.000 or later arrives, no calculation is triggered for the window [01:04:00.000, 01:05:00.000).
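The triggering behavior described above can be replayed outside the engine. The following is a minimal sketch in plain Python (not DolphinDB script) that assumes tumbling windows (windowSize = step), closed='left', fill='none', and a sum metric; the helper names are hypothetical, and times are milliseconds since midnight.

```python
WINDOW = STEP = 60_000  # ms; tumbling windows as in engine1

def ms(h, m, s, milli):
    """Milliseconds since midnight."""
    return ((h * 60 + m) * 60 + s) * 1000 + milli

def fmt(t):
    s, milli = divmod(t, 1000)
    m, s = divmod(s, 60)
    h, m = divmod(m, 60)
    return f"{h:02d}:{m:02d}:{s:02d}.{milli:03d}"

records = [
    (ms(1, 1, 1, 785), "A", 10), (ms(1, 1, 2, 125), "B", 26),
    (ms(1, 1, 10, 263), "B", 14), (ms(1, 1, 12, 457), "A", 28),
    (ms(1, 2, 10, 789), "A", 15), (ms(1, 2, 12, 5), "B", 9),
    (ms(1, 2, 30, 21), "A", 10), (ms(1, 4, 2, 236), "A", 29),
    (ms(1, 4, 4, 412), "B", 32), (ms(1, 4, 5, 152), "B", 23),
]

def replay(records, first_start):
    state = {}  # sym -> [window_start, running_sum, has_data]
    out = []
    for t, sym, volume in records:
        st = state.setdefault(sym, [first_start, 0, False])
        # A record at or past the window end closes that window for its group.
        while t >= st[0] + WINDOW:
            if st[2]:  # empty windows produce no row
                out.append((fmt(st[0] + WINDOW), sym, st[1]))
            st[0] += STEP
            st[1], st[2] = 0, False
        st[1] += volume  # the triggering record belongs to the new window
        st[2] = True
    return out

for row in replay(records, first_start=ms(1, 1, 0, 0)):
    print(*row)
```

The sketch prints the four rows of output1 in the same order (A 38 and B 40 at 01:02:00.000, then A 25 and B 9 at 01:03:00.000). The sums for the window starting at 01:04:00.000 stay pending because no later record arrives to close it.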

The table output1 stores the calculation results of the time-series engine. As useWindowStartTime=false, the timestamps in the output table are the end time of the windows. If useWindowStartTime=true, then the timestamps in the output table are the starting time of the windows, as illustrated in the following example:

$ output2 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
$ engine2 = createTimeSeriesEngine(name="engine2", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output2, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=true)
$ subscribeTable(tableName="trades", actionName="engine2", offset=0, handler=append!{engine2}, msgAsTable=true)

$ sleep(10)
$ select * from output2;

time                    sym sumVolume
2018.10.08T01:01:00.000 A   38
2018.10.08T01:01:00.000 B   40
2018.10.08T01:02:00.000 A   25
2018.10.08T01:02:00.000 B   9

In the following example, we specify updateTime to be 1000 (milliseconds):

$ output3 = keyedTable(`time`sym,10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
$ engine3 = createTimeSeriesEngine(name="engine3", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output3, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=1000, useWindowStartTime=false)
$ subscribeTable(tableName="trades", actionName="engine3", offset=0, handler=append!{engine3}, msgAsTable=true)

$ sleep(2001)
$ select * from output3;

time                    sym sumVolume
2018.10.08T01:02:00.000 A   38
2018.10.08T01:02:00.000 B   40
2018.10.08T01:03:00.000 A   25
2018.10.08T01:03:00.000 B   9
2018.10.08T01:05:00.000 B   55
2018.10.08T01:05:00.000 A   29

Next, we will explain the calculations triggered in the last window from 01:04:00.000 to 01:05:00.000.

(1) At 01:04:04.236, 2000 milliseconds after the first record of group A arrived, a group A calculation is triggered. The result (01:05:00.000, `A, 29) is written to the output table.

(2) The record of group B at 01:04:05.152 is the first record after the small window of [01:04:04.000, 01:04:05.000) that contains the group B record at 01:04:04.412. It triggers a group B calculation. The result (01:05:00.000, `B, 32) is written to the output table.

(3) 2000 milliseconds later, at 01:04:07.152, as the group B record at 01:04:05.152 has not been used in a calculation, a group B calculation is triggered. The result is (01:05:00.000, `B, 55). As the output table's keys are the columns time and sym, the record (01:05:00.000, `B, 32) in the output table is updated to (01:05:00.000, `B, 55).

In the following example, the shared stream table “pubT” contains two time columns of types DATE and SECOND. When creating the time-series engine, specify both columns in timeColumn to combine them into a single DATETIME column in the output table “streamMinuteBar_1min”.

$ colNames=`symbol`date`minute`price`type`volume
$ colTypes=`SYMBOL`DATE`SECOND`DOUBLE`STRING`INT
$ pubTable = streamTable(10000:0,colNames,colTypes)
$ share pubTable as pubT

$ colNames = `time`symbol`open`max`min`close`volume`amount`ret`vwap
$ colTypes = `DATETIME`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`DOUBLE`DOUBLE
$ streamMinuteBar_1min = streamTable(10000:0,colNames, colTypes)

$ tsAggrOHLC = createTimeSeriesEngine(name="subT", windowSize=60, step=60, metrics=<[first(price) as open ,max(price) as max,min(price) as min ,last(price) as close ,sum(volume) as volume ,wsum(volume, price) as amount ,(last(price)-first(price)/first(price)) as ret, (wsum(volume, price)/sum(volume)) as vwap]>, dummyTable=pubTable, outputTable=streamMinuteBar_1min, timeColumn=`date`minute, useSystemTime=false, keyColumn='symbol', fill=`none)
$ subscribeTable(tableName="pubT", actionName="subT", offset=-1, handler=append!{tsAggrOHLC}, msgAsTable=true)

$ insert into pubT values(`000001, 2021.04.05, 09:25:01, 1, 'B', 1)
$ insert into pubT values(`000001, 2021.04.05, 09:30:05, 2, 'B', 1)
$ insert into pubT values(`000001, 2021.04.05, 09:31:06, 3, 'B', 1)
$ insert into pubT values(`000001, 2021.04.05, 09:35:05, 4, 'S', 4)
$ insert into pubT values(`000001, 2021.04.05, 09:40:05, 5, 'S', 5)
$ insert into pubT values(`000001, 2021.04.06, 09:25:05, 6, 'S', 6)

$ select * from pubT

symbol date       minute   price type volume
000001 2021.04.05 09:25:01 1     B    1
000001 2021.04.05 09:30:05 2     B    1
000001 2021.04.05 09:31:06 3     B    1
000001 2021.04.05 09:35:05 4     S    4
000001 2021.04.05 09:40:05 5     S    5
000001 2021.04.06 09:25:05 6     S    6

$ select * from streamMinuteBar_1min

time                symbol open max min close volume amount ret vwap
2021.04.05T09:26:00 000001 1    1   1   1     1      1      0   1
2021.04.05T09:31:00 000001 2    2   2   2     1      2      1   2
2021.04.05T09:32:00 000001 3    3   3   3     1      3      2   3
2021.04.05T09:36:00 000001 4    4   4   4     4      16     3   4
2021.04.05T09:41:00 000001 5    5   5   5     5      25     4   5

The following example sets forceTriggerTime=7 and fill=1000:

$ share streamTable(1000:0, `time`sym`qty, [DATETIME, SYMBOL, INT]) as trades
$ output3 = table(10000:0, `time`sym`sumQty, [DATETIME, SYMBOL, INT])

$ engine = createTimeSeriesEngine(name="engine", windowSize=6, step=6, metrics=<sum(qty)>, dummyTable=trades, outputTable=output3, timeColumn=`time,keyColumn=`sym, forceTriggerTime=7,fill=1000)
$ subscribeTable(tableName="trades", actionName="engine", offset=0, handler=append!{engine}, msgAsTable=true)
$ sleep(1000)
$ insert into engine values(2018.08.01T14:05:43,`A,1)
$ insert into engine values(2018.08.01T14:05:43,`C,3)
$ sleep(10)
$ insert into engine values(2018.08.01T14:05:44,`B,1)
$ sleep(80)
$ insert into engine values(2018.08.01T14:05:52,`B,3)
$ sleep(20)
$ insert into engine values(2018.08.01T14:05:54,`A,3)
$ sleep(10)
$ insert into engine values(2018.08.01T14:05:55,`A,5)
$ sleep(20)
$ insert into engine values(2018.08.01T14:05:57,`B,5)
$ sleep(50)
$ insert into engine values(2018.08.01T14:06:12,`A,1)
$ sleep(50)
$ select * from output3 order by sym

time                sym sumQty
2018.08.01T14:05:46 A   1
2018.08.01T14:05:52 A   1000
2018.08.01T14:05:58 A   8
2018.08.01T14:06:04 A   1000
2018.08.01T14:06:10 A   1000
2018.08.01T14:05:46 B   1
2018.08.01T14:05:52 B   1000
2018.08.01T14:05:58 B   8
2018.08.01T14:05:46 C   3
2018.08.01T14:05:52 C   1000