createCrossSectionalEngine

Syntax

createCrossSectionalEngine(name, [metrics], dummyTable, [outputTable], keyColumn, [triggeringPattern='perBatch'], [triggeringInterval=1000], [useSystemTime=true], [timeColumn], [lastBatchOnly=false], [contextByColumn], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup=-1])

Alias: createCrossSectionalAggregator

Arguments

name is a string indicating the name of the engine. It is the unique identifier of an engine. It can contain letters, numbers, and underscores ("_") and must start with a letter.

metrics is a piece of metacode or a tuple specifying the formulas for calculation. It can use built-in or user-defined aggregate functions or expressions such as <[sum(qty), avg(price)]>, <[avg(price1)-avg(price2)]> or <[std(price1-price2)]>. You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (it's not necessary to specify the column names). For more information about metacode, please refer to Metaprogramming.

dummyTable is a TABLE indicating the schema of the subscribed stream table. It doesn’t matter whether dummyTable contains data or not.

outputTable is a TABLE containing calculation results. It can be an in-memory table or a distributed table.

  • If contextByColumn is not specified, the number of columns in the output table should be the number of metrics + 1. The first column is of TIMESTAMP type indicating the starting time of each calculation in system time (If timeColumn is specified, the values in timeColumn will be used instead). The remaining columns are calculation results - make sure to keep the data types of columns consistent with the results.

  • If contextByColumn is specified, the number of columns in the output table should be the number of metrics + 2. The first column is of TIMESTAMP type indicating the starting time of each calculation in system time (If timeColumn is specified, the values in timeColumn will be used instead). The second column is the column specified by contextByColumn. The remaining columns are calculation results - make sure to keep the data types of columns consistent with the results.

keyColumn is a string scalar or vector that specifies one or more columns in the stream table as the key columns of the cross-sectional streaming engine. For each unique value in keyColumn, only the latest row is used in the calculation.

triggeringPattern is a STRING indicating how calculation is triggered. Each calculation adds 1 record to the output table. This parameter can take the following values:

  • 'perBatch': Default value. Calculation is triggered by each batch of data that is inserted.

  • 'perRow': Calculation is triggered by each row of data that is inserted.

  • 'interval': Calculation is triggered at predetermined intervals (using system time).

  • 'keyCount': If data with the same timestamp arrives in batches, calculation is triggered when either a) the number of keys with the latest timestamp reaches triggeringInterval, or b) the data to be calculated contains 2 different timestamps (i.e., data with a newer timestamp arrives before some data with the old timestamp has been used for calculation). To set triggeringPattern to 'keyCount', timeColumn must be specified and useSystemTime must be set to false.

Note: If triggeringPattern is 'keyCount', out-of-order data will not participate in the calculation.

triggeringInterval is an INTEGER or a tuple. Its available values and triggering rules are as follows:

  • If triggeringPattern='interval': triggeringInterval indicates the duration in milliseconds between 2 adjacent calculations. The default value is 1000.

  • If triggeringPattern='keyCount': triggeringInterval indicates the threshold of the number of keys in the unused data that triggers a calculation. It can also be a tuple of 2 elements: the first is an integer indicating that threshold, and the second is an INTEGER or DURATION value.

Suppose triggeringInterval=(c1, c2):

  • When c2 is an integer, if the number of keys with the latest timestamp t1 doesn't reach c1, calculation will not be triggered and the system goes on to save data with newer timestamp t2 (t2>t1). Data with t1 will be calculated when either of the following events happens: the number of keys with timestamp t2 reaches c2, or data with a newer timestamp t3 (t3>t2) starts to come in. Note that c2 must be smaller than c1.

  • When c2 is a duration, if the number of keys with the latest timestamp t1 doesn't reach c1, calculation will not be triggered and the system goes on to save data with newer timestamp t2 (t2>t1). Once data with t2 starts to come in, data with t1 will not be calculated until any of the following events happens: the number of keys with timestamp t1 reaches c1, data with a newer timestamp t3 (t3>t2) starts to come in, or the duration c2 elapses.
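For example, the following sketch (table and column names are illustrative) passes the tuple form of triggeringInterval, combining a key-count threshold with a duration fallback:

$ share streamTable(10:0, `time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE]) as ticks
$ out = table(1:0, `time`avgPrice, [TIMESTAMP,DOUBLE])
// Trigger when 100 keys arrive for the latest timestamp, or at latest 2 seconds (a DURATION) after newer data starts coming in
$ engine = createCrossSectionalEngine(name="csTupleDemo", metrics=<[avg(price)]>, dummyTable=ticks, outputTable=out, keyColumn=`sym, triggeringPattern="keyCount", triggeringInterval=(100, 2s), useSystemTime=false, timeColumn=`time)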

useSystemTime is a Boolean value indicating whether the first column of outputTable is system time (useSystemTime=true) or timeColumn in the stream table (useSystemTime=false).

timeColumn is a string. If useSystemTime=false, timeColumn is the name of the temporal column in the stream table. The column only supports TIMESTAMP type.

lastBatchOnly is an optional BOOLEAN indicating whether to keep only the latest data in the cross-sectional engine. When lastBatchOnly=true, triggeringPattern must take the value 'keyCount' and the cross-sectional engine only saves the unique values with the latest timestamp for calculation. When lastBatchOnly=false, the engine updates and retains all unique values for calculation.

contextByColumn is an optional STRING scalar indicating the grouping column, based on which data will be calculated in groups.

  • To specify this parameter, metrics and outputTable must be specified accordingly.

  • If metrics only contains aggregate functions, the calculation results would be the same as using function groupby to conduct calculation in groups. Otherwise, the results would be the same as using function contextby.
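As a sketch (table and column names are illustrative), a grouping engine could be set up as follows; note that outputTable has one extra column for the grouping key:

$ share streamTable(10:0, `time`sym`industry`price, [TIMESTAMP,SYMBOL,SYMBOL,DOUBLE]) as quotes
// metrics + 2 columns: the time column, the contextBy column, then the results
$ out = table(1:0, `time`industry`avgPrice, [TIMESTAMP,SYMBOL,DOUBLE])
$ engine = createCrossSectionalEngine(name="csGroupDemo", metrics=<[avg(price)]>, dummyTable=quotes, outputTable=out, keyColumn=`sym, triggeringPattern="perBatch", useSystemTime=false, timeColumn=`time, contextByColumn=`industry)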

To enable snapshot, snapshotDir and snapshotIntervalInMsgCount must be specified.

With snapshot enabled, if an exception occurs, the stream engine can be recovered to the latest snapshot.

snapshotDir is an optional STRING indicating the directory where the snapshot of the engine is stored.

  • Make sure the specified directory actually exists. Otherwise the system reports an error.

  • If snapshotDir is specified when creating the engine, the system will check if a snapshot exists in this directory. If it exists, the system will load the snapshot and restore the state of the engine (intermediate calculation results).

  • Multiple engines may share a single directory, using the name of the engine to distinguish the snapshot files.

  • A snapshot file of an engine may take 3 file extensions at different stages:

    • <engineName>.tmp for temporary storage of snapshot data

    • <engineName>.snapshot for a generated snapshot file synced to disk

    • <engineName>.old for an existing snapshot file that has been replaced by a newly created snapshot with the same name

snapshotIntervalInMsgCount is an optional INTEGER indicating how many messages are processed before a new snapshot is generated.
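For example, snapshots could be enabled as in the following sketch (the directory path is illustrative and must already exist):

$ share streamTable(10:0, `time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE]) as trades
$ out = table(1:0, `time`avgPrice, [TIMESTAMP,DOUBLE])
// Write a snapshot to /home/ddb/snapshots after every 1000 processed messages
$ engine = createCrossSectionalEngine(name="csSnapshotDemo", metrics=<[avg(price)]>, dummyTable=trades, outputTable=out, keyColumn=`sym, triggeringPattern="perBatch", useSystemTime=false, timeColumn=`time, snapshotDir="/home/ddb/snapshots", snapshotIntervalInMsgCount=1000)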

raftGroup is the ID of the raft group on the high-availability streaming subscriber specified by the configuration parameter streamingRaftGroups. It is an integer greater than 1. Specify raftGroup to enable high availability on the streaming engine. When a streaming engine is created on the leader, it is also created on each follower. Once a snapshot is generated, it is also synced to all followers. When a leader is down, the raft group automatically switches to a new leader, which resubscribes to the stream table. Note that snapshotDir must also be specified when specifying a raft group.

Details

Create a cross-sectional engine. Return a keyed table with keyColumn values as the keys.

If metrics or outputTable is not specified, the engine only updates table records but will not calculate or output any data.

If metrics and outputTable are specified, the engine will update table records, conduct calculation using the latest records, and output results to the outputTable.

Examples

Use function createCrossSectionalEngine to create an engine csEngine1 that subscribes to the stream table trades1.

In this example, triggeringPattern="perRow", so each row inserted into the engine triggers a calculation.

$ share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades1
$ outputTable = table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT])
$ csEngine1=createCrossSectionalEngine(name="csEngineDemo1", metrics=<[avg(price), sum(volume), sum(price*volume), count(price)]>, dummyTable=trades1, outputTable=outputTable, keyColumn=`sym, triggeringPattern="perRow", useSystemTime=false, timeColumn=`time)
$ subscribeTable(tableName="trades1", actionName="tradesStats", offset=-1, handler=append!{csEngine1}, msgAsTable=true)
$ insert into trades1 values(2020.08.12T09:30:00.000 + 123 234 456 678 890 901, `A`B`A`B`B`A, 10 20 10.1 20.1 20.2 10.2, 20 10 20 30 40 20);

$ select * from trades1;

time                     sym  price  volume
2020.08.12T09:30:00.123  A    10     20
2020.08.12T09:30:00.234  B    20     10
2020.08.12T09:30:00.456  A    10.1   20
2020.08.12T09:30:00.678  B    20.1   30
2020.08.12T09:30:00.890  B    20.2   40
2020.08.12T09:30:00.901  A    10.2   20

$ select * from outputTable;

time                     avgPrice  volume  dollarVolume  count
2020.08.12T09:30:00.123  10        20      200           1
2020.08.12T09:30:00.234  15        30      400           2
2020.08.12T09:30:00.456  15.05     30      402           2
2020.08.12T09:30:00.678  15.1      50      805           2
2020.08.12T09:30:00.890  15.15     60      1010          2
2020.08.12T09:30:00.901  15.2      60      1012          2

In this example, triggeringPattern="perBatch". Two batches of data are inserted, so the result has 2 rows.

$ share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades2
$ outputTable = table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT])
$ csEngine2=createCrossSectionalEngine(name="csEngineDemo2", metrics=<[avg(price), sum(volume), sum(price*volume), count(price)]>, dummyTable=trades2, outputTable=outputTable, keyColumn=`sym, triggeringPattern="perBatch", useSystemTime=false, timeColumn=`time)
$ subscribeTable(tableName="trades2", actionName="tradesStats", offset=-1, handler=append!{csEngine2}, msgAsTable=true)
$ insert into trades2 values(2020.08.12T09:30:00.000 + 123 234 456, `A`B`A, 10 20 10.1, 20 10 20);
$ sleep(1)
$ insert into trades2 values(2020.08.12T09:30:00.000 + 678 890 901, `B`B`A, 20.1 20.2 10.2, 30 40 20);

$ select * from outputTable;

time                     avgPrice  volume  dollarVolume  count
2020.08.12T09:30:00.456  15.05     30      402           2
2020.08.12T09:30:00.901  15.2      60      1012          2

In this example, triggeringPattern="keyCount" and lastBatchOnly=true, so only the data with the latest timestamp participates in the calculation. Because metrics contains both aggregate and non-aggregate functions, the number of rows in the result table is the same as that in the input table.

$ share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades1
$ outputTable = table(1:0, `time`factor1`factor2, [TIMESTAMP, DOUBLE,INT])
$ agg=createCrossSectionalAggregator(name="csEngineDemo4", metrics=<[price+ 0.1, sum(volume)]>, dummyTable=trades1, outputTable=outputTable, keyColumn=`sym, triggeringPattern="keyCount", triggeringInterval=5, useSystemTime=false, timeColumn=`time,lastBatchOnly=true)
$ subscribeTable(tableName=`trades1, actionName="csEngineDemo4", msgAsTable=true, handler=append!{agg})
$ num=10
$ time = array(timestamp)
$ time=take(2018.01.01T09:30:00.000,num)
$ sym=take("A"+string(1..10),num)
$ price=1..num
$ volume=1..num
$ tmp=table(time, sym, price, volume)
$ trades1.append!(tmp)

// Only the latest 5 records will participate in calculation.
$ num=5
$ time = array(timestamp)
$ time=take(2018.01.01T09:30:01.000,num)
$ sym=take("A"+string(1..10),num)
$ price=6..10
$ volume=6..10
$ tmp=table(time, sym, price, volume)
$ trades1.append!(tmp)

$ select * from outputTable;

time                     factor1  factor2
2018.01.01T09:30:00.000  1.1      55
2018.01.01T09:30:00.000  2.1      55
2018.01.01T09:30:00.000  3.1      55
2018.01.01T09:30:00.000  4.1      55
2018.01.01T09:30:00.000  5.1      55
2018.01.01T09:30:00.000  6.1      55
2018.01.01T09:30:00.000  7.1      55
2018.01.01T09:30:00.000  8.1      55
2018.01.01T09:30:00.000  9.1      55
2018.01.01T09:30:00.000  10.1     55
2018.01.01T09:30:01.000  6.1      40
2018.01.01T09:30:01.000  7.1      40
2018.01.01T09:30:01.000  8.1      40
2018.01.01T09:30:01.000  9.1      40
2018.01.01T09:30:01.000  10.1     40

In this example, triggeringPattern="interval" and triggeringInterval=500 (milliseconds).

$ share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades3
$ outputTable = table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT])
$ csEngine3=createCrossSectionalEngine(name="csEngineDemo3", metrics=<[avg(price), sum(volume), sum(price*volume), count(price)]>, dummyTable=trades3, outputTable=outputTable, keyColumn=`sym, triggeringPattern="interval", triggeringInterval=500)
$ subscribeTable(tableName="trades3", actionName="tradesStats", offset=-1, handler=append!{csEngine3}, msgAsTable=true);

$ insert into trades3 values(2020.08.12T09:30:00.000, `A, 10, 20)
$ sleep(500)
$ insert into trades3 values(2020.08.12T09:30:00.000 + 500, `B, 20, 10)
$ sleep(500)
$ insert into trades3 values(2020.08.12T09:30:00.000 + 1000, `A, 10.1, 20)
$ sleep(1000)
$ insert into trades3 values(2020.08.12T09:30:00.000 + 2000, `B, 20.1, 30)
$ sleep(500)
$ insert into trades3 values(2020.08.12T09:30:00.000 + 2500, `B, 20.2, 40)
$ sleep(500)
$ insert into trades3 values(2020.08.12T09:30:00.000 + 3000, `A, 10.2, 20);

$ select * from outputTable;

time                     avgPrice  volume  dollarVolume  count
2020.09.13T19:04:45.015  10        20      200           1
2020.09.13T19:04:45.529  15        30      400           2
2020.09.13T19:04:46.030  15.05     30      402           2
2020.09.13T19:04:46.531  15.05     30      402           2
2020.09.13T19:04:47.036  15.1      50      805           2
2020.09.13T19:04:47.551  15.15     60      1010          2
2020.09.13T19:04:48.065  15.2      60      1012          2
2020.09.13T19:04:48.567  15.2      60      1012          2

For triggeringPattern="interval", calculation is triggered at regular intervals based on system time rather than by arriving data, so the size of the output table keeps growing.

In the examples above, the table returned by the cross-sectional engine usually serves as an intermediate result for further calculation, but it can also be a final result.

For example, to regularly poll the latest trading price of certain stocks, the basic approach is to filter the real-time trades table by stock code and retrieve the last record. However, as the trades table grows rapidly over time, such frequent queries are suboptimal in terms of both system resource consumption and query performance. The cross-sectional table keeps only the latest record for each stock, so its size stays stable, which makes it well suited for such polling scenarios.

To use a cross-sectional table as a final result, simply leave metrics and outputTable unspecified.
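For example (names are illustrative), an engine created without metrics or outputTable simply maintains the latest record per key, which can then be polled directly:

$ share streamTable(10:0, `time`sym`price`volume, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
// No metrics or outputTable: the engine only keeps the latest record for each sym
$ latest = createCrossSectionalEngine(name="csLatestDemo", dummyTable=trades, keyColumn=`sym)
$ subscribeTable(tableName="trades", actionName="keepLatest", offset=-1, handler=append!{latest}, msgAsTable=true)
// Poll the returned keyed table at any time for the latest record of each stock
$ select * from latest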