createEquiJoinEngine

Syntax

createEquiJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, timeColumn, [garbageSize=5000], [maxDelayedTime])

Alias: createEqualJoinEngine

Details

Create an equi join streaming engine. Streams are ingested into the engine through left and right tables and joined on matchingColumn and timeColumn. Return a table object that is the equi join result of a left table and a right table. The result holds all records with matching values.

For more application scenarios, see Streaming Engines.

Calculation Rules

When data is ingested into one table, the equi join streaming engine searches for records with matching values in the other table. If matches are found, the engine outputs the combined records with additional columns holding the calculation results of metrics.

Arguments

Some parameters of the equi join engine are the same as those of the asof join engine, please refer to the function createAsofJoinEngine for detailed information. The different parameters are described as below:

name is a string indicating the name of the equal join streaming engine. It is the unique identifier of the engine on a data/compute node. It can contain letters, numbers and underscores and must start with a letter.

timeColumn is a string or a vector of strings indicating the time columns in the left table and the right table. The time columns in the left and right tables must have the same data type. When the two time columns have the same column name, timeColumn is a string scalar; otherwise, timeColumn is vector of two strings.

garbageSize (optional) is a positive integer with the default value of 5,000 (in unit of rows). When the number of rows of historical data in memory exceeds the garbageSize, the system will remove the historical data that is not needed for the current calculation on the following conditions:

  • The historical data has already been joined and returned.

  • For historical data that has not been joined, if the timestamp difference between the historical data and the new arriving data in left/right table has exceeded the maxDelayedTime, it will also be discarded.

maxDelayedTime is an optional parameter. It is a positive integer with the default value of 3 (seconds), indicating the maximum time to keep cached data in the engine. This parameter only takes effect when the conditions described in garbageSize are met. It is not recommended to set the maxDelayedTime too small in case data got removed before it is joined.

Examples

Example 1

$ share streamTable(1:0, `time`sym`price, [SECOND, SYMBOL, DOUBLE]) as leftTable
$ share streamTable(1:0, `time`sym`val, [SECOND, SYMBOL, DOUBLE]) as rightTable
$ output=table(100:0, `time`sym`price`val`total, [SECOND, SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ ejEngine=createEquiJoinEngine("test1", leftTable, rightTable, output, [<price>, <val>, <price*val>], `sym, `time)
$ subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{ejEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{ejEngine, false}, msgAsTable=true)

$ tmp1=table(13:30:10+1..20 as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(1..20) as price)
$ leftTable.append!(tmp1)
$ tmp2=table(13:30:10+1..20 as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(50..31) as val)
$ rightTable.append!(tmp2)

$ select count(*) from output
20

Example 2. The type of the timeColumn is timestamp. The default value of maxDelayedTime is 3000ms (3s).

$ share streamTable(5000000:0, `timestamp`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftTable
$ share streamTable(5000000:0, `timestamp`sym`val, [TIMESTAMP, SYMBOL, DOUBLE]) as rightTable
$ share table(5000000:0, `timestamp`sym`price`val`total`diff`ratio, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as output
$ ejEngine=createEquiJoinEngine("test1", leftTable, rightTable, output, <[price, val, price+val, price-val, price/val]>, `sym, `timestamp, 5000)
$ topic1=subscribeTable(tableName="leftTable", actionName="writeLeft", offset=0, handler=appendForJoin{ejEngine, true}, batchSize=10000, throttle=1)
$ topic2=subscribeTable(tableName="rightTable", actionName="writeRight", offset=0, handler=appendForJoin{ejEngine, false}, batchSize=10000, throttle=1)
$ def writeLeftTable(mutable tb){
$    batch = 1000
$    for(i in 1..300){
$            tmp = table(batch:batch, `timestamp`sym`price, [TIMESTAMP, SYMBOL, DOUBLE])
$            tmp[`timestamp]=take(2012.01.01T00:00:00.000+i, batch)
$            tmp[`sym]=shuffle("A"+string(1..batch))
$            tmp[`price]=rand(100.0, batch)
$            tb.append!(tmp)
$    }
$ }

$ def writeRightTable(mutable tb){
$    batch = 500
$    for(i in 1..200){
$            tmp = table(batch:batch, `timestamp`sym`val, [TIMESTAMP, SYMBOL, DOUBLE])
$            tmp[`timestamp]=take(2012.01.01T00:00:00.000+i, batch)
$            tmp[`sym]=shuffle("A"+string(1..batch))
$            tmp[`val]=rand(100.0, batch)
$            tb.append!(tmp)
$    }
$ }

$ job1 = submitJob("writeLeft", "", writeLeftTable, leftTable)
$ job2 = submitJob("writeRight", "", writeRightTable, rightTable)

$ select count(*) from output order by sym, timestamp
$ 100000