createAsofJoinEngine

Syntax

createAsofJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [timeColumn], [useSystemTime=false], [delayedTime], [garbageSize], [sortByTime])

Details

Create an asof join streaming engine. Streams ingested into the left table and the right table are joined on matchingColumn and timeColumn (or on system time). Each record in the left table is joined with the right table record that (1) has the same matchingColumn value and (2) has the latest timestamp among those less than or equal to the timestamp of the left table record. This function returns a table object holding the asof join results.

The asof join engine joins records whose time columns do not match exactly. For each timestamp in one table, the engine obtains the latest value (i.e., the value current as of that timestamp) from the other table.
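The following Python sketch illustrates the matching rule. It is plain Python, not DolphinDB script, and does not reflect how the engine is implemented. It ignores the streaming trigger rules and applies only the "same key, latest right timestamp ≤ left timestamp" lookup to already-sorted batches. Timestamps are modelled as millisecond offsets from 09:30:00.000, using the data of the first example on this page. `asof_join` is a hypothetical helper.

```python
from bisect import bisect_right
from collections import defaultdict

def asof_join(left, right, key="sym", time="time"):
    """Pair each left row with the latest right row that has the same key and
    a timestamp <= the left row's timestamp (None if there is none).
    Both inputs are lists of dicts already sorted by time."""
    # Index right rows per key as parallel lists of timestamps and rows.
    times = defaultdict(list)
    rows = defaultdict(list)
    for r in right:
        times[r[key]].append(r[time])
        rows[r[key]].append(r)
    result = []
    for l in left:
        k = l[key]
        # bisect_right gives the position just past the last timestamp <= l[time].
        i = bisect_right(times[k], l[time])
        result.append((l, rows[k][i - 1] if i > 0 else None))
    return result

# Data of the first example (times in ms after 09:30:00.000).
trades = [{"time": t, "sym": "A", "price": p}
          for t, p in zip([2, 8, 20, 22], [20.01, 20.04, 20.07, 20.08])]
quotes = [{"time": t, "sym": "A", "bid": b}
          for t, b in zip([1, 5, 6, 11, 19], [20, 20.02, 20.03, 20.05, 20.06])]

# Prints one row per line: 2 A 20, 8 A 20.03, 20 A 20.06, 22 A 20.06
for l, r in asof_join(trades, quotes):
    print(l["time"], l["sym"], r["bid"] if r else None)
```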

Note:

  • The records in the left table and the right table must be ordered by time.

  • If delayedTime is not specified, a join operation is only triggered when the right table receives a record whose timestamp is greater than the timestamp of the latest record in the left table.

  • If delayedTime is specified, a join operation is triggered when either of the following conditions is met:

    • In the left table, the difference between the timestamp of the latest record and the timestamp of the previous unjoined record is greater than delayedTime.

    • A left table record has still not been joined 2 * delayedTime or 2 seconds (whichever is larger) after it was ingested into the left table.
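The trigger conditions can be transcribed into a small predicate. This plain-Python sketch follows the rules exactly as worded above. It assumes timeColumn is a TIMESTAMP, so timestamps and delayedTime are both in milliseconds. `join_triggered` and its parameters are hypothetical names, not part of DolphinDB.

```python
from typing import Optional

def join_triggered(left_ts: int,
                   newest_left_ts: int,
                   new_right_ts: Optional[int],
                   delayed_time: Optional[int] = None,
                   ms_since_ingest: int = 0) -> bool:
    """Decide whether one unjoined left record is joined, per the documented rules.
    left_ts: timestamp of the unjoined left record
    newest_left_ts: timestamp of the latest record in the left table
    new_right_ts: timestamp of a newly received right record (None if none)
    ms_since_ingest: wall-clock time since the left record was ingested"""
    if delayed_time is None:
        # Only a right record newer than the latest left record triggers a join.
        return new_right_ts is not None and new_right_ts > newest_left_ts
    # Condition 1: the latest left record is more than delayedTime later.
    if newest_left_ts - left_ts > delayed_time:
        return True
    # Condition 2: still unjoined after max(2 * delayedTime, 2 seconds).
    return ms_since_ingest >= max(2 * delayed_time, 2000)
```

Under these rules, the left record at 09:30:00.022 in the first example (delayedTime=1) has no later left record. It is therefore joined only after the 2-second timeout.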

For more application scenarios, see Streaming Engines.

Arguments

name is a string indicating the name of the asof join engine. It is the unique identifier of the engine on a data/compute node. It can contain letters, numbers and underscores and must start with a letter.
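The naming rule and the per-node uniqueness requirement can be checked with a small Python sketch. Restricting letters to ASCII is an assumption, and the `registry` set is a hypothetical stand-in for the node's list of existing engines.

```python
import re

# Documented rule: starts with a letter, then letters, digits or underscores.
# ASCII letters only is an assumption of this sketch.
_ENGINE_NAME = re.compile(r"[A-Za-z][A-Za-z0-9_]*")

def is_valid_engine_name(name: str) -> bool:
    return _ENGINE_NAME.fullmatch(name) is not None

def register_engine(name: str, registry: set) -> None:
    """Hypothetical registration: reject bad names and duplicates on a node."""
    if not is_valid_engine_name(name):
        raise ValueError(f"invalid engine name: {name!r}")
    if name in registry:
        raise ValueError(f"engine {name!r} already exists on this node")
    registry.add(name)
```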

leftTable and rightTable are table objects whose schemas must be the same as those of the stream tables to which the engine subscribes.

outputTable is a table into which the engine inserts the calculation results. It can be an in-memory table or a DFS table. Before calling this function, create an empty table with the required column names and types.

The columns of outputTable are in the following order:

  1. A temporal column:

    • if useSystemTime = true, the data type must be TIMESTAMP;

    • if useSystemTime = false, it has the same data type as timeColumn.

  2. One or more columns on which the tables are joined, arranged in the same order as specified in matchingColumn.

  3. One or more columns holding the calculation results of metrics.
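The required column order can be summarized as a small Python helper. `output_layout` is hypothetical and returns only (role, type-or-name) pairs. The test case is the outputTable of the first example (time, sym, price, bid, ask, spread).

```python
def output_layout(matching_column, metric_names,
                  use_system_time=False, time_type="TIMESTAMP"):
    """Expected outputTable layout, in order:
    1. a temporal column (TIMESTAMP with system time, else timeColumn's type),
    2. the matching columns in matchingColumn order,
    3. one column per metric result."""
    first = ("time", "TIMESTAMP" if use_system_time else time_type)
    keys = [("key", c) for c in matching_column]
    metrics = [("metric", m) for m in metric_names]
    return [first, *keys, *metrics]

print(output_layout(["sym"], ["price", "bid", "ask", "spread"]))
```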

metrics is metacode (can be a tuple) specifying the calculation formulas. For more information about metacode, refer to Metaprogramming.

  • metrics can use one or more expressions, built-in or user-defined functions, but not aggregate functions.

  • metrics can be a function with multiple return values; in this case, the columns in the output table that hold the return values must be specified. For example, <func(price) as `col1`col2>.

  • To specify a column that exists in both the left and the right tables, use the format tableName.colName.

  • The column names specified in metrics are case-sensitive and must be consistent with the column names of the input tables.
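The column-reference rules for metrics can be sketched in Python. Unqualified names are looked up in whichever table contains them, and names present in both tables must be written as tableName.colName. Raising an error for an ambiguous or mis-cased name is an assumption of this sketch. `resolve` is hypothetical, and the table names `trades` and `quotes` come from the examples on this page. The last lines evaluate the metric abs(price-(bid+ask)/2) for one joined record.

```python
def resolve(col, left, right, left_name="trades", right_name="quotes"):
    """Look up a column referenced in a metric (names are case-sensitive)."""
    if "." in col:
        # Qualified form tableName.colName picks the table explicitly.
        table, name = col.split(".", 1)
        return {left_name: left, right_name: right}[table][name]
    in_left, in_right = col in left, col in right
    if in_left and in_right:
        raise KeyError(f"{col!r} exists in both tables; use tableName.{col}")
    if not (in_left or in_right):
        raise KeyError(col)
    return left[col] if in_left else right[col]

# One joined record from the first example: trade at .002, quote at .001.
left = {"time": 2, "sym": "A", "price": 20.01}
right = {"time": 1, "sym": "A", "bid": 20.0, "ask": 20.01}
price, bid, ask = (resolve(c, left, right) for c in ("price", "bid", "ask"))
spread = abs(price - (bid + ask) / 2)
print(round(spread, 3))  # 0.005
```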

matchingColumn is a STRING scalar, vector, or tuple indicating the column(s) on which the tables are joined. The columns can be of integral, temporal, or literal types (except UUID).

1. When there is only one column to match: if the column has the same name in both tables, specify matchingColumn as a STRING scalar; otherwise, specify a tuple of two elements. For example, if the column is named "sym" in the left table and "sym1" in the right table, then matchingColumn = [[`sym],[`sym1]].

2. When there are multiple columns to match: if all of them have the same names in both tables, specify matchingColumn as a STRING vector; otherwise, specify a tuple of two elements. For example, if the columns are named "timestamp" and "sym" in the left table and "timestamp" and "sym1" in the right table, then matchingColumn = [[`timestamp, `sym], [`timestamp, `sym1]].
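The accepted forms of matchingColumn can be normalized into a pair of column lists. In this hypothetical Python helper, a DolphinDB tuple such as [[`sym],[`sym1]] is modelled as a Python tuple of two lists, and a STRING vector as a Python list.

```python
def normalize_matching_column(spec):
    """Return (left_columns, right_columns) for a matchingColumn value."""
    if isinstance(spec, str):
        # STRING scalar: one column with the same name in both tables.
        return [spec], [spec]
    if isinstance(spec, tuple):
        # Tuple of two elements: left-table names, then right-table names.
        left, right = spec
        if len(left) != len(right):
            raise ValueError("both tables must list the same number of columns")
        return list(left), list(right)
    # STRING vector: several columns with the same names in both tables.
    return list(spec), list(spec)

print(normalize_matching_column((["timestamp", "sym"], ["timestamp", "sym1"])))
```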

timeColumn is an optional parameter. When useSystemTime = false, it must be specified to indicate the name of the time column in the left table and the right table. The time columns must have the same data type. If the time column has the same name in both tables, timeColumn is a string; otherwise, it is a vector of two strings giving the time column of the left table and of the right table, respectively.

useSystemTime is an optional parameter indicating whether the left table and the right table are joined on the system time, instead of on timeColumn.

  • useSystemTime = true: join records based on the system time (timestamp with millisecond precision) when they are ingested into the engine.

  • useSystemTime = false (default): join records based on the specified timeColumn from the left table and the right table.
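The following Python sketch shows how the join timestamp is chosen. `join_time` is a hypothetical helper. Using side 0/1 for the left/right table and passing the ingestion time in as `now_ms` are assumptions made to keep the sketch testable.

```python
import time as _time

def join_time(record, side, use_system_time=False, time_column=None, now_ms=None):
    """Timestamp used to join a record; side is 0 (left table) or 1 (right table).
    time_column is a string, or a pair (left name, right name)."""
    if use_system_time:
        # Ingestion time in milliseconds (TIMESTAMP precision).
        return now_ms if now_ms is not None else int(_time.time() * 1000)
    if time_column is None:
        raise ValueError("timeColumn must be specified when useSystemTime=false")
    name = time_column if isinstance(time_column, str) else time_column[side]
    return record[name]

print(join_time({"time": 8, "sym": "A"}, 0, time_column="time"))  # 8
print(join_time({"ts": 5}, 1, time_column=("time", "ts")))        # 5
```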

delayedTime is an optional parameter. It is a positive integer with the same precision as timeColumn, indicating the maximum time the engine waits before joining a pending record in the left table with a right table record. delayedTime can only be specified together with timeColumn. For more information, see Details.

garbageSize is an optional parameter. It is a positive integer with the default value of 5,000 (rows). Data ingested into the engine through the subscription keeps occupying memory. Within the left and right tables, records are grouped by matchingColumn value; when the number of records in a group exceeds garbageSize, the system removes from memory the records in that group that have already been calculated.
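This Python sketch applies the garbageSize rule literally to the buffer of a single table. `GroupedBuffer` and `mark_joined` are hypothetical names. The sketch does not model whether the real engine retains some calculated rows for later matches.

```python
from collections import defaultdict

class GroupedBuffer:
    """Per-group record buffer that drops already-calculated rows once a
    group grows beyond garbage_size."""

    def __init__(self, garbage_size=5000):
        self.garbage_size = garbage_size
        # key -> list of [record, calculated_flag], oldest first
        self.groups = defaultdict(list)

    def append(self, key, record):
        group = self.groups[key]
        group.append([record, False])
        if len(group) > self.garbage_size:
            # Keep only the rows that have not been calculated yet.
            self.groups[key] = [row for row in group if not row[1]]

    def mark_joined(self, key, count):
        # Mark the oldest `count` rows of a group as calculated.
        for row in self.groups[key][:count]:
            row[1] = True
```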

sortByTime is a Boolean value that indicates whether the output data is globally sorted by time. The default value is false, meaning the output data is sorted only within groups. Note that if sortByTime is set to true, the parameter delayedTime cannot be specified, and the data input to the left and right tables must be globally sorted.
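This Python sketch contrasts group-wise and global ordering and enforces the delayedTime restriction. `emit` and `check_params` are hypothetical. The rows are the three output rows of the second example below. With sortByTime=false the engine only guarantees ordering within groups, so the group-by-group order shown here (dict insertion order) is just one possible result.

```python
import heapq

def emit(per_group_results, sort_by_time=False):
    """per_group_results: dict key -> list of (time, key, value), sorted by time."""
    if sort_by_time:
        # Merge the per-group streams into one globally time-ordered stream.
        return list(heapq.merge(*per_group_results.values(), key=lambda row: row[0]))
    # Otherwise rows are only ordered within each group.
    return [row for rows in per_group_results.values() for row in rows]

def check_params(sort_by_time, delayed_time):
    if sort_by_time and delayed_time is not None:
        raise ValueError("delayedTime cannot be specified when sortByTime=true")

res = {"A": [(2, "A", 20.0), (20, "A", 20.06)], "B": [(8, "B", 20.02)]}
print(emit(res, sort_by_time=True))  # times 2, 8, 20
print(emit(res))                     # times 2, 20, 8
```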

Examples

$ share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1:0, `time`sym`bid`ask, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as quotes
$ prevailingQuotes=table(100:0, `time`sym`price`bid`ask`spread, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE])
$ ajEngine=createAsofJoinEngine(name="aj1", leftTable=trades, rightTable=quotes, outputTable=prevailingQuotes, metrics=<[price, bid, ask, abs(price-(bid+ask)/2)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false, delayedTime=1)
$ tmp1=table(2020.08.27T09:30:00.000+2 8 20 22 as time, take(`A, 4) as sym, 20.01 20.04 20.07 20.08 as price)
$ tmp2=table(2020.08.27T09:30:00.000+1 5 6 11 19 as time, take(`A, 5) as sym, 20 20.02 20.03 20.05 20.06 as bid,  20.01 20.03 20.04 20.06 20.07 as ask)
$ tmp1.sortBy!(`time)
$ tmp2.sortBy!(`time)
//only appendForJoin can be used to insert data
$ subscribeTable(tableName="trades", actionName="joinLeft", offset=0, handler=appendForJoin{ajEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="quotes", actionName="joinRight", offset=0, handler=appendForJoin{ajEngine, false}, msgAsTable=true)

$ job1=submitJob("write1", "", append!{trades, tmp1})
$ job2=submitJob("write2", "", append!{quotes, tmp2})
$ getJobReturn(job1, true)
$ getJobReturn(job2, true)

//wait longer than the 2-second timeout so that the last left record is also joined
$ sleep(3000)

$ select time, sym, bid from prevailingQuotes order by time, sym

time                     sym  bid
2020.08.27T09:30:00.002  A    20
2020.08.27T09:30:00.008  A    20.03
2020.08.27T09:30:00.020  A    20.06
2020.08.27T09:30:00.022  A    20.06

$ unsubscribeTable(tableName="trades", actionName="joinLeft")
$ unsubscribeTable(tableName="quotes", actionName="joinRight")
$ undef(`trades,SHARED)
$ undef(`quotes,SHARED)
$ dropAggregator(name="aj1")

//define an asof join engine and set sortByTime=true
$ share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1:0, `time`sym`bid`ask, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as quotes
$ prevailingQuotes=table(100:0, `time`sym`price`bid`ask`spread, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE])
$ ajEngine=createAsofJoinEngine(name="aj1", leftTable=trades, rightTable=quotes, outputTable=prevailingQuotes, metrics=<[price, bid, ask, abs(price-(bid+ask)/2)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false, sortByTime=true)

$ tmp1=table(2020.08.27T09:30:00.000+2 8 20 22 23 24 as time, take(`A`B, 6) as sym, 20.01 20.04 20.07 20.08 20.4 20.5 as price)
$ tmp2=table(2020.08.27T09:30:00.000+1 5 6 11 19 20 21 as time, take(`A`B, 7) as sym, 20 20.02 20.03 20.05 20.06 20.6 20.4 as bid,  20.01 20.03 20.04 20.06 20.07 20.5 20.6 as ask)
$ tmp1.sortBy!(`time)
$ tmp2.sortBy!(`time)

//only appendForJoin can be used to insert data
$ subscribeTable(tableName="trades", actionName="joinLeft", offset=0, handler=appendForJoin{ajEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="quotes", actionName="joinRight", offset=0, handler=appendForJoin{ajEngine, false}, msgAsTable=true)

$ trades.append!(tmp1)
$ quotes.append!(tmp2)

$ sleep(100)

//check the output table
$ select time, sym, bid from prevailingQuotes

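time                     sym  bid
2020.08.27T09:30:00.002  A    20
2020.08.27T09:30:00.008  B    20.02
2020.08.27T09:30:00.020  A    20.06

The left records at 09:30:00.022, 09:30:00.023 and 09:30:00.024 do not appear in the output. The latest right record is at 09:30:00.021, so no right record with a later timestamp ever arrives to trigger their join.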