createSessionWindowEngine

Syntax

createSessionWindowEngine(name, sessionGap, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [updateTime], [useSessionStartTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup])

Arguments

As most of the parameters of createSessionWindowEngine are identical with those of createTimeSeriesEngine we only explain the following parameters of createSessionWindowEngine that are different or have different meanings from those of createTimeSeriesAggregator:

sessionGap a positive integer indicating the gap between 2 session windows. The unit of it is determined by the parameter useSystemTime.

useSystemTime: a Boolean value indicating how the session windows are determined.

  • if useSystemTime=true (default), the windows are determined by the timestamp when each record is ingested into the streaming engine (with millisecond precision) instead of timeColumn.

  • if useSystemTime=false, the windows are determined by timeColumn instead of when they are ingested into the engine.

useSessionStartTime: is a Boolean value indicating whether the first column in outputTable is the starting time of the windows. The default value is false, which means the timestamps in the output table are the ending time of the windows.

Details

Create a session window streaming engine.

It is very similar to the time-series streaming engine. The only difference among the two streaming engines is how the calculation windows are determined. In the time-series streaming engine, fixed-length windows are generated at specified frequencies. In comparison, in the session window streaming engine, if no new message is ingested for the specified length of time after a message, then this message is the end of a window. The next message is the start of a new window.

Examples

$ share streamTable(1000:0, `time`volume, [TIMESTAMP, INT]) as trades
$ output1 = keyedTable(`time,10000:0, `time`sumVolume, [TIMESTAMP, INT])
$ engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time)
$ subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)

$ n = 5
$ timev = 2018.10.12T10:01:00.000 + (1..n)
$ volumev = (1..n)%1000
$ insert into trades values(timev, volumev)

$ n = 5
$ timev = 2018.10.12T10:01:00.010 + (1..n)
$ volumev = (1..n)%1000
$ insert into trades values(timev, volumev)

$ n = 3
$ timev = 2018.10.12T10:01:00.020 + (1..n)
$ volumev = (1..n)%1000
$ timev.append!(2018.10.12T10:01:00.027 + (1..n))
$ volumev.append!((1..n)%1000)
$ insert into trades values(timev, volumev)

$ select * from trades;

time

volume

2018.10.12T10:01:00.001

1

2018.10.12T10:01:00.002

2

2018.10.12T10:01:00.003

3

2018.10.12T10:01:00.004

4

2018.10.12T10:01:00.005

5

2018.10.12T10:01:00.011

1

2018.10.12T10:01:00.012

2

2018.10.12T10:01:00.013

3

2018.10.12T10:01:00.014

4

2018.10.12T10:01:00.015

5

2018.10.12T10:01:00.021

1

2018.10.12T10:01:00.022

2

2018.10.12T10:01:00.023

3

2018.10.12T10:01:00.028

1

2018.10.12T10:01:00.029

2

2018.10.12T10:01:00.030

3

select * from output1;

time

sumVolume

2018.10.12T10:01:00.001

15

2018.10.12T10:01:00.011

15

2018.10.12T10:01:00.021

6