streamFilter

Details

Create an engine that splits the ingested stream for different handlers. Return a table object.

The engine works as follows:

1. Deserialize the ingested data.

Note: This step only takes place when the ingested data is from a heterogeneous stream table, i.e., the output of heterogeneous replay.

2. Split the ingested stream based on the conditions as specified by filter.

3. Ingest the split streams to the handlers as specified by filter in the order of their timestamps.

Note:

Starting from version 1.30.18/2.00.6, in addition to heterogeneous stream tables, streamFilter also supports processing data from standard stream tables.

Syntax

streamFilter(name, dummyTable, filter, [msgSchema], [timeColumn], [conditionColumn])

Arguments

name is a STRING scalar indicating the name of the stream filter engine. It must begin with a letter and may contain letters, numbers and underscores.

dummyTable is a table. It has the same schema as the stream table that the stream filter subscribes to. The table can be empty.

filter is a dictionary or a tuple of dictionaries. It defines how to process the ingested data. Each dictionary can have the following key-value pairs:

  • ‘timeRange’ [optional] is a pair or a tuple of pairs. Apply it to timeColumn to filter for records in the specified time range. Note: When processing a standard stream table, timeRange must have the same data type as timeColumn.

  • ‘condition’:

    • When processing a heterogeneous stream table: It is a STRING referring to a dictionary key from the inputTables of replay. The engine will filter records by the specified key.

    • When processing a standard stream table: It is a STRING scalar/vector indicating the value(s) from the conditionColumn, or metacode of one or more Boolean expressions (can contain built-in functions; cannot contain partial applications). The engine will filter records by the specified condition.

  • ‘handler’ is a unary function or a table (can be the table object returned by a streaming engine).

    • If it’s a function, the filter result (a table) is passed as the function’s sole argument.

    • If it’s a table object, the filtered data are inserted into the table directly.

msgSchema [optional] A dictionary

  • When processing a heterogeneous stream table: The dictionary indicates the input tables of replay. The keys are the table identifiers as specified in the inputTables parameter of replay and the values indicate the schema of each table. The ingested data will be parsed based on msgSchema.

  • When processing a standard stream table: Do not specify the parameter.

The following parameters are only required when processing data from a standard stream table:

timeColumn is a STRING indicating the name of the temporal column in dummyTable. If unspecified, it takes the name of the first column in the dummyTable.

conditionColumn is a STRING indicating a column (must be STRING or SYMBOL type) in dummyTable. If this parameter is unspecified, the “condition” key of filter takes no effect.

Examples

(1) Processing a heterogeneous stream table:

Replay the DFS tables “orders” and “trades” to simulate an asof join of the two streams.

If we simply perform an N-to-N replay on the two tables, it is not guaranteed that the records will be ingested to the left and right tables of the asof join engine in chronological order: It may happen that a record with a larger timestamp arrives in the left table before a record with a smaller timestamp arrives in the right table. For more information, see replay.

Therefore, we replay the two tables into one heterogeneous stream table to make sure all records are ordered by timestamp. The stream filter subscribes to the heterogenous stream table, splits the ingested data into two streams and distributes them to the left and right tables of the asof join engine (createAsofJoinEngine). In this way, we can make sure that the data is ingested to the left and right tables of the asof join engine in the order of the timestamps.

//create the "orders" table
$ n=1000
$ sym = take(take("IBM",n).join(take("GS",n)), n*2*3)
$ date=take(2022.01.04..2022.01.06, n*2*3).sort!()
$ timestamp1=take(2022.01.04 09:30:00.000+rand(1000,n),n) join take(2022.01.04 09:31:00.000+rand(1000,n),n)
$ timestamp2=take(2022.01.05 09:30:00.000+rand(1000,n),n) join take(2022.01.05 09:31:00.000+rand(1000,n),n)
$ timestamp3=take(2022.01.06 09:30:00.000+rand(1000,n),n) join take(2022.01.06 09:31:00.000+rand(1000,n),n)
$ timestamp=timestamp1 join timestamp2 join timestamp3
$ volume = rand(100, n*2*3)
$ t=table(sym,date,timestamp,volume)

$ if(existsDatabase("dfs://test_order")){
$ dropDatabase("dfs://test_order")
$ }
$ db1_or=database("",RANGE, 2022.01.04..2022.01.07)
$ db2_or=database("",VALUE,`IBM`GS)
$ db_or=database("dfs://test_order",COMPO,[db1_or, db2_or])
$ orders=db_or.createPartitionedTable(t,`orders,`date`sym)
$ orders.append!(t);
$ select count(*) from orders
6000

//create the "trades" table
$ n=2000
$ sym = take(take("IBM",n).join(take("GS",n)), n*2*3)
$ date=take(2022.01.04..2022.01.06, n*2*3).sort!()
$ timestamp1=take(2022.01.04 09:30:00.000+rand(1000,n),n) join take(2022.01.04 09:31:00.000+rand(1000,n),n)
$ timestamp2=take(2022.01.05 09:30:00.000+rand(1000,n),n) join take(2022.01.05 09:31:00.000+rand(1000,n),n)
$ timestamp3=take(2022.01.06 09:30:00.000+rand(1000,n),n) join take(2022.01.06 09:31:00.000+rand(1000,n),n)
$ timestamp=timestamp1 join timestamp2 join timestamp3
$ volume = rand(100, n*2*3)
$ price = rand(50.0, n*3) join  rand(20.0, n*3)

$ t=table(sym,date,timestamp,volume,price)

$ if(existsDatabase("dfs://test_trades")){
$ dropDatabase("dfs://test_trades")
$ }
$ db1=database("",RANGE, 2022.01.04..2022.01.07)
$ db2=database("",VALUE,`IBM`GS)
$ db=database("dfs://test_trades",COMPO,[db1, db2])
$ trades=db.createPartitionedTable(t,`trades,`date`sym)
$ trades.append!(t);
$ select count(*) from trades
12000

//generate the heterogeneous data sources and create a table as the the outputTable of replay()
$ ds_or = replayDS(sqlObj=<select * from loadTable(db_or, `orders)>, dateColumn=`date, timeColumn=`timestamp)
$ ds = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`date, timeColumn=`timestamp)
$ input_dict=dict(["orders","trades"], [ds_or, ds])
$ share streamTable(100:0,`timestamp`sym`blob`volume, [TIMESTAMP,SYMBOL, BLOB, INT]) as opt


//subscribe to the output table of replay to ingest the data to the stream filter
$ share streamTable(100:0,`timestamp`sym`blob`volume, [TIMESTAMP,SYMBOL, BLOB, INT]) as streamFilterOpt
$ share streamTable(100:0, `sym`date`timestamp`volume, [SYMBOL, DATE, TIMESTAMP, INT] ) as streamOrders
$ share streamTable(100:0, `sym`date`timestamp`volume`price, [SYMBOL, DATE, TIMESTAMP, INT, DOUBLE] ) as streamTrades
$ streamOpt=table(100:0, `timestamp`sym`volume`price`result, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE])

$ filter1=dict(STRING,ANY)
$ filter1['condition']=`orders
$ filter1['timeRange']=09:30:00.000:09:30:00.005

$ filter2=dict(STRING,ANY)
$ filter2['condition']=`trades
$ filter2['timeRange']=09:30:00.000:09:30:00.005

$ ajEngine=createAsofJoinEngine(name="ajEngine", leftTable=streamOrders, rightTable=streamTrades, outputTable=streamOpt, metrics=<[volume,price,price*volume]>, matchingColumn=`sym, useSystemTime=true)
$ filter1['handler']=getLeftStream(ajEngine)
$ filter2['handler']=getRightStream(ajEngine)
$ schema=dict(["orders","trades"], [streamOrders, streamTrades])

$ engine=streamFilter(name=`streamFilter,dummyTable=streamFilterOpt, filter=[filter1,filter2],msgSchema=schema)
$ subscribeTable(tableName="opt", actionName="sub1", offset=0, handler=engine, msgAsTable=true)

//replay the heterogeneous data sources and output to the table "opt"
$ replay(inputTables=input_dict,outputTables=opt, timeColumn=`timestamp)

$ select count(*) from streamOpt
20

//drop the subscription
unsubscribeTable(tableName="opt", actionName="sub1")
dropStreamEngine(`streamFilter)
dropStreamEngine(`ajEngine)

(2) Processing standard stream table:

In this example, data from the standard stream table “trades“ is ingested to the stream filter, where the records are filtered and assigned to handlers for further processing.

$ n=20
$ sym = symbol(take(`A`B`C,n))
$ name = string(rand(1..10,n))
$ date = temporalAdd(2012.12.06,0..(n-1),'d')
$ time = temporalAdd(09:30:00.000,0..(n-1),'ms')
$ vol = 100+take(1..8,20)
$ t = table(date,time,sym,name,vol)

$ share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as st1
$ share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as st2
$ share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as st3


$ share streamTable(100:0,`time`sym`sum_vol,[TIME,SYMBOL,INT]) as output1
$ share streamTable(100:0,`time`avg_vol,[TIME,INT]) as output2

// create 2 streaming engines has the handlers of the stream filter
$ engine1=createTimeSeriesEngine(name="timeEngine", windowSize=3, step=3, metrics=<[sum(vol)]>, dummyTable=st3, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50)
$ engine2=createReactiveStateEngine(name="reactiveEngine", metrics=<[mavg(vol, 3)]>, dummyTable=st1, outputTable=output2, keyColumn=`sym)

//share "trades" as the stream table to be subscribed by the stream filter
$ share streamTable(100:0,`date`time`sym`name`vol,[DATE,TIME,SYMBOL,STRING,INT]) as trades

//set the first filter and ingest the result to engine2
$ filter1 = dict(STRING,ANY)
$ filter1['condition']=`A
$ filter1['handler']=engine2
$ filter1['timeRange']=(09:30:00.001:09:30:00.010,09:29:00.000:09:30:00.000)

//set the second filter and ingest the result to st2
$ filter2 = dict(STRING,ANY)
$ filter2['handler']=st2
$ filter2['timeRange']=09:30:00.002:09:30:00.005

//set the first filter and ingest the result to engine1
$ filter3 = dict(STRING,ANY)
$ filter3['condition']=`C`A
$ filter3['handler']=engine1

///The stream filter subscribes to the stream table "trades" and distributes the ingested data based on the specified conditions
$ streamFilter2=streamFilter(name="streamFilterDemo",dummyTable=trades,filter=[filter1,filter2,filter3], timeColumn=`time, conditionColumn=`sym)
$ subscribeTable(tableName="trades", actionName="sub1", offset=0, handler=streamFilter2, msgAsTable=true)
$ trades.append!(t)
$ select * from output1

time

sym

sum_vol

09:30:00.003

A

101

09:30:00.003

C

103

09:30:00.006

A

104

09:30:00.006

C

106

09:30:00.009

A

107

09:30:00.009

C

101

09:30:00.012

A

102

09:30:00.012

C

104

09:30:00.015

A

105

09:30:00.015

C

107

09:30:00.018

A

108

$ select * from output2

time

avg_vol

00:00:00.001

00:00:00.001

00:00:00.001

104

00:00:00.001

104

$ select * from st2

date

time

sym

name

vol

2012.12.08

09:30:00.002

C

6

103

2012.12.09

09:30:00.003

A

8

104

2012.12.10

09:30:00.004

B

10

105

2012.12.11

09:30:00.005

C

10

106

2012.12.12

09:30:00.006

A

10

107

2012.12.13

09:30:00.007

B

1

108

2012.12.14

09:30:00.008

C

3

101

2012.12.15

09:30:00.009

A

4

102

2012.12.16

09:30:00.010

B

9

103

“condition” can also be specified as Boolean expressions to support more complex filter logic. In the example above, replace the value of “condition” in filter2 with a Boolean expression to filter the data based on the columns “vol” and “date“.

$ filter2 = dict(STRING,ANY)
$ filter2['condition'] = <sym==`A and 101<vol<105 and date<2012.12.15>
$ filter2['handler'] = st2
$ filter2['timeRange'] = 09:30:00.002:09:30:00.010

$ select * from st2

date

time

sym

name

vol

2012.12.09

09:30:00.003

A

7

104