createDailyTimeSeriesEngine

Syntax

createDailyTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [sessionBegin], [sessionEnd], [mergeSessionEnd=false], [forceTriggerTime], [raftGroup], [forceTriggerSessionEndTime])

Details

This function creates a daily time-series streaming engine. The windowing logic and calculation rules of the daily time-series engine are similar to those of the time-series engine, but with some added features:

  • Window calculations are performed only within a specified time period (known as a “session”) of a calendar day. A day can have multiple sessions, such as 9:00-12:00, 13:00-15:00, and so on.

  • Data that arrives before the start of a session within a calendar day will be included in the calculation of the first window of that session.

  • Data that arrives after the end of the last session of that day will be discarded.

Note: If keyColumn is specified to group data by the column values, the calculations described above will be performed within each group.
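The session rules above can be sketched with a single-session engine. This is a minimal illustration modeled on the example later on this page; the names demoTrades, demoOutput and demoEngine are hypothetical, and no output is shown because the exact result depends on the engine's trigger rules.

```dolphindb
// Hypothetical names (demoTrades, demoOutput, demoEngine) are for illustration only.
share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as demoTrades
demoOutput = table(10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT])

// One session per day (09:30:00-11:30:00) with 60-second windows.
demoEngine = createDailyTimeSeriesEngine(name="demoEngine", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=demoTrades, outputTable=demoOutput, timeColumn=`date`second, keyColumn=`sym, sessionBegin=09:30:00, sessionEnd=11:30:00)
subscribeTable(tableName="demoTrades", actionName="demoEngine", offset=0, handler=append!{demoEngine}, msgAsTable=true)

insert into demoTrades values(2018.10.08, 09:25:31, `A, 8)    // before sessionBegin: belongs to the first window of the session
insert into demoTrades values(2018.10.08, 09:30:02, `A, 26)   // inside the session
insert into demoTrades values(2018.10.08, 09:31:05, `A, 5)    // later timestamp, so the first window can be closed
insert into demoTrades values(2018.10.08, 15:30:00, `A, 7)    // after the last sessionEnd: expected to be discarded

sleep(1000)
select * from demoOutput
```

According to the rules above, the 09:25:31 record should be counted together with the 09:30:02 record in the first window, and the 15:30:00 record should not contribute to any window.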

For more application scenarios, see Streaming Engines.

Arguments

The daily time-series engine is an extension of the time-series engine (createTimeSeriesEngine) and inherits all of its parameters. In this section, we will only cover the parameters specific to this engine.

sessionBegin (optional) can be a scalar or vector of type SECOND, TIME or NANOTIME corresponding to the data type of the time column, indicating the starting time of each session. If it is a vector, it must be increasing.

sessionEnd (optional) can be a scalar or vector of type SECOND, TIME or NANOTIME corresponding to the data type of the time column, indicating the end time of each session. Specify sessionEnd as 00:00:00 to indicate the beginning of the next day (i.e., 24:00:00 of the current day).
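As a sketch of the 00:00:00 convention, the following engine defines one evening session that runs until the end of the calendar day. The names nightTrades, nightOutput and nightEngine are hypothetical, and the session times are chosen only for illustration.

```dolphindb
// Hypothetical names (nightTrades, nightOutput, nightEngine) are for illustration only.
share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as nightTrades
nightOutput = table(10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT])

// sessionBegin and sessionEnd are SECOND values because the time column is built from DATE and SECOND columns.
// sessionEnd=00:00:00 is read as 24:00:00 of the current day, so the session covers 21:00:00 to midnight.
nightEngine = createDailyTimeSeriesEngine(name="nightEngine", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=nightTrades, outputTable=nightOutput, timeColumn=`date`second, keyColumn=`sym, sessionBegin=21:00:00, sessionEnd=00:00:00)
```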

mergeSessionEnd (optional) is a Boolean value. This parameter is only applicable when closed = 'left'. In that case, mergeSessionEnd determines whether a record arriving at the end of a session (which has been adjusted based on the alignment rules) is included in the calculation of the last window of that session. The default value is false, which means the record is not included in the last window but triggers its calculation. If the current session is not the last session of the day, the record participates in the calculation of the first window of the next session.
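To see the default behavior, the sketch below sets mergeSessionEnd=false and sends a record stamped exactly at the end of the morning session. The names mergeTrades, mergeOutput and mergeEngine are hypothetical; the data is adapted from the example later on this page, which uses mergeSessionEnd=true.

```dolphindb
// Hypothetical names (mergeTrades, mergeOutput, mergeEngine) are for illustration only.
share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as mergeTrades
mergeOutput = table(10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT])

// mergeSessionEnd=false (the default): a record at sessionEnd is left out of the session's last window.
mergeEngine = createDailyTimeSeriesEngine(name="mergeEngine", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=mergeTrades, outputTable=mergeOutput, timeColumn=`date`second, keyColumn=`sym, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00, mergeSessionEnd=false)
subscribeTable(tableName="mergeTrades", actionName="mergeEngine", offset=0, handler=append!{mergeEngine}, msgAsTable=true)

insert into mergeTrades values(2018.10.08, 11:29:46, `A, 30)  // last window of the morning session
insert into mergeTrades values(2018.10.08, 11:30:00, `A, 14)  // exactly at sessionEnd: triggers the last window without joining it
insert into mergeTrades values(2018.10.08, 13:00:10, `A, 16)  // first window of the afternoon session
insert into mergeTrades values(2018.10.08, 13:01:05, `A, 1)   // later timestamp, so the 13:00:00-13:01:00 window can be closed

sleep(1000)
select * from mergeOutput
```

Running the same data with mergeSessionEnd=true and comparing the two result tables should show where the 11:30:00 record is counted in each case.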

forceTriggerSessionEndTime (optional) is a positive integer. Its unit is consistent with the precision of timeColumn. It specifies how long the engine waits after sessionEnd before forcing the calculation of the window that contains sessionEnd, if that window has not been calculated by then.

If a group receives no data after its last window is calculated while new data continues to be ingested into other groups, the fill parameter can be used to fill the results of that group's empty windows, so that the group's windows are still output up to the latest time point. If fill is not specified, no new windows are generated for that group after its last window is calculated.

Note:

  • The engine automatically adjusts the starting point of the window for sessionBegin and sessionEnd according to the value of alignmentSize.

Examples

$ share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
$ output1 = keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT])
$ engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=2, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true)
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

$ insert into trades values(2018.10.08,09:25:31,`A,8)
$ insert into trades values(2018.10.08,09:26:01,`B,10)
$ insert into trades values(2018.10.08,09:30:02,`A,26)
$ insert into trades values(2018.10.08,09:30:10,`B,14)
$ insert into trades values(2018.10.08,11:29:46,`A,30)
$ insert into trades values(2018.10.08,11:29:50,`B,11)
$ insert into trades values(2018.10.08,11:30:00,`A,14)
$ insert into trades values(2018.10.08,11:30:00,`B,4)
$ insert into trades values(2018.10.08,13:00:10,`A,16)
$ insert into trades values(2018.10.08,13:00:12,`B,9)
$ insert into trades values(2018.10.08,14:59:56,`A,20)
$ insert into trades values(2018.10.08,14:59:58,`B,20)
$ insert into trades values(2018.10.08,15:00:00,`A,10)
$ insert into trades values(2018.10.08,15:00:00,`B,29)

$ sleep(1000)
$ select * from output1

time                 sym  sumVolume
-------------------  ---  ---------
2018.10.08T09:31:00  A    34
2018.10.08T09:31:00  B    24
2018.10.08T11:30:00  A    44
2018.10.08T11:30:00  B    15
2018.10.08T13:01:00  A    16
2018.10.08T13:01:00  B    9
2018.10.08T15:00:00  A    30
2018.10.08T15:00:00  B    49
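The next example reuses the names trades and engine1, so the objects created above presumably need to be released first when both examples run in the same session. A minimal cleanup sketch, assuming nothing else subscribes to trades:

```dolphindb
// Stop feeding the engine, then release the engine and the shared stream table.
unsubscribeTable(tableName="trades", actionName="engine1")
dropStreamEngine("engine1")
undef(`trades, SHARED)
```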

$ share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
$ output1 = keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT])
$ engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true,forceTriggerSessionEndTime=10)
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

$ insert into trades values(date(now()),09:25:31,`A,8)
$ insert into trades values(date(now()),09:26:01,`B,10)
$ insert into trades values(date(now()),09:30:02,`A,26)
$ insert into trades values(date(now()),09:30:10,`B,14)
$ insert into trades values(date(now()),11:29:46,`A,30)
$ insert into trades values(date(now()),11:29:50,`B,11)
$ insert into trades values(date(now()),11:30:00,`B,14)
$ insert into trades values(date(now()),11:30:01,`A,4)

$ select * from output1

time                 sym  sumVolume
-------------------  ---  ---------
2022.03.24T09:31:00  A    34
2022.03.24T09:31:00  B    24
2022.03.24T11:30:00  A    30

Because forceTriggerSessionEndTime = 10, calculation of the window whose right boundary is 11:30:00 is force-triggered 10 seconds after the system time reaches 11:30:00, which outputs the window of group B that has not yet been calculated.

$ sleep(10000)
$ select * from output1

time                 sym  sumVolume
-------------------  ---  ---------
2022.03.24T09:31:00  A    34
2022.03.24T09:31:00  B    24
2022.03.24T11:30:00  A    30
2022.03.24T11:30:00  B    25