createLookupJoinEngine

Syntax

createLookupJoinEngine(name, leftTable, rightTable, outputTable, metrics, matchingColumn, [rightTimeColumn], [checkTimes])

Details

Creates a lookup join streaming engine, which performs a real-time left join between two stream tables, or between a stream table and a non-stream table (the non-stream table must be refreshed periodically).

Note:

1. A left join is triggered only when new data is ingested into the left table.

2. Data in the right table is grouped by matchingColumn, and only the latest record in each group is kept.

  • If the right table is a subscribed stream table, the retained records are updated whenever new data is ingested into the right table.

  • If the right table is an in-memory table or a DFS table (currently only dimension tables are supported), the engine refreshes it at the times or intervals specified by checkTimes.
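To make the two rules above concrete, here is a minimal plain-Python model of the matching behavior. It is not DolphinDB code: the class LookupJoinModel and its methods are hypothetical, both tables are assumed to share a single matching column name, and on column-name clashes the model keeps the left table's value (an assumption, not documented engine behavior).

```python
class LookupJoinModel:
    """Toy model: right rows are cached per key; only left rows produce output."""

    def __init__(self, key, metrics):
        self.key = key          # matching column, assumed to have the same name in both tables
        self.metrics = metrics  # function(joined_row) -> tuple of metric values
        self.latest_right = {}  # key value -> latest right record
        self.output = []

    def append_right(self, rows):
        # Right-side data only replaces the cached record for its key; nothing is emitted.
        for row in rows:
            self.latest_right[row[self.key]] = row

    def append_left(self, rows):
        # Each left row triggers a left join against the cached right records.
        for row in rows:
            joined = dict(row)
            right = self.latest_right.get(row[self.key])
            if right is not None:
                # Add right-table columns; on name clashes the left value is kept (assumption).
                for col, val in right.items():
                    joined.setdefault(col, val)
            self.output.append((row[self.key], *self.metrics(joined)))


def factors(r):
    # Mirrors metrics=<[price, val, price*val]>; val is None when no right record matched.
    val = r.get("val")
    return (r["price"], val, None if val is None else r["price"] * val)


engine = LookupJoinModel("sym", factors)
engine.append_right([{"sym": "A", "val": 1.0}, {"sym": "A", "val": 13.0}])
print(engine.output)  # [] : right-side ingestion alone emits nothing
engine.append_left([{"sym": "A", "price": 10.1}, {"sym": "D", "price": 5.0}])
print(engine.output)
```

The unmatched key "D" still produces a row, with None in the right-table positions, which is what a left join implies.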

Starting from versions 1.30.17 and 2.00.5, DolphinDB supports cascading engines: another streaming engine can ingest its output into a streaming join engine through getLeftStream and getRightStream.

Arguments

name is a STRING indicating the name of the lookup join streaming engine.

leftTable is a table object whose schema must be the same as that of the subscribed stream table.

rightTable is a table object. It is an empty in-memory table, a stream table, or a dimension table. Note that if rightTable is not subscribed, checkTimes must be specified so that its data is refreshed at regular times.

outputTable is a table object to hold the calculation results. Before calling createLookupJoinEngine, create an empty table as outputTable and specify its column names and data types.

The columns of outputTable are arranged in the following order: the matching columns, then the result columns. Specifically:

(1) The first few columns are the matching columns, in the same order as specified in matchingColumn.

(2) The last few columns are the results of the metrics.

metrics is metacode specifying the calculation formulas. It can contain one or more expressions, built-in functions, or user-defined functions, but not aggregate functions. Functions that return multiple values are also allowed, for example <func(price) as `col1`col2>. For more information about metacode, please refer to Metaprogramming.
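The column-order rule and the multi-value form of metrics can be illustrated with a small Python helper. This is a hypothetical sketch, not part of DolphinDB: compute_output_row is an illustrative name, and each metric is modeled as a Python function evaluated on a single joined row.

```python
def compute_output_row(key_values, joined_row, metrics):
    """Build one output row: matching-column values first, then metric results.

    key_values: values of the matching columns, in matchingColumn order
    metrics:    functions of one joined row; a function may return a tuple,
                mimicking a multi-value metric such as <func(price) as `col1`col2>
    """
    row = list(key_values)
    for metric in metrics:
        result = metric(joined_row)
        if isinstance(result, tuple):
            row.extend(result)   # a multi-value metric fills several output columns
        else:
            row.append(result)
    return row


joined = {"sym": "A", "price": 10.0, "val": 2.0}
metrics = [
    lambda r: r["price"],                      # single-value metric
    lambda r: (r["price"] * 2, r["val"] + 1),  # two-value metric -> two columns
]
print(compute_output_row(["A"], joined, metrics))  # ['A', 10.0, 20.0, 3.0]
```

The output table therefore needs one column per matching column plus one column per returned value, here 1 + 1 + 2 = 4 columns.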

matchingColumn is a STRING scalar/vector, or a tuple of STRING vectors, indicating the matching column(s). The matching columns can be of integral, temporal, or literal (except UUID) types.

Specify matchingColumn following the rules below:

1. When there is only one matching column: if it has the same name in both tables, specify matchingColumn as a STRING scalar; otherwise, specify a tuple of two elements. For example, if the matching column is named “sym” in the left table and “sym1” in the right table, then matchingColumn = [[`sym],[`sym1]].

2. When there are multiple matching columns: if they all have the same names in both tables, specify matchingColumn as a STRING vector; otherwise, specify a tuple of two elements. For example, if the matching columns are named “timestamp” and “sym” in the left table and “timestamp” and “sym1” in the right table, then matchingColumn = [[`timestamp, `sym], [`timestamp,`sym1]].
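The rules above reduce every form of matchingColumn to a pair of column lists, one per table. The Python sketch below shows that normalization; the function name is hypothetical, and a Python tuple stands in for a DolphinDB tuple.

```python
def normalize_matching_column(spec):
    """Return (left_columns, right_columns) for a matchingColumn-like spec.

    "sym"                  -> one column, same name in both tables
    ["timestamp", "sym"]   -> several columns, same names in both tables
    (["sym"], ["sym1"])    -> a two-element tuple: left names, then right names
    """
    if isinstance(spec, str):
        return [spec], [spec]
    if isinstance(spec, tuple):
        if len(spec) != 2:
            raise ValueError("a tuple spec must have exactly two elements")
        left, right = list(spec[0]), list(spec[1])
        if len(left) != len(right):
            raise ValueError("left and right column lists must have the same length")
        return left, right
    columns = list(spec)
    return columns, columns


print(normalize_matching_column((["timestamp", "sym"], ["timestamp", "sym1"])))
# (['timestamp', 'sym'], ['timestamp', 'sym1'])
```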

rightTimeColumn is a STRING scalar indicating the time column in the right table. If specified, the right table keeps, for each group, the record with the latest timestamp in this column; if multiple records share that timestamp, only the most recently ingested one is kept. If not specified, the most recently ingested record (by system time) is kept.
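The two retention rules can be compared in plain Python, using the right-table data of the examples below (timestamp offsets 1..12 followed by 1..3 again, sym cycling A, B, C, val = 1..15). This is a model of the documented rule, not engine code; latest_per_key is a hypothetical name.

```python
def latest_per_key(rows, key, time_col=None):
    """Keep one record per key from rows given in ingestion order.

    With time_col, the record with the largest time wins and ties go to the later
    ingested record (">="); without it, the last ingested record always wins.
    """
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or time_col is None or row[time_col] >= current[time_col]:
            latest[row[key]] = row
    return latest


# Right-table data from the examples below: timestamp offsets 1..12, then 1..3 again.
offsets = list(range(1, 13)) + [1, 2, 3]
rows = [{"ts": t, "sym": s, "val": v}
        for t, s, v in zip(offsets, ["A", "B", "C"] * 5, range(1, 16))]

by_arrival = {k: r["val"] for k, r in latest_per_key(rows, "sym").items()}
by_time = {k: r["val"] for k, r in latest_per_key(rows, "sym", "ts").items()}
print(by_arrival)  # {'A': 13, 'B': 14, 'C': 15}
print(by_time)     # {'A': 10, 'B': 11, 'C': 12}
```

These are the factor2 values seen in Ex. 1 (no rightTimeColumn) and Ex. 2 (rightTimeColumn=`timestamps): the last three records arrive last but carry older timestamps.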

checkTimes is a temporal vector or a DURATION scalar. If specified, the engine refreshes the data of the right table at regular times (keeping only the latest record in each group).

  • If checkTimes is a temporal vector, it must be of SECOND, TIME, or NANOTIME type. The engine refreshes the right table every day at each time specified in the vector.

  • If checkTimes is a DURATION scalar, it specifies the interval at which the right table is refreshed.
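The two forms of checkTimes imply different schedules. The sketch below computes the next refresh time under a simplified model: next_refresh is a hypothetical name, a timedelta stands in for a DURATION scalar (assuming a refresh has just happened at now), and a list of datetime.time values stands in for a SECOND/TIME/NANOTIME vector.

```python
from datetime import datetime, time, timedelta


def next_refresh(now, check_times):
    """Return when the right table would next be refreshed under this toy model."""
    if isinstance(check_times, timedelta):
        # DURATION-like: fixed interval after the refresh assumed to happen at `now`.
        return now + check_times
    # Vector-like: daily schedule; pick the first time later than `now` today.
    for t in sorted(check_times):
        candidate = datetime.combine(now.date(), t)
        if candidate > now:
            return candidate
    # Every scheduled time today has passed: use the earliest time tomorrow.
    return datetime.combine(now.date() + timedelta(days=1), min(check_times))


schedule = [time(15, 30), time(9, 0)]
print(next_refresh(datetime(2024, 1, 1, 10, 0), schedule))             # 2024-01-01 15:30:00
print(next_refresh(datetime(2024, 1, 1, 16, 0), schedule))             # 2024-01-02 09:00:00
print(next_refresh(datetime(2024, 1, 1, 10, 0), timedelta(seconds=1)))  # 2024-01-01 10:00:01
```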

First Release

1.30.16/2.00.4

Examples

Ex. 1

$ login(`admin, `123456)
$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])

$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)

$ // Right table: 15 records; the last 3 reuse the timestamps of the first 3
$ n = 15
$ tem1 = table((2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3) as timestamps, take(`A`B`C, n) as sym, take(1..15, n) as val, 1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ // Left table: 10 records; each one triggers a join
$ n = 10
$ tem2 = table(2019.10.08T01:01:01.001 + 1..n as timestamps, take(`A`B`C, n) as sym, take(0.1+10..20, n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output

sym  factor1  factor2  factor3
A    10.1     13       131.3
B    11.1     14       155.4
C    12.1     15       181.5
A    13.1     13       170.3
B    14.1     14       197.4
C    15.1     15       226.5
A    16.1     13       209.3
B    17.1     14       239.4
C    18.1     15       271.5
A    19.1     13       248.3
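The values above can be cross-checked with a short plain-Python calculation. It is only an arithmetic check, not DolphinDB code. It assumes, as in this example, that there is no rightTimeColumn, so each sym keeps its last ingested val. Products are rounded to one decimal to match the display.

```python
# Right table "prices": val = 1..15, sym cycles A, B, C (ingestion order).
syms = ["A", "B", "C"] * 5
latest_val = {}
for sym, val in zip(syms, range(1, 16)):
    latest_val[sym] = float(val)   # no rightTimeColumn: the last ingested record wins

# Left table "trades": 10 records, price = 10.1, 11.1, ..., 19.1.
left_syms = syms[:10]
prices = [round(p + 0.1, 1) for p in range(10, 20)]

# One output row per left record: sym, price, val, price*val.
output = [(s, p, latest_val[s], round(p * latest_val[s], 1))
          for s, p in zip(left_syms, prices)]
for row in output:
    print(row)
```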

Ex. 2

$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ share streamTable(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as prices
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)
$ subscribeTable(tableName="prices", actionName="append_rightTable", offset=0, handler=appendForJoin{LjEngine, false}, msgAsTable=true)

$ n = 15
$ tem1 = table((2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3) as timestamps, take(`A`B`C, n) as sym, take(1..15, n) as val, 1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n = 10
$ tem2 = table(2019.10.08T01:01:01.001 + 1..n as timestamps, take(`A`B`C, n) as sym, take(0.1+10..20, n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output

sym  factor1  factor2  factor3
A    10.1     10       101
B    11.1     11       122.1
C    12.1     12       145.2
A    13.1     10       131
B    14.1     11       155.1
C    15.1     12       181.2
A    16.1     10       161
B    17.1     11       188.1
C    18.1     12       217.2
A    19.1     10       191

Ex. 3: The right table is an in-memory table, so checkTimes must be set.

$ share streamTable(1000:0, `timestamps`sym`price, [TIMESTAMP, SYMBOL, DOUBLE]) as trades
$ output = table(100:0, `sym`factor1`factor2`factor3, [SYMBOL, DOUBLE, DOUBLE, DOUBLE])
$ prices=table(1000:0, `timestamps`sym`val`id, [TIMESTAMP, SYMBOL, DOUBLE, INT])
$ LjEngine = createLookupJoinEngine(name="test1", leftTable=trades, rightTable=prices, outputTable=output, metrics=<[price,val,price*val]>, matchingColumn=`sym, rightTimeColumn=`timestamps, checkTimes=1s)
$ subscribeTable(tableName="trades", actionName="append_leftTable", offset=0, handler=appendForJoin{LjEngine, true}, msgAsTable=true)

$ n = 15
$ tem1 = table((2018.10.08T01:01:01.001 + 1..12) join (2018.10.08T01:01:01.001 + 1..3) as timestamps, take(`A`B`C, n) as sym, take(1..15, n) as val, 1..15 as id)
$ prices.append!(tem1)
$ sleep(2000)
$ n = 10
$ tem2 = table(2019.10.08T01:01:01.001 + 1..n as timestamps, take(`A`B`C, n) as sym, take(0.1+10..20, n) as price)
$ trades.append!(tem2)
$ sleep(100)
$ select * from output

sym  factor1  factor2  factor3
A    10.1     10       101
B    11.1     11       122.1
C    12.1     12       145.2
A    13.1     10       131
B    14.1     11       155.1
C    15.1     12       181.2
A    16.1     10       161
B    17.1     11       188.1
C    18.1     12       217.2
A    19.1     10       191
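Because an in-memory right table is not subscribed, the engine only sees its contents as of the most recent refresh. The plain-Python sketch below models that snapshot behavior. PolledRightTable and its methods are hypothetical. The model keeps the last ingested record per key (ignoring rightTimeColumn for brevity) and does not model DolphinDB's actual refresh timing. This is presumably why the example waits 2000 ms, longer than checkTimes=1s, before appending to trades.

```python
class PolledRightTable:
    """Toy model of a non-subscribed right table that is read only at refresh time."""

    def __init__(self, source, key):
        self.source = source   # the underlying table: a list of dict rows, appended freely
        self.key = key
        self.snapshot = {}     # what the join sees: latest record per key as of last refresh

    def refresh(self):
        # Rebuild the snapshot from the source, keeping the last record of each key.
        self.snapshot = {}
        for row in self.source:
            self.snapshot[row[self.key]] = row

    def lookup(self, key_value):
        return self.snapshot.get(key_value)


prices = []
right = PolledRightTable(prices, "sym")
prices.append({"sym": "A", "val": 10.0})
print(right.lookup("A"))   # None: data was appended but no refresh has run yet
right.refresh()            # in the engine, this would be driven by checkTimes
print(right.lookup("A"))   # {'sym': 'A', 'val': 10.0}
```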

Ex. 4: Join the left table “trades” (a real-time stream table) with the right table “prices” (a dimension table that is updated relatively infrequently), looking up the matching record in “prices” by column “id”.

$ share streamTable(1000:0, `time`volume`id, [TIMESTAMP, INT,INT]) as trades
$ dbPath="dfs://testlj"
$ if(existsDatabase(dbPath)){
$    dropDatabase(dbPath)
$ }
$ rt=table(1000:0, `time`price`id, [TIMESTAMP, DOUBLE, INT])
$ db=database(dbPath, VALUE, `A`B`C)
$ prices=db.createTable(rt,`rightTable)
$ outputTable = table(10000:0, `id`volume`price`prod, [INT,INT,DOUBLE,DOUBLE])

$ tradesLookupJoin = createLookupJoinEngine(name="streamLookup1", leftTable=trades, rightTable=prices, outputTable=outputTable, metrics=<[volume,price,volume*price]>, matchingColumn=`id, rightTimeColumn=`time,checkTimes=1s)
$ subscribeTable(tableName="trades", actionName="append_trades", offset=0, handler=appendForJoin{tradesLookupJoin, true}, msgAsTable=true)

$ def writeData(t,n){
$     timev = 2021.10.08T01:01:01.001 + timestamp(1..n)
$     volumev = take(1..n, n)
$     id = take(1 2 3, n)
$     insert into t values(timev, volumev, id)
$ }
$ // Fill rt with 10 records, then append them to the dimension table prices
$ writeData(rt, 10)
$ prices.append!(rt)
$ sleep(2000)
$ writeData(trades, 6)
$ sleep(2)

$ select * from outputTable

id  volume  price  prod
1   1       10     10
2   2       8      16
3   3       9      27
1   4       10     40
2   5       8      40
3   6       9      54
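The result can be reproduced with a short plain-Python check that mirrors writeData: the right table gets time offset i, price i, and id cycling 1, 2, 3 for i = 1..10, and the left table gets volume i with the same id cycle for i = 1..6. This is an arithmetic model of the example, not DolphinDB code.

```python
n_right, n_left = 10, 6
ids = [1, 2, 3]

# Right table (dimension table "prices"): rightTimeColumn=`time keeps the latest time per id.
latest = {}  # id -> (time offset, price)
for i in range(1, n_right + 1):
    rid = ids[(i - 1) % 3]
    if rid not in latest or i >= latest[rid][0]:
        latest[rid] = (i, float(i))

# Left table "trades": each record is joined on id; columns are id, volume, price, prod.
output = []
for i in range(1, n_left + 1):
    lid = ids[(i - 1) % 3]
    price = latest[lid][1]
    output.append((lid, i, price, i * price))
for row in output:
    print(row)
```

Ids 1, 2, and 3 last appear in the right table at offsets 10, 8, and 9, which is why the joined prices are 10, 8, and 9.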