createDailyTimeSeriesEngine
Syntax
createDailyTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [sessionBegin], [sessionEnd], [mergeSessionEnd=false], [forceTriggerTime], [raftGroup], [forceTriggerSessionEndTime])
Arguments
This engine extends createTimeSeriesEngine and inherits all of its parameters. Only the additional parameters are described here:
sessionBegin is an optional parameter. It can be a scalar or vector of type SECOND, TIME or NANOTIME, matching the data type of the time column, and indicates the start time of each session. If it is a vector, it must be increasing.
sessionEnd is an optional parameter. It can be a scalar or vector of type SECOND, TIME or NANOTIME, matching the data type of the time column, and indicates the end time of each session.
mergeSessionEnd is an optional Boolean scalar indicating whether records timestamped exactly at the end time of a session are included in the calculation of that session. The default value is false.
forceTriggerSessionEndTime is an optional parameter. It is a positive integer whose unit is consistent with the precision of timeColumn. It specifies how long the engine waits before forcing calculation of the window that contains sessionEnd, if that window has ended without being calculated.
If fill is not specified, the window returns no result.
If fill is specified, the result of the window will be filled using the specified filling method.
Note:
The engine automatically adjusts the starting point of the window for sessionBegin and sessionEnd according to the value of alignmentSize.
To set these three parameters, useSystemTime must be set to false. mergeSessionEnd can be set only when updateTime or forceTriggerTime is specified.
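As a small illustration of the type requirement above, the following sketch writes the same two sessions as SECOND, TIME and NANOTIME literals. The variable names are hypothetical and exist only for this illustration; choose the form whose type matches the time column of the input table.

```dolphindb
// SECOND precision (the form used in the examples on this page)
sessionBeginSec = 09:30:00 13:00:00
sessionEndSec = 11:30:00 15:00:00

// TIME (millisecond) precision
sessionBeginMs = 09:30:00.000 13:00:00.000
sessionEndMs = 11:30:00.000 15:00:00.000

// NANOTIME precision
sessionBeginNs = 09:30:00.000000000 13:00:00.000000000
sessionEndNs = 11:30:00.000000000 15:00:00.000000000
```

Each vector lists the sessions in increasing order, and the n-th element of the sessionEnd vector is written to pair with the n-th element of the sessionBegin vector.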
Details
Create a daily time series engine for streaming data.
The engine performs aggregate calculations on streaming data within the sessions of a day, where each session is determined by sessionBegin and sessionEnd. All data arriving before the start of the first session is included in the calculation of the first window. The parameter mergeSessionEnd determines whether data timestamped at a session end is included in the calculation.
Examples
$ share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
$ output1 = keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT])
$ engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=2, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00, mergeSessionEnd=true)
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);
$ insert into trades values(2018.10.08,09:25:31,`A,8)
$ insert into trades values(2018.10.08,09:26:01,`B,10)
$ insert into trades values(2018.10.08,09:30:02,`A,26)
$ insert into trades values(2018.10.08,09:30:10,`B,14)
$ insert into trades values(2018.10.08,11:29:46,`A,30)
$ insert into trades values(2018.10.08,11:29:50,`B,11)
$ insert into trades values(2018.10.08,11:30:00,`A,14)
$ insert into trades values(2018.10.08,11:30:00,`B,4)
$ insert into trades values(2018.10.08,13:00:10,`A,16)
$ insert into trades values(2018.10.08,13:00:12,`B,9)
$ insert into trades values(2018.10.08,14:59:56,`A,20)
$ insert into trades values(2018.10.08,14:59:58,`B,20)
$ insert into trades values(2018.10.08,15:00:00,`A,10)
$ insert into trades values(2018.10.08,15:00:00,`B,29)
$ sleep(1000)
$ select * from output1
time | sym | sumVolume |
---|---|---|
2018.10.08T09:31:00 | A | 34 |
2018.10.08T09:31:00 | B | 24 |
2018.10.08T11:30:00 | A | 44 |
2018.10.08T11:30:00 | B | 15 |
2018.10.08T13:01:00 | A | 16 |
2018.10.08T13:01:00 | B | 9 |
2018.10.08T15:00:00 | A | 30 |
2018.10.08T15:00:00 | B | 49 |
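The next example reuses the names trades and engine1, so the objects created above need to be released before it can run in the same session. A minimal cleanup sketch, assuming nothing else subscribes to trades:

```dolphindb
// Stop feeding the engine before removing it
unsubscribeTable(tableName="trades", actionName="engine1")
// Release the streaming engine so the name "engine1" can be reused
dropStreamEngine("engine1")
// Remove the shared stream table definition
undef("trades", SHARED)
```

The order matters here: the subscription is cancelled first so that no handler is still appending to the engine when it is dropped.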
$ share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
$ output1 = keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT])
$ engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00, mergeSessionEnd=true, forceTriggerSessionEndTime=10)
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);
$ insert into trades values(date(now()),09:25:31,`A,8)
$ insert into trades values(date(now()),09:26:01,`B,10)
$ insert into trades values(date(now()),09:30:02,`A,26)
$ insert into trades values(date(now()),09:30:10,`B,14)
$ insert into trades values(date(now()),11:29:46,`A,30)
$ insert into trades values(date(now()),11:29:50,`B,11)
$ insert into trades values(date(now()),11:30:00,`B,14)
$ insert into trades values(date(now()),11:30:01,`A,4)
$ select * from output1
time | sym | sumVolume |
---|---|---|
2022.03.24T09:31:00 | A | 34 |
2022.03.24T09:31:00 | B | 24 |
2022.03.24T11:30:00 | A | 30 |
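Because forceTriggerSessionEndTime = 10, calculation of the window with the right boundary at 11:30:00 is force-triggered 10 seconds after the system time reaches 11:30:00. At this point the window for B has not yet been calculated, so only the result for A appears above.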
$ sleep(10000)
$ select * from output1
time | sym | sumVolume |
---|---|---|
2022.03.24T09:31:00 | A | 34 |
2022.03.24T09:31:00 | B | 24 |
2022.03.24T11:30:00 | A | 30 |
2022.03.24T11:30:00 | B | 25 |