createEqualJoinEngine

Syntax

createEqualJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, timeColumn, [garbageSize=5000], [maxDelayedTime])

Arguments

Some parameters of the equal join engine are the same as those of the asof join engine; see createAsofJoinEngine for details. The parameters that differ are described below:

name is a required string specifying the name of the equal join engine.

garbageSize is an optional positive integer with a default value of 5000. When the number of rows of historical data in memory exceeds garbageSize, the system clears the historical data that is no longer needed for the current calculation.

maxDelayedTime is an optional positive integer specifying the maximum delay allowed between data being injected into the equal join engine and being calculated. The default is 3 seconds (3000 when timeColumn is of TIMESTAMP type; see Example 2). Data that arrives more than maxDelayedTime late is discarded, so avoid setting maxDelayedTime too small.
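The late-data rule can be pictured with a minimal Python model. It is only a conceptual sketch, not DolphinDB code: it assumes times are plain integers in the precision of timeColumn (milliseconds here) and measures lateness against the newest time admitted so far. The function name admit is hypothetical.

```python
from typing import List, Optional, Tuple


def admit(times: List[int], max_delayed_time: int = 3000) -> Tuple[List[int], List[int]]:
    """Split arriving times into admitted and discarded (too late) records."""
    latest: Optional[int] = None
    admitted: List[int] = []
    discarded: List[int] = []
    for t in times:
        if latest is not None and latest - t > max_delayed_time:
            # More than max_delayed_time behind the newest admitted time: drop it.
            discarded.append(t)
        else:
            admitted.append(t)
            latest = t if latest is None else max(latest, t)
    return admitted, discarded


print(admit([0, 1000, 5000, 1500, 4000]))  # ([0, 1000, 5000, 4000], [1500])
```

Here 1500 arrives after 5000 and is 3500 ms behind it, which exceeds the assumed 3000 ms limit, while 4000 is only 1000 ms behind and is kept.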

When the number of rows of historical data in memory exceeds garbageSize, the following historical data is cleaned up:

1. Historical data that has already been consumed by the equal join engine.

2. Historical data whose timestamp differs from that of the newly arriving data in the left/right table by more than maxDelayedTime.
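The two cleanup rules can likewise be modeled in Python. This is a hypothetical sketch of the behavior described above, not the engine's implementation: the buffer is assumed to be a list of (time, sym) tuples, consumed records are tracked in a set, and collect_garbage is an illustrative name.

```python
from typing import List, Set, Tuple

Record = Tuple[int, str]  # (time, sym)


def collect_garbage(history: List[Record], consumed: Set[Record], latest_time: int,
                    garbage_size: int = 5000, max_delayed_time: int = 3000) -> List[Record]:
    """Apply the two cleanup rules once the buffer grows past garbage_size."""
    if len(history) <= garbage_size:
        return history  # below the threshold: nothing is cleaned up
    # Rule 1: drop records already consumed by the join.
    # Rule 2: drop records more than max_delayed_time older than the newest arrival.
    return [rec for rec in history
            if rec not in consumed and latest_time - rec[0] <= max_delayed_time]


history = [(0, "A"), (1000, "B"), (4000, "C"), (4500, "D")]
print(collect_garbage(history, {(1000, "B")}, latest_time=4500, garbage_size=3))
# [(4000, 'C'), (4500, 'D')]
```

(0, "A") is removed by the second rule and (1000, "B") by the first; with a larger garbage_size the buffer would be returned unchanged.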

Details

Creates an equal join engine and returns a table object that holds the result of equal-joining the left and right tables. It is generally used together with subscribeTable.

  • Both the left table and the right table should be sorted chronologically by timeColumn, and each combination of timeColumn and matchingColumn values should be unique, so that each matching key produces exactly one result.
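The matching behavior described above can be illustrated with a toy Python model that buffers unmatched records per side and emits one result row when a left record and a right record share the same (time, sym) key. It is a simplified, hypothetical sketch: the class EqualJoinModel and its methods are not part of DolphinDB, and unlike the engine it drops a matched record immediately instead of leaving it for garbage collection.

```python
from typing import Callable, Dict, List, Tuple

Key = Tuple[int, str]  # (time, sym)


class EqualJoinModel:
    """Toy model of an equal join: pairs one left and one right record per (time, sym) key."""

    def __init__(self, metric: Callable[[dict, dict], float]) -> None:
        self.metric = metric
        # Unmatched records waiting for a partner, indexed by side (True = left).
        self.pending: Dict[bool, Dict[Key, dict]] = {True: {}, False: {}}
        self.output: List[Tuple[int, str, float]] = []

    def append(self, is_left: bool, records: List[dict]) -> None:
        for rec in records:
            key = (rec["time"], rec["sym"])
            other_side = self.pending[not is_left]
            if key in other_side:
                # Partner already buffered: emit one joined row and drop the partner.
                partner = other_side.pop(key)
                left, right = (rec, partner) if is_left else (partner, rec)
                self.output.append((key[0], key[1], self.metric(left, right)))
            else:
                # No partner yet: buffer the record until its match arrives.
                self.pending[is_left][key] = rec


engine = EqualJoinModel(lambda left, right: left["price"] + right["val"])
engine.append(True, [{"time": 1, "sym": "AAPL", "price": 1.0},
                     {"time": 2, "sym": "IBM", "price": 2.0}])
engine.append(False, [{"time": 1, "sym": "AAPL", "val": 50.0},
                      {"time": 3, "sym": "IBM", "val": 49.0}])
print(engine.output)  # [(1, 'AAPL', 51.0)]
```

Only the AAPL record at time 1 has a partner on both sides; the IBM records at times 2 and 3 stay buffered because their keys differ.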

Examples

Example 1

$ leftTable=keyedTable(`time`sym, 1:0, `time`sym`price, [SECOND, SYMBOL, DOUBLE])
$ rightTable=keyedTable(`time`sym, 1:0, `time`sym`val, [SECOND, SYMBOL, DOUBLE])
$ output=table(100:0, `time`sym`total, [SECOND, SYMBOL, DOUBLE])
$ ejEngine=createEqualJoinEngine("test1", leftTable, rightTable, output, <price+val>, `sym, `time)
$ // every (time, sym) combination must be unique, so each row gets its own timestamp
$ tmp1=table(13:30:10+(1..20) as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(1..20) as price)
$ appendForJoin(ejEngine, true, tmp1)
$ leftTable.append!(tmp1)
$ tmp2=table(13:30:10+(1..20) as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(50..31) as val)
$ appendForJoin(ejEngine, false, tmp2)
$ rightTable.append!(tmp2)

$ re = select * from output order by time, sym
$ expected = select time, sym, price+val from ej(leftTable, tmp2, `sym`time) order by time, sym
$ each(eqObj, re.values(), expected.values())

Example 2. In this example timeColumn is of TIMESTAMP type, so the default maxDelayedTime is 3000 ms (3 seconds).

$ share streamTable(5000000:0, `timestamp`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
$ share streamTable(5000000:0, `timestamp`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as rightTable
$ share table(5000000:0, `timestamp`sym`price`val`total`diff`ratio, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as output
$ ejEngine=createEqualJoinEngine("test1", leftTable, rightTable, output, <[price, val, price+val, price-val, price/val]>, `sym, `timestamp, 5000)
$ topic1=subscribeTable(tableName="leftTable", actionName="writeLeft", offset=0, handler=appendForJoin{ejEngine, true}, batchSize=10000, throttle=1)
$ topic2=subscribeTable(tableName="rightTable", actionName="writeRight", offset=0, handler=appendForJoin{ejEngine, false}, batchSize=10000, throttle=1)
$ // write 300 batches of 1000 rows; batch i has timestamp offset i ms and symbols A1..A1000
$ def writeLeftTable(mutable tb){
$     batch = 1000
$     for(i in 1..300){
$         tmp = table(batch:batch, `timestamp`sym`price, [TIMESTAMP, SYMBOL, DOUBLE])
$         tmp[`timestamp]=take(2012.01.01T00:00:00.000+i, batch)
$         tmp[`sym]=shuffle("A"+string(1..batch))
$         tmp[`price]=rand(100.0, batch)
$         tb.append!(tmp)
$     }
$ }

$ // write 200 batches of 500 rows; batch i has timestamp offset i ms and symbols A1..A500
$ def writeRightTable(mutable tb){
$     batch = 500
$     for(i in 1..200){
$         tmp = table(batch:batch, `timestamp`sym`val, [TIMESTAMP, SYMBOL, DOUBLE])
$         tmp[`timestamp]=take(2012.01.01T00:00:00.000+i, batch)
$         tmp[`sym]=shuffle("A"+string(1..batch))
$         tmp[`val]=rand(100.0, batch)
$         tb.append!(tmp)
$     }
$ }

$ job1 = submitJob("writeLeft", "", writeLeftTable, leftTable)
$ job2 = submitJob("writeRight", "", writeRightTable, rightTable)

$ // run after both jobs have finished and the subscriptions have processed all messages
$ select count(*) from output
100000
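The expected count of 100000 follows from the keys the two writer jobs generate: only timestamps present in both tables (1 to 200 ms after 2012.01.01T00:00:00.000) and symbols present in both (A1 to A500) can match. The following Python check only reproduces this arithmetic and does not call DolphinDB; it counts the (time, sym) pairs shared by both tables, with times represented as millisecond offsets.

```python
# Keys produced by writeLeftTable: offsets 1..300 ms, symbols A1..A1000.
left_keys = {(i, "A" + str(j)) for i in range(1, 301) for j in range(1, 1001)}
# Keys produced by writeRightTable: offsets 1..200 ms, symbols A1..A500.
right_keys = {(i, "A" + str(j)) for i in range(1, 201) for j in range(1, 501)}

matched = left_keys & right_keys
print(len(matched))  # 100000 = 200 timestamps x 500 symbols
```

The extra left-table rows (offsets 201 to 300 ms, and symbols A501 to A1000) never find a partner, so they do not appear in output.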