createAsofJoinEngine

Syntax

createAsofJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [timeColumn], [useSystemTime=false], [delayedTime], [garbageSize])

Arguments

name is a string indicating the name of the asof join engine.

leftTable and rightTable are table objects whose schema must be the same as the subscribed stream table.

outputTable is a table object that receives the results. Create it as an empty table, with the column names and data types specified, before calling createAsofJoinEngine. By default, its first two columns are the timeColumn and matchingColumn of leftTable, which do not need to be specified in metrics.

metrics is metacode specifying the calculation formulas. It can use one or more expressions, built-in or user-defined functions, but not aggregate functions. You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2>. For more information about metacode please refer to Metaprogramming.

matchingColumn is a STRING scalar/vector/tuple indicating the column(s) on which the tables are joined. It supports integral, temporal or literal (except UUID) types.

1. When there is only 1 matching column - If the matching column has the same name in both tables, specify matchingColumn as a STRING scalar; otherwise specify a tuple of two elements. For example, if the matching column is named "sym" in the left table and "sym1" in the right table, then matchingColumn = [[`sym],[`sym1]].

2. When there are multiple matching columns - If the matching columns all have the same names in both tables, specify matchingColumn as a STRING vector; otherwise specify a tuple of two elements. For example, if the matching columns are named "timestamp" and "sym" in the left table and "timestamp" and "sym1" in the right table, then matchingColumn = [[`timestamp, `sym], [`timestamp, `sym1]].
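A minimal sketch of the first case, where the matching column has different names in the two tables. The tables leftT and rightT, the output table outT, the engine name "ajDiffKey" and the metric list are hypothetical and only serve as an illustration:

$ share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftT
$ share streamTable(1:0, `time`sym1`bid, [TIMESTAMP, SYMBOL, DOUBLE]) as rightT
$ outT=table(100:0, `time`sym`price`bid, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
$ // the matching column is "sym" in the left table and "sym1" in the right table
$ ajDiffKey=createAsofJoinEngine(name="ajDiffKey", leftTable=leftT, rightTable=rightT, outputTable=outT, metrics=<[price, bid]>, matchingColumn=[[`sym],[`sym1]], timeColumn=`time)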

timeColumn is an optional parameter indicating the name of the time column in the two input tables when useSystemTime = false. If the time column has the same name in the left table and the right table, timeColumn can be a string scalar. Otherwise, specify a string vector of length 2 (the time column names in the left and right tables).
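A minimal sketch of the vector form of timeColumn, assuming the time column is named "tradeTime" in the left table and "quoteTime" in the right table. All table, column and engine names here are hypothetical:

$ share streamTable(1:0, `tradeTime`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as leftT2
$ share streamTable(1:0, `quoteTime`sym`bid, [TIMESTAMP, SYMBOL, DOUBLE]) as rightT2
$ outT2=table(100:0, `tradeTime`sym`price`bid, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
$ // timeColumn lists the left table's time column first, then the right table's
$ ajDiffTime=createAsofJoinEngine(name="ajDiffTime", leftTable=leftT2, rightTable=rightT2, outputTable=outT2, metrics=<[price, bid]>, matchingColumn=`sym, timeColumn=`tradeTime`quoteTime)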

useSystemTime is an optional parameter indicating whether the first column of outputTable is the system time (useSystemTime = true) or the timeColumn of the stream table (useSystemTime = false).

  • useSystemTime = true, to calculate according to the timestamp when each record is ingested into the streaming engine (with millisecond precision) instead of timeColumn.

  • useSystemTime = false (default), to calculate according to timeColumn instead of the timestamp when each record is ingested into the engine. Please note that the data which triggers the calculation doesn't participate in this calculation.
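A minimal sketch of an engine that joins on system time. It assumes that timeColumn can simply be omitted when useSystemTime = true and that a TIMESTAMP column is a suitable first column of the output table, since the system time has millisecond precision. The table and engine names are hypothetical:

$ share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as tradesSys
$ share streamTable(1:0, `time`sym`bid, [TIMESTAMP, SYMBOL, DOUBLE]) as quotesSys
$ // the first output column holds the system time instead of the "time" column of the input
$ outSys=table(100:0, `time`sym`price`bid, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
$ ajSysTime=createAsofJoinEngine(name="ajSysTime", leftTable=tradesSys, rightTable=quotesSys, outputTable=outSys, metrics=<[price, bid]>, matchingColumn=`sym, useSystemTime=true)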

useSessionStartTime is a Boolean scalar. It is optional and indicates whether the time in the output table is the start time of data window. The default value is false, which means that the time in the output table is the end time of the data window, that is, the timestamp of the last data in each window + sessionGap. It must be set to true if parameter updateTime is specified.

delayedTime is a positive integer. It can only be specified when useSystemTime is set to false.

When delayedTime is specified, a calculation is triggered by the latest record t2 in the left table if the difference between the timestamp of t2 and the timestamp of the uncalculated records is greater than delayedTime. A calculation is also triggered if a record of leftTable still has not participated in a calculation max(2*updateTime, 2 seconds) after it arrives at the engine.

garbageSize is a positive integer. It is optional and the default value is 5,000. As the subscribed data is ingested into the streaming engine, it occupies an increasing amount of memory. When the number of rows in memory exceeds garbageSize, the system will clear the data that is not needed for the current calculation.

Details

Create an asof join engine for streaming data. Return a table that is the asof join result of a left table and a right table. It is generally used in conjunction with subscribeTable.

  • If delayedTime is not specified, then a calculation is triggered when the timestamp of the latest record in the right table is later than the timestamp of the latest record in the left table.

  • If delayedTime is specified, then a calculation is triggered when either of the following 2 conditions is met:

    • the time difference between the latest time in the left table and the time of the uncalculated data is greater than delayedTime.

    • max(2*updateTime, 2 seconds) after a record is ingested into the left table, it still has not been used in a calculation.

Starting from versions 1.30.17 and 2.00.5, DolphinDB allows other streaming engines to ingest data into streaming join engines, so that engines can be cascaded through getLeftStream and getRightStream.

Examples

$ share streamTable(1:0, `time`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1:0, `time`sym`bid`ask, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]) as quotes
$ prevailingQuotes=table(100:0, `time`sym`price`bid`ask`spread, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE])
$ ajEngine=createAsofJoinEngine(name="aj1", leftTable=trades, rightTable=quotes, outputTable=prevailingQuotes, metrics=<[price, bid, ask, abs(price-(bid+ask)/2)]>, matchingColumn=`sym, timeColumn=`time, useSystemTime=false, delayedTime=1)
$ tmp1=table(2020.08.27T09:30:00.000+2 8 20 22 as time, take(`A, 4) as sym, 20.01 20.04 20.07 20.08 as price)
$ tmp2=table(2020.08.27T09:30:00.000+1 5 6 11 19 as time, take(`A, 5) as sym, 20 20.02 20.03 20.05 20.06 as bid,  20.01 20.03 20.04 20.06 20.07 as ask)
$ tmp1.sortBy!(`time)
$ tmp2.sortBy!(`time)
$ subscribeTable(tableName="trades", actionName="joinLeft", offset=0, handler=appendForJoin{ajEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="quotes", actionName="joinRight", offset=0, handler=appendForJoin{ajEngine, false}, msgAsTable=true)

$ job1=submitJob("write1", "", append!{trades, tmp1})
$ job2=submitJob("write2", "", append!{quotes, tmp2})
$ getJobReturn(job1, true)
$ getJobReturn(job2, true)

$ sleep(1000)

$ select time, sym, price from prevailingQuotes order by time, sym

time                    sym price
2020.08.27T09:30:00.002 A   20.01
2020.08.27T09:30:00.008 A   20.04
2020.08.27T09:30:00.020 A   20.07
2020.08.27T09:30:00.022 A   20.08
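
The getLeftStream and getRightStream interface mentioned in Details can be sketched with the engine ajEngine from this example. The sketch assumes that both functions take the engine handle and return table objects that can be passed directly as subscription handlers. The two calls are meant to be used in place of the appendForJoin subscriptions above, not alongside them, and the action names are hypothetical:

$ // ingest the left and right inputs of ajEngine through its stream handles
$ subscribeTable(tableName="trades", actionName="joinLeftStream", offset=0, handler=getLeftStream(ajEngine), msgAsTable=true)
$ subscribeTable(tableName="quotes", actionName="joinRightStream", offset=0, handler=getRightStream(ajEngine), msgAsTable=true)

The same handles could also serve as the output table of an upstream engine, provided that engine's output schema matches the schema of the corresponding input table.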