createWindowJoinEngine

Syntax

createWindowJoinEngine(name, leftTable, rightTable, outputTable, window, metrics, matchingColumn, [timeColumn], [useSystemTime=false], [garbageSize=5000], [maxDelayedTime])

Arguments

name is a required STRING indicating the name of the window join engine.

leftTable and rightTable are tables whose sole purpose is to provide the engine with schemas of the subscribed stream tables. It doesn’t matter if the tables contain data or not.

outputTable is a table to hold the calculation results. Before calling createWindowJoinEngine, set up an empty table as the outputTable and specify the column names and data types. The window join engine will write the calculation results into this table.

The columns of the outputTable are in the following order: the temporal column, the matching column, the columns corresponding to the calculation results. Specifically:

(1) The first column must be of temporal type.

  • If useSystemTime = true, the column must be of TIMESTAMP type.

  • If useSystemTime = false, the column must be of the same type as the timeColumn.

(2) The subsequent column(s) are the matching column(s), arranged in the same order as specified in matchingColumn.

(3) The last columns are calculation results. There can be more than 1 result column.
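The column-order rules above can be sketched in a few lines of plain Python. This is only an illustrative model of the layout described here, not DolphinDB code; the helper name output_schema and its parameters are hypothetical.

```python
def output_schema(time_column, time_type, matching_columns, result_columns,
                  use_system_time=False):
    """Model the outputTable layout: temporal column, matching columns, results.

    Hypothetical helper for illustration only; it is not part of DolphinDB.
    Returns (column_names, type_of_first_column).
    """
    names = [time_column, *matching_columns, *result_columns]
    # With system time the first column is TIMESTAMP; otherwise it takes
    # the type of timeColumn.
    first_type = "TIMESTAMP" if use_system_time else time_type
    return names, first_type


names, first_type = output_schema(
    "time", "TIMESTAMP", ["sym"], ["factor1", "factor2", "factor3"]
)
print(names)
print(first_type)
```

The resulting column order is the one used by the output table in the Examples section.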

window is a pair of integers or duration values, indicating the range of the sliding window, including both left and right bounds.

metrics is a required parameter indicating calculation metrics in the form of metacode. It can use one or more expressions, and built-in or user-defined functions. You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2>. For details about metacode, see Metaprogramming.

When only the columns in the rightTable are involved in calculation, the window join engine optimizes the performance of the following functions: sum, sum2, avg, std, var, corr, covar, wavg, wsum, beta, max, min, last, first, med, percentile.

matchingColumn indicates one or more columns based on which the tables are joined. It can be a scalar, vector or tuple of integral, temporal or literal (excluding UUID) types.

Specify matchingColumn following the rules below:

1. When there is only 1 matching column - If the names of the matching column are the same in both tables, matchingColumn should be specified as a STRING scalar; otherwise it’s a tuple of two elements. For example, if the matching column is named “sym” in the left table and “sym1” in the right table, then matchingColumn = [[`sym],[`sym1]].

2. When there are multiple matching columns - If the names of the matching columns are all the same in both tables, specify matchingColumn as a STRING vector; otherwise it’s a tuple of two elements. For example, if the matching columns are named “timestamp” and “sym” in the left table, whereas in the right table they’re named “timestamp” and “sym1”, then matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]].
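As a rough illustration of the two rules above, the following plain-Python sketch resolves a matchingColumn-style value into the column names used on each side. It is a model only: a Python str stands in for a STRING scalar, a list for a STRING vector, and a 2-element tuple for the tuple form. The function name split_matching_column is hypothetical.

```python
def split_matching_column(matching_column):
    """Return (left_names, right_names) for a matchingColumn-style value.

    Hypothetical model: str = STRING scalar, list = STRING vector,
    2-element tuple = (left names, right names) when the names differ.
    """
    if isinstance(matching_column, str):
        left = right = [matching_column]
    elif isinstance(matching_column, tuple):
        if len(matching_column) != 2:
            raise ValueError("tuple form must have exactly two elements")
        left, right = (list(side) for side in matching_column)
    else:
        left = right = list(matching_column)
    if len(left) != len(right):
        raise ValueError("both sides need the same number of columns")
    return list(left), list(right)


print(split_matching_column("sym"))
print(split_matching_column((["sym"], ["sym1"])))
print(split_matching_column((["timestamp", "sym"], ["timestamp", "sym1"])))
```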

timeColumn is an optional parameter. When useSystemTime = false, specify it to indicate the name of the temporal column in each of the tables to be joined. The temporal columns in leftTable and rightTable must have the same data type, but their names can differ. If the names are the same, timeColumn is a STRING scalar; if they differ, it is a STRING vector of 2 elements.

useSystemTime indicates whether the first column (i.e., the temporal column) of the outputTable uses the system time (useSystemTime = true) or the temporal column from the left table (useSystemTime = false). This parameter is optional.

  • If useSystemTime = true, the engine calculates based on the system time at which each record is ingested into the streaming engine (with millisecond precision), not on timeColumn.

  • If useSystemTime = false (default), the engine calculates based on timeColumn, not on the time at which each record is ingested into the engine.

garbageSize is an optional integer greater than 0. The default value is 5,000 (in units of rows). As the subscribed data is ingested into the streaming engine, it occupies an increasing amount of memory. When the number of rows in memory exceeds garbageSize, the system clears the data that is not needed for the current calculation.

maxDelayedTime is an optional integer greater than 0. It specifies how long the engine waits before triggering a calculation for grouped data that has not been output for a long time. The default value is 3 (in units of seconds). maxDelayedTime can be specified only when useSystemTime is false.

Details

Create the window join streaming engine. Return a table that is the window join result of a left table and a right table. This function is usually used in conjunction with subscribeTable.

Note: Only the functions appendForJoin, getLeftStream and getRightStream can be used to ingest data into the window join engine.

Calculation rules of the engine:

1. With data grouped based on the matchingColumn, the timestamp of each record from the left table determines a time window in the right table. The calculation of each window is triggered by the arrival of the first record after the current window ends.

2. If the current window is the last one in a group, its calculation is triggered when the latest timestamp t1 in the right table satisfies t1 > t + right bound of the window + maxDelayedTime, where t is the timestamp of the left-table record that defines the window.
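To make the window semantics concrete, here is a minimal plain-Python model of the join itself: for each left record, it collects the right records of the same group whose time falls within the window (both bounds inclusive) and evaluates metrics equivalent to <[price, val, sum(val)]>. It is a sketch under stated assumptions, not the engine's implementation. Times are integer millisecond offsets, the data mirrors the tables built in the Examples section, and the names build_rows and window_join are hypothetical. The model does not implement the triggering rules, so it also emits rows for windows that the engine may not have output yet.

```python
def build_rows(n=10):
    """Rebuild the sample data as (time_ms, sym, value) tuples."""
    times = [i % (n + 1) for i in range(2 * n)]        # 0..10, then 0..8
    syms = ["AAPL"] * n + ["IBM"] * n
    values = [float(i % n + 1) for i in range(2 * n)]  # 1..10 repeated
    return list(zip(times, syms, values))


def window_join(left_rows, right_rows, window):
    """Join each left row with the right rows of its group inside the window.

    The window is (lo, hi) relative to the left row's time and includes
    both bounds. One output row is emitted per matched right row.
    """
    lo, hi = window
    out = []
    for t, sym, price in left_rows:
        matched = [val for rt, rsym, val in right_rows
                   if rsym == sym and t + lo <= rt <= t + hi]
        total = sum(matched)
        out.extend((t, sym, price, val, total) for val in matched)
    return out


rows = build_rows()
result = window_join(rows, rows, window=(-2, 2))
for row in result:
    if row[0] == 6:
        print(row)
```

For the left records at offset 6 ms, the model gives a sum of 35 for AAPL and 40 for IBM, which agrees with the sample output in the Examples section.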


Examples

$ share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
$ share streamTable(1:0, `time`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as rightTable
$ output=table(100:0, `time`sym`factor1`factor2`factor3, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ wjEngine=createWindowJoinEngine(name="test1", leftTable=leftTable, rightTable=rightTable, outputTable=output,  window=-2:2, metrics=<[price,val,sum(val)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false,maxDelayedTime=4)

$ subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{wjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{wjEngine, false}, msgAsTable=true)

$ def writeData(begin, end, tablename, wjEngine){
$    n = end - begin
$    if(tablename == "leftTable"){
$       tp1=table(take(2012.01.01T00:00:00.000+begin..end, 2*n) as time, take(`AAPL, n) join take(`IBM, n) as sym, take(double(1..n),2*n) as price)
$       tp1.sortBy!(`time)
$       objByName(tablename).append!(tp1)
$       appendForJoin(wjEngine, true, tp1)
$    }else{
$       tp2=table(take(2012.01.01T00:00:00.000+begin..end, 2*n) as time, take(`AAPL, n) join take(`IBM, n) as sym, take(double(1..n),2*n) as val)
$       tp2.sortBy!(`time)
$       objByName(tablename).append!(tp2)
$       appendForJoin(wjEngine, false, tp2)
$       }
$ }

$ writeData(0, 10, "leftTable", wjEngine)
$ writeData(0, 10, "rightTable", wjEngine)

$ select * from output where time between 2012.01.01T00:00:00.006:2012.01.01T00:00:00.010

time                    sym  factor1 factor2 factor3
----------------------- ---- ------- ------- -------
2012.01.01T00:00:00.006 AAPL 7       5       35
2012.01.01T00:00:00.006 AAPL 7       6       35
2012.01.01T00:00:00.006 AAPL 7       7       35
2012.01.01T00:00:00.006 AAPL 7       8       35
2012.01.01T00:00:00.006 AAPL 7       9       35
2012.01.01T00:00:00.006 IBM  8       6       40
2012.01.01T00:00:00.006 IBM  8       7       40
2012.01.01T00:00:00.006 IBM  8       8       40
2012.01.01T00:00:00.006 IBM  8       9       40
2012.01.01T00:00:00.006 IBM  8       10      40
2012.01.01T00:00:00.007 IBM  9       7       34
2012.01.01T00:00:00.007 IBM  9       8       34
2012.01.01T00:00:00.007 IBM  9       9       34
2012.01.01T00:00:00.007 IBM  9       10      34