createWindowJoinEngine

Syntax

createWindowJoinEngine(name, leftTable, rightTable, outputTable, window, metrics, matchingColumn, [timeColumn], [useSystemTime=false], [garbageSize=5000], [maxDelayedTime], [nullFill], [sortByTime])

Details

Create a window join streaming engine. Return a table object that is the real-time window join result of a left table and a right table. While the SQL window join only accepts aggregate functions as metrics, the window join streaming engine also supports non-aggregate functions.

Data ingested into the engine is grouped by matchingColumn. Within each group, for every record in the left table, the engine calculates the metrics over the corresponding window of the right table and outputs the results as additional columns.

  • Standard windows (i.e., window = a:b):

The windows over the right table are determined by the current timestamp in the left table and the specified parameter window. Suppose the current timestamp in the left table is t, and window is set to a:b, then the corresponding window in the right table consists of records with timestamps in [t+a, t+b]. The engine returns the join result containing the results of the metrics calculated using the windowed data.
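The window arithmetic can be modelled in a few lines. The following is a conceptual Python sketch, not DolphinDB script. The function name `standard_window` is hypothetical, and timestamps are plain integers (millisecond offsets). The function returns the right-table records that a left record at time t sees for window = a:b, with both bounds inclusive.

```python
from typing import List, Tuple

Record = Tuple[int, float]  # (timestamp as an integer ms offset, value)


def standard_window(t: int, a: int, b: int, right: List[Record]) -> List[Record]:
    """Return the right-table records whose timestamps fall in [t + a, t + b].

    `right` holds the records of a single matchingColumn group.
    Both bounds are inclusive, as described for window = a:b.
    """
    lo, hi = t + a, t + b
    return [(ts, v) for ts, v in right if lo <= ts <= hi]


# One matchingColumn group: timestamps 0..9 with values 1.0..10.0,
# mirroring the AAPL rows of the right table in the first example.
right_aapl = [(ts, float(ts + 1)) for ts in range(10)]

w0 = standard_window(t=0, a=-2, b=2, right=right_aapl)
w1 = standard_window(t=1, a=-2, b=2, right=right_aapl)
print([v for _, v in w0], sum(v for _, v in w0))  # [1.0, 2.0, 3.0] 6.0
print([v for _, v in w1], sum(v for _, v in w1))  # [1.0, 2.0, 3.0, 4.0] 10.0
```

The sums 6 and 10 match the factor3 column for AAPL in the first example. Because `val` is a non-aggregate metric there, one output row is produced per record in the window. This is consistent with the three AAPL rows at 00:00:00.000 and the four at 00:00:00.001.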

Window triggering rules:

1. A window is triggered when a record with the same matchingColumn value and a timestamp past the end of that window arrives in the right table. The triggering record itself does not participate in the calculation of that window.

2. If maxDelayedTime is specified, a new timestamp t' arriving in the right table (regardless of its matchingColumn value) also triggers any uncalculated window [t+a, t+b] for which t' > t + b + maxDelayedTime.
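The two triggering rules can be restated as a small predicate. This is a Python model for illustration only. The function `window_triggered` and its parameters are hypothetical, timestamps are integer millisecond offsets, and maxDelayedTime is expressed in the same unit as the timestamps.

```python
from typing import Optional


def window_triggered(window_end: int, arrival_ts: int, same_group: bool,
                     max_delayed_time: Optional[int] = None) -> bool:
    """Model of the triggering rules for a pending standard window.

    window_end -- t + b for the left record at time t and window = a:b
    arrival_ts -- timestamp of a record just ingested into the right table
    same_group -- True if that record has the window's matchingColumn value
    """
    # Rule 1: a same-group arrival past the window end triggers the window.
    # (The arriving record itself is not part of the window's calculation.)
    if same_group and arrival_ts > window_end:
        return True
    # Rule 2: with maxDelayedTime set, an arrival from any group triggers the
    # window once its timestamp exceeds window_end + maxDelayedTime.
    if max_delayed_time is not None and arrival_ts > window_end + max_delayed_time:
        return True
    return False


# Left record at t = 1000 ms with window = -2:2, so the window is [998, 1002].
end = 1000 + 2
print(window_triggered(end, 1003, same_group=True))                          # True
print(window_triggered(end, 1003, same_group=False))                         # False
print(window_triggered(end, 4003, same_group=False, max_delayed_time=3000))  # True
```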

  • Special windows (i.e., window = 0:0; maxDelayedTime is not supported):

The windows over the right table are determined by the current timestamp in the left table and its previous timestamp. Suppose the current timestamp in the left table is t and the previous timestamp is t0, then the corresponding window in the right table consists of records with timestamps in [t0, t).
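The half-open [t0, t) windows can be sketched the same way. This is a conceptual Python model; the name `special_window_sums` is hypothetical. The first left record has no previous timestamp, so the sketch assumes its window is empty, which matches the empty sum_val cell in the first row of the window = 0:0 example. The timestamps below are the ones from that example, with deterministic values 1.0 to 8.0 in place of `rand`.

```python
from typing import List, Optional, Tuple


def special_window_sums(left_ts: List[int],
                        right: List[Tuple[int, float]]) -> List[Optional[float]]:
    """For window = 0:0, sum the right-table values with timestamps in [t0, t),
    where t is the current left timestamp and t0 the previous one.
    An empty window yields None (a NULL result)."""
    sums: List[Optional[float]] = []
    prev: Optional[int] = None
    for t in left_ts:
        if prev is None:
            sums.append(None)  # assumption: no previous timestamp -> empty window
        else:
            vals = [v for ts, v in right if prev <= ts < t]
            sums.append(sum(vals) if vals else None)
        prev = t
    return sums


left = [1, 5, 10, 15]
right = list(zip([1, 2, 3, 4, 5, 6, 9, 15], [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))
print(special_window_sums(left, right))  # [None, 10.0, 18.0, None]
```

The right record at 15 is not part of [10, 15), so the last window is empty. Nulls appear in the same two positions as in the example output.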

Window triggering rules:

  • When useSystemTime = false, a window is triggered when a timestamp past the end of that window arrives in the right table.

  • When useSystemTime = true, a window is triggered at the ingestion of each record in the left table.

Note: When window is set to 0:0, metrics does not support non-aggregate functions that are applied to columns from the right table.

For more application scenarios of streaming engines, see Streaming Engines.

Arguments

name is a string indicating the name of the window join streaming engine. It is the unique identifier of the engine on a data/compute node. It can contain letters, numbers and underscores and must start with a letter.

leftTable and rightTable are table objects whose schema must be the same as the stream table to which the engine subscribes.

outputTable is a table to which the engine inserts the calculation results.

The columns of outputTable are in the following order:

(1) The first column must be of temporal type.

  • If useSystemTime = true, the column must be of TIMESTAMP type.

  • If useSystemTime = false, the column must be of the same type as the timeColumn.

(2) The first column is followed by the column(s) on which the tables are joined, arranged in the same order as specified in matchingColumn.

(3) These are followed by the columns holding the calculation results. There can be more than one result column.
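The layout rules can be checked before the engine is created. The Python function below is a hypothetical helper, not part of DolphinDB. It represents a schema as (name, type) pairs and checks only the column count and the type of the first column, since the rules above do not constrain column names.

```python
from typing import List, Optional, Tuple

Column = Tuple[str, str]  # (column name, type name)


def check_output_layout(schema: List[Column], n_matching: int, n_results: int,
                        use_system_time: bool,
                        time_col_type: Optional[str] = None) -> List[str]:
    """Return the layout problems found in an outputTable schema (empty if none)."""
    problems: List[str] = []
    # Rule (1) + (2) + (3): one time column, the matching columns, the results.
    expected_len = 1 + n_matching + n_results
    if len(schema) != expected_len:
        problems.append(f"expected {expected_len} columns, got {len(schema)}")
    if schema:
        # Rule (1): TIMESTAMP with system time, else the timeColumn's type.
        expected_type = "TIMESTAMP" if use_system_time else time_col_type
        if schema[0][1] != expected_type:
            problems.append(f"first column is {schema[0][1]}, expected {expected_type}")
    return problems


# Layout of `output` in the first example: time, sym, then three metric results.
output = [("time", "TIMESTAMP"), ("sym", "SYMBOL"),
          ("factor1", "DOUBLE"), ("factor2", "DOUBLE"), ("factor3", "DOUBLE")]
print(check_output_layout(output, n_matching=1, n_results=3,
                          use_system_time=False, time_col_type="TIMESTAMP"))  # []
```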

window is a pair of integers or duration values, indicating the range of a sliding window, including both left and right bounds.

metrics is metacode (which can be a tuple) specifying the calculation formulas. For more information about metacode, please refer to Metaprogramming.

  • metrics can use one or more expressions, built-in or user-defined functions (both aggregate functions and non-aggregate functions are accepted).

  • metrics can be functions that return multiple values and the columns in the output table to hold the return values must be specified. For example, <func(price) as `col1`col2>.

  • The column names specified in metrics are not case-sensitive and need not match the case of the column names in the input tables. However, if the same column name appears more than once in metrics, it must be written with the same case each time.

If you want to specify a column that exists in both the left and the right tables, use the format tableName.colName to indicate which. By default, the column from the left table is used.

Performance is optimized when the following functions are applied only to columns from the right table: sum, sum2, avg, std, var, corr, covar, wavg, wsum, beta, max, min, last, first, med, percentile.

matchingColumn is a STRING scalar/vector/tuple indicating the column(s) on which the tables are joined. It supports integral, temporal and literal (except UUID) types.

1. When there is only 1 column to match - If the names of the matching column are the same in both tables, matchingColumn should be specified as a STRING scalar; otherwise it’s a tuple of two elements. For example, if the column is named “sym” in the left table and “sym1” in the right table, then matchingColumn = [[`sym],[`sym1]].

2. When there are multiple columns to match - If the names of all the columns to match are the same in both tables, matchingColumn is a STRING vector; otherwise it’s a tuple of two elements. For example, if the columns are named “timestamp” and “sym” in the left table, whereas in the right table they’re named “timestamp” and “sym1”, then matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]].
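The three accepted forms of matchingColumn reduce to one pair of column lists, one list per table. The Python function below is a hypothetical normalizer illustrating that mapping. A Python `str` stands for a STRING scalar, a list of strings stands for a STRING vector, and a two-element list or tuple of lists stands for the DolphinDB tuple of two elements.

```python
from typing import List, Sequence, Tuple, Union

Spec = Union[str, Sequence[str], Sequence[Sequence[str]]]


def resolve_matching_columns(spec: Spec) -> Tuple[List[str], List[str]]:
    """Map a matchingColumn specification to (left columns, right columns)."""
    # A single name shared by both tables, e.g. `sym
    if isinstance(spec, str):
        return [spec], [spec]
    # A pair of name lists, one per table, e.g. [[`sym], [`sym1]]
    if len(spec) == 2 and all(isinstance(s, (list, tuple)) for s in spec):
        left, right = list(spec[0]), list(spec[1])
        if len(left) != len(right):
            raise ValueError("both tables must contribute the same number of columns")
        return left, right
    # Several names shared by both tables, e.g. `timestamp`sym
    names = list(spec)
    return names, names


print(resolve_matching_columns("sym"))
# (['sym'], ['sym'])
print(resolve_matching_columns([["timestamp", "sym"], ["timestamp", "sym1"]]))
# (['timestamp', 'sym'], ['timestamp', 'sym1'])
```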

timeColumn is an optional parameter. When useSystemTime = false, it must be specified to indicate the name(s) of the time column in the left table and the right table. The time columns must have the same data type. If the names of the time column in the left table and the right table are the same, timeColumn is a string. Otherwise, it is a vector of 2 strings indicating the time column in each table.

useSystemTime is an optional parameter indicating whether the left table and the right table are joined on the system time, instead of on the timeColumn.

  • useSystemTime = true: join records based on the system time (timestamp with millisecond precision) when they are ingested into the engine.

  • useSystemTime = false (default): join records based on the specified timeColumn from the left table and the right table.

garbageSize is an optional parameter. It is a positive integer with the default value of 5,000 (rows). As subscribed data is ingested into the engine, it keeps occupying memory. Within the left and right tables, records are grouped by matchingColumn values. When the number of records in a group exceeds garbageSize, the system removes the records that have already been calculated from memory.
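The cleanup policy can be pictured as a per-group buffer that is pruned only when it grows past the threshold. This is a toy Python model. The class `GroupBuffer` and the `calculated` flag are hypothetical and do not describe the engine's actual data structures.

```python
from collections import defaultdict
from typing import Dict, List


class GroupBuffer:
    """Toy model of per-group garbage collection controlled by garbageSize."""

    def __init__(self, garbage_size: int = 5000) -> None:
        self.garbage_size = garbage_size
        self.groups: Dict[str, List[dict]] = defaultdict(list)

    def append(self, key: str, record: dict) -> None:
        group = self.groups[key]
        group.append(record)
        # Only a group that has grown past garbage_size is cleaned, and only
        # records already used in a calculation are dropped.
        if len(group) > self.garbage_size:
            self.groups[key] = [r for r in group if not r["calculated"]]


buf = GroupBuffer(garbage_size=3)
for i, done in enumerate([True, True, False]):
    buf.append("AAPL", {"id": i, "calculated": done})
print(len(buf.groups["AAPL"]))                # 3: threshold not exceeded yet
buf.append("AAPL", {"id": 3, "calculated": False})
print([r["id"] for r in buf.groups["AAPL"]])  # [2, 3]
```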

maxDelayedTime is an optional parameter. It is a positive integer with the default value of 3 (seconds). Use maxDelayedTime to trigger windows that remain uncalculated long after their end. maxDelayedTime only takes effect when timeColumn is specified, and the two must have the same time precision. For more information about this parameter, see “Window triggering rules” in the Details section.

nullFill is an optional tuple with the same number of elements as the output table has columns. The data type of each element must match that of the corresponding output column. It is used to fill NULL values and missing values in the output table.

Note that the following values are not filled:

  • existing NULL values from right table columns

  • results of non-aggregate calculation
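The filling step amounts to an element-wise substitution over each output row. The Python function below is a hypothetical sketch in which `None` stands for NULL. The caller passes a `fillable` mask that is False for cells the rules above exclude, namely existing NULLs from right-table columns and non-aggregate results.

```python
from typing import Any, List, Optional, Sequence


def apply_null_fill(row: Sequence[Any], null_fill: Sequence[Any],
                    fillable: Optional[Sequence[bool]] = None) -> List[Any]:
    """Replace None cells of an output row with the matching nullFill element.

    Cells whose `fillable` flag is False are left untouched.
    """
    if len(null_fill) != len(row):
        raise ValueError("nullFill must have one element per output column")
    if fillable is None:
        fillable = [True] * len(row)
    return [nf if v is None and ok else v
            for v, nf, ok in zip(row, null_fill, fillable)]


null_fill = ["2012.01.01T00:00:00.000", "NONE", 0.0, 0.0, 0.0]
# A NULL price becomes 0.0, as in the factor1 column of the first example.
print(apply_null_fill(["2012.01.01T00:00:00.000", "AAPL", None, 1.0, 6.0], null_fill))
# ['2012.01.01T00:00:00.000', 'AAPL', 0.0, 1.0, 6.0]
```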

sortByTime is a Boolean value that indicates whether the output data is globally sorted by time. The default value is false, meaning the output data is sorted only within groups. Note that if sortByTime is set to true, the parameter maxDelayedTime cannot be specified, and the data input to the left and right tables must be globally sorted.
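The difference between the two orderings can be shown on a few rows. This Python snippet is a toy illustration of the resulting row order only and does not describe how the engine sorts internally. The rows mimic per-group output for groups A and B. A stable sort on the time column puts all 00:00:00.000 rows before the 00:00:00.001 rows and keeps A before B within each timestamp, which is the order in the sortByTime=true example.

```python
from typing import List, Tuple

Row = Tuple[int, str, float]  # (time offset in ms, sym, factor2)

# Output emitted group by group (sortByTime=false): each group is ordered by
# time internally, but all of group A precedes group B.
grouped: List[Row] = [
    (0, "A", 1.0), (0, "A", 2.0), (1, "A", 1.0), (1, "A", 2.0),
    (0, "B", 2.0), (1, "B", 2.0),
]

# Python's sorted() is stable, so rows with equal times keep their relative order.
globally_sorted = sorted(grouped, key=lambda r: r[0])
for row in globally_sorted:
    print(row)
```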

Examples

$ share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
$ share streamTable(1:0, `time`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as rightTable
$ output=table(100:0, `time`sym`factor1`factor2`factor3, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE])

$ nullFill= [2012.01.01T00:00:00.000, `NONE, 0.0, 0.0, 0.0]
$ wjEngine=createWindowJoinEngine(name="test1", leftTable=leftTable, rightTable=rightTable, outputTable=output,  window=-2:2, metrics=<[price,val,sum(val)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false,nullFill=nullFill)

$ subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{wjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{wjEngine, false}, msgAsTable=true)

$ n=10
$ tp1=table(take(2012.01.01T00:00:00.000+0..10, 2*n) as time, take(`AAPL, n) join take(`IBM, n) as sym, take(NULL join rand(10.0, n-1),2*n) as price)
$ tp1.sortBy!(`time)
$ leftTable.append!(tp1)

$ tp2=table(take(2012.01.01T00:00:00.000+0..10, 2*n) as time, take(`AAPL, n) join take(`IBM, n) as sym, take(double(1..n),2*n) as val)
$ tp2.sortBy!(`time)
$ rightTable.append!(tp2)

$ select * from output where time between 2012.01.01T00:00:00.000:2012.01.01T00:00:00.001

time                    sym  factor1 factor2 factor3
2012.01.01T00:00:00.000 AAPL 0       1       6
2012.01.01T00:00:00.000 AAPL 0       2       6
2012.01.01T00:00:00.000 AAPL 0       3       6
2012.01.01T00:00:00.001 AAPL 5.2705  1       10
2012.01.01T00:00:00.001 AAPL 5.2705  2       10
2012.01.01T00:00:00.001 AAPL 5.2705  3       10
2012.01.01T00:00:00.001 AAPL 5.2705  4       10
2012.01.01T00:00:00.000 IBM  5.2705  2       9
2012.01.01T00:00:00.000 IBM  5.2705  3       9
2012.01.01T00:00:00.000 IBM  5.2705  4       9
2012.01.01T00:00:00.001 IBM  1.0179  2       14
2012.01.01T00:00:00.001 IBM  1.0179  3       14
2012.01.01T00:00:00.001 IBM  1.0179  4       14
2012.01.01T00:00:00.001 IBM  1.0179  5       14

Example for window = 0:0:

$ share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
$ share streamTable(1:0, `time`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as rightTable

$ v = [1, 5, 10, 15]
$ tp1=table(2012.01.01T00:00:00.000+v as time, take(`AAPL, 4) as sym, rand(10.0,4) as price)

$ v = [1, 2, 3, 4, 5, 6, 9, 15]
$ tp2=table(2012.01.01T00:00:00.000+v as time, take(`AAPL, 8) as sym, rand(10.0,8) as val)

$ output=table(100:0, `time`sym`price`sum_val, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
$ wjEngine=createWindowJoinEngine(name="test1", leftTable=leftTable, rightTable=rightTable, outputTable=output,  window=0:0, metrics=<[price, sum(val)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false)

$ subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{wjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{wjEngine, false}, msgAsTable=true)

$ leftTable.append!(tp1)
$ rightTable.append!(tp2)

$ sleep(100)
$ select * from output

time                    sym  price  sum_val
2012.01.01T00:00:00.001 AAPL 8.8252
2012.01.01T00:00:00.005 AAPL 7.1195 21.3741
2012.01.01T00:00:00.010 AAPL 5.2217 16.4223
2012.01.01T00:00:00.015 AAPL 9.2517

The following example shows that when sortByTime=true, the engine outputs data sorted by time.

$ unsubscribeTable(tableName="leftTable", actionName="joinLeft")
$ unsubscribeTable(tableName="rightTable", actionName="joinRight")
$ undef(`leftTable,SHARED)
$ undef(`rightTable,SHARED)
$ dropAggregator(name="test1")

//define a window join engine
$ share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
$ share streamTable(1:0, `time`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as rightTable
$ output=table(100:0, `time`sym`factor1`factor2`factor3, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ nullFill= [2012.01.01T00:00:00.000, `NONE, 0.0, 0.0, 0.0]
$ wjEngine=createWindowJoinEngine(name="test1", leftTable=leftTable, rightTable=rightTable, outputTable=output,  window=-2:2, metrics=<[price,val,sum(val)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false,nullFill=nullFill, sortByTime=true)

//subscribe data
$ subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{wjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{wjEngine, false}, msgAsTable=true)

$ n=10
$ tp1=table(take(2012.01.01T00:00:00.000+0..10, 2*n) as time, take(`A, n) join take(`B, n) as sym, take(NULL join rand(10.0, n-1),2*n) as price)
$ tp1.sortBy!(`time)
$ leftTable.append!(tp1)

$ tp2=table(take(2012.01.01T00:00:00.000+0..10, 2*n) as time, take(`A, n) join take(`B, n) as sym, take(double(1..n),2*n) as val)
$ tp2.sortBy!(`time)
$ rightTable.append!(tp2)

$ sleep(100)
$ select * from output where time between 2012.01.01T00:00:00.000:2012.01.01T00:00:00.001


time                    sym factor1 factor2 factor3
2012.01.01T00:00:00.000 A   0       1       6
2012.01.01T00:00:00.000 A   0       2       6
2012.01.01T00:00:00.000 A   0       3       6
2012.01.01T00:00:00.000 B   3.9389  2       9
2012.01.01T00:00:00.000 B   3.9389  3       9
2012.01.01T00:00:00.000 B   3.9389  4       9
2012.01.01T00:00:00.001 A   3.9389  1       10
2012.01.01T00:00:00.001 A   3.9389  2       10
2012.01.01T00:00:00.001 A   3.9389  3       10
2012.01.01T00:00:00.001 A   3.9389  4       10
2012.01.01T00:00:00.001 B   4.9875  2       14
2012.01.01T00:00:00.001 B   4.9875  3       14
2012.01.01T00:00:00.001 B   4.9875  4       14
2012.01.01T00:00:00.001 B   4.9875  5       14