createCrossSectionalEngine

Syntax

createCrossSectionalEngine(name, [metrics], dummyTable, [outputTable], keyColumn, [triggeringPattern=’perBatch’], [triggeringInterval=1000], [useSystemTime=true], [timeColumn], [lastBatchOnly=false], [contextByColumn], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup=-1])

Alias: createCrossSectionalAggregator

Details

This function creates a cross-sectional streaming engine and returns a keyed table with keyColumn as the key.

The keyed table is updated every time a new record arrives. If the parameter lastBatchOnly is set to true, the table only maintains the latest record in each group. When new data is ingested into the engine,

  • if metrics and outputTable are specified, the engine first updates the keyed table, then performs calculations on the latest data and outputs the results to outputTable.

  • if metrics and outputTable are not specified, the engine only updates the keyed table.

For more application scenarios, see Streaming Engines.

Calculation Rules

The calculation can be triggered by the number of records or time interval. See parameters triggeringPattern and triggeringInterval. Note that if contextByColumn is specified, the data will be grouped by the specified columns and calculated by group.

Features

  • Snapshot: Snapshot mechanism is used to restore the streaming engine to the latest snapshot after system interruption. (See parameters snapshotDir and snapshotIntervalInMsgCount)

  • High availability of streaming engines: To enable high availability of streaming engines, specify the parameter raftGroup on the leader node of the raft group on the subscriber. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table.

Arguments

name is a string of the engine name. It is the only identifier of a cross sectional engine on a data/compute node. It can have letter, number and “_” and must start with a letter.

metrics (optional) is metacode or a tuple specifying the formulas for calculation. It can be:

  • Built-in or user-defined aggregate functions, e.g., <[sum(qty), avg(price)]>; Or expressions on previous results, e.g., <[avg(price1)-avg(price2)]>; Or calculation on multiple columns, e.g., <[std(price1-price2)]>

  • Functions with multiple returns, such as <func(price) as `col1`col2>. The column names can be specified or not. For more information about metacode, see Metaprogramming.

  • The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.

dummyTable is a table object whose schema must be the same as the stream table to which the engine subscribes. Whether dummyTable contains data does not matter.

outputTable (optional) is the output table for the results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function. Make sure that data types of columns storing calculation results are the same as the results of metrics. The output columns are in the following order:

  • The first column is of TIMESTAMP type.

    • If useSystemTime is specified, the column stores the time when each calculation starts;

    • Otherwise it takes the values of timeColumn.

  • If contextByColumn is specified, the second column is the contextByColumn.

  • The remaining columns store the calculation results of metrics.

keyColumn is a STRING scalar or vector that specifies one or more columns in the stream table as the key columns. For each unique value in the keyColumn, only the latest record is used in the calculation.

triggeringPattern (optional) is a STRING scalar specifying how to trigger the calculations. The engine returns a result every time a calculation is triggered. It can take one of the following values:

  • ‘perBatch’ (default): calculates when a batch of data arrives.

  • ‘perRow’: calculates when a new record arrives.

  • ‘interval’: calculates at the intervals of triggeringInterval (using system time).

  • ‘keyCount’: When data with the same timestamp arrives in batches, the calculation is triggered:

    • if the number of keys with the latest timestamp reaches triggeringInterval;

    • or data with newer timestamp arrives.

Note: To set triggeringPattern as ‘keyCount’, timeColumn must be specified and useSystemTime must be set to false. In this case, the out-of-order data will be discarded.

triggeringInterval (optional) can be an integer or a tuple. Below explains its optional values and triggering rules:

  • If triggeringPattern = ‘interval’, triggeringInterval is an integer indicating the interval in milliseconds between 2 adjacent calculations. The default value is 1,000.

  • If triggeringPattern = ‘keyCount’, triggeringInterval can be:

    • an integer specifying a threshold. Before data with a greater timestamp arrives, a calculation is triggered when the number of uncalculated records reaches the threshold.

    • a tuple of 2 elements. The first element is an integer indicating the threshold of the number records with the latest timestamp to trigger calculation. The second element is an integer or duration value.

For example, when triggeringInterval is set to (c1, c2):

  • If c2 is an integer and the number of keys with the latest timestamp t1 doesn’t reach c1, calculation will not be triggered and the system goes on to save data with greater timestamp t2 (t2>t1). Data with t1 will be calculated when either of the events happens: the number of keys with timestamp t2 reaches c2, or data with greater timestamp t3 (t3>t2) arrives. Note that c2 must be smaller than c1.

  • If c2 is a duration and the number of keys with the latest timestamp t1 doesn’t reach c1, calculation will not be triggered and the system goes on to save data with greater timestamp t2 (t2>t1) . Once data with t2 starts to come in, data with t1 will not be calculated until any of the events happens: the number of keys with timestamp t1 reaches c1, or data with greater timestamp t3 (t3>t2) arrives, or the duration c2 comes to an end.

useSystemTime (optional) is a Boolean value indicating whether the calculations are performed based on the system time when data is ingested into the engine.

  • If useSystemTime = true, the time column of outputTable is the system time;

  • If useSystemTime = false, the parameter timeColumn must be specified. The time column of outputTable uses the timestamp of each record.

timeColumn (optional) is a STRING scalar which specifies the time column in the stream table to which the engine subscribes if useSystemTime = false. It can only be of TIMESTAMP type.

lastBatchOnly (optional) is a BOOLEAN value indicating whether to keep only the records with the latest timestamp in the engine. When lastBatchOnly = true, triggeringPattern must take the value ‘keyCount’, and the cross-sectional engine only maintains key values with the latest timestamp for calculation. Otherwise, the engine updates and retains all values for calculation.

contextByColumn (optional) is a STRING scalar indicating the grouping column based on which calculations are performed by group.

  • This parameter only takes effect if metrics and outputTable are specified.

  • If metrics only contains aggregate functions, the calculation results would be the same as a SQL query using group by. Otherwise, the results would be consistent with that using context by.

To enable snapshot in the streaming engines, specify parameters snapshotDir and snapshotIntervalInMsgCount.

snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved.

  • The directory must already exist, otherwise an exception is thrown.

  • If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot will be loaded to restore the engine state.

  • Multiple streaming engines can share a directory where the snapshot files are named as the engine names.

  • The file extension of a snapshot can be:

    • <engineName>.tmp: temporary snapshot

    • <engineName>.snapshot: a snapshot that is generated and flushed to disk

    • <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.

snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.

raftGroup (optional) is an integer greater than 1, indicating ID of the raft group on the high-availability streaming subscriber specified by the configuration parameter streamingRaftGroups. Specify raftGroup to enable high availability on the streaming engine. When an engine is created on the leader, it is also created on each follower and the engine snapshot is synchronized to the followers. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table. Note that SnapShotDir must also be specified when specifying a raft group.

Examples

In this example, a table “csEngine1” is created with function createCrossSectionalEngine and it subscribes to the stream table trades1. We set triggeringPattern to ‘perRow’, so each row that is inserted into table csEngine1 triggers a calculation.

$ share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades1
$ outputTable = table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT])
$ csEngine1=createCrossSectionalEngine(name="csEngineDemo1", metrics=<[avg(price), sum(volume), sum(price*volume), count(price)]>, dummyTable=trades1, outputTable=outputTable, keyColumn=`sym, triggeringPattern="perRow", useSystemTime=false, timeColumn=`time)
$ subscribeTable(tableName="trades1", actionName="tradesStats", offset=-1, handler=append!{csEngine1}, msgAsTable=true)
$ insert into trades1 values(2020.08.12T09:30:00.000 + 123 234 456 678 890 901, `A`B`A`B`B`A, 10 20 10.1 20.1 20.2 10.2, 20 10 20 30 40 20);

$ select * from trades1;

time

sym

price

volume

2020.08.12T09:30:00.123

A

10

20

2020.08.12T09:30:00.234

B

20

10

2020.08.12T09:30:00.456

A

10.1

20

2020.08.12T09:30:00.678

B

20.1

30

2020.08.12T09:30:00.890

B

20.2

40

2020.08.12T09:30:00.901

A

10.2

20

$ select * from outputTable;

time

avgPrice

volume

dollarVolume

count

2020.08.12T09:30:00.123

10

20

200

1

2020.08.12T09:30:00.234

15

30

400

2

2020.08.12T09:30:00.456

15.05

30

402

2

2020.08.12T09:30:00.678

15.1

50

805

2

2020.08.12T09:30:00.890

15.15

60

1010

2

2020.08.12T09:30:00.901

15.2

60

1012

2

In the following example, triggeringPattern is set to ‘perBatch’. Insert 2 batches of data and the result has 2 rows.

$ share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades2
$ outputTable = table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT])
$ csEngine2=createCrossSectionalEngine(name="csEngineDemo2", metrics=<[avg(price), sum(volume), sum(price*volume), count(price)]>, dummyTable=trades2, outputTable=outputTable, keyColumn=`sym, triggeringPattern="perBatch", useSystemTime=false, timeColumn=`time)
$ subscribeTable(tableName="trades2", actionName="tradesStats", offset=-1, handler=append!{csEngine2}, msgAsTable=true)
$ insert into trades2 values(2020.08.12T09:30:00.000 + 123 234 456, `A`B`A, 10 20 10.1, 20 10 20);
$ sleep(1)
$ insert into trades2 values(2020.08.12T09:30:00.000 + 678 890 901, `B`B`A, 20.1 20.2 10.2, 30 40 20);

$ select * from outputTable;

time

avgPrice

volume

dollarVolume

count

2020.08.12T09:30:00.456

15.05

30

402

2

2020.08.12T09:30:00.901

15.2

60

1012

2

The following example sets triggeringPattern to ‘keyCount’ and lastBatchOnly to true. Only the data with the latest timestamp will participate in calculation. Since there are both aggregate and non-aggregate functions set in metrics, the number of rows in the result table will be the same as that in the input table.

$ share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades1
$ outputTable = table(1:0, `time`factor1`factor2, [TIMESTAMP, DOUBLE,INT])
$ agg=createCrossSectionalAggregator(name="csEngineDemo4", metrics=<[price+ 0.1, sum(volume)]>, dummyTable=trades1, outputTable=outputTable, keyColumn=`sym, triggeringPattern="keyCount", triggeringInterval=5, useSystemTime=false, timeColumn=`time,lastBatchOnly=true)
$ subscribeTable(tableName=`trades1, actionName="csEngineDemo4", msgAsTable=true, handler=append!{agg})
$ num=10
$ time=array(TIMESTAMP)
$ time=take(2018.01.01T09:30:00.000,num)
$ sym=take("A"+string(1..10),num)
$ price=1..num
$ volume=1..num
$ tmp=table(time, sym, price, volume)
$ trades1.append!(tmp)

// Only the latest 5 records will participate in calculation.
$ num=5
$ time = array(TIMESTAMP)
$ time=take(2018.01.01T09:30:01.000,num)
$ sym=take("A"+string(1..10),num)
$ price=6..10
$ volume=6..10
$ tmp=table(time, sym, price, volume)
$ trades1.append!(tmp)

time

factor1

factor2

2018.01.01T09:30:00.000

1.1

55

2018.01.01T09:30:00.000

2.1

55

2018.01.01T09:30:00.000

3.1

55

2018.01.01T09:30:00.000

4.1

55

2018.01.01T09:30:00.000

5.1

55

2018.01.01T09:30:00.000

6.1

55

2018.01.01T09:30:00.000

7.1

55

2018.01.01T09:30:00.000

8.1

55

2018.01.01T09:30:00.000

9.1

55

2018.01.01T09:30:00.000

10.1

55

2018.01.01T09:30:01.000

6.1

40

2018.01.01T09:30:01.000

7.1

40

2018.01.01T09:30:01.000

8.1

40

2018.01.01T09:30:01.000

9.1

40

2018.01.01T09:30:01.000

10.1

40

The following example sets triggeringPattern to ‘interval’ and triggeringInterval to 500 (milliseconds).

$ share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades3
$ outputTable = table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT])
$ csEngine3=createCrossSectionalEngine(name="csEngineDemo3", metrics=<[avg(price), sum(volume), sum(price*volume), count(price)]>, dummyTable=trades3, outputTable=outputTable, keyColumn=`sym, triggeringPattern="interval", triggeringInterval=500)
$ subscribeTable(tableName="trades3", actionName="tradesStats", offset=-1, handler=append!{csEngine3}, msgAsTable=true);

$ insert into trades3 values(2020.08.12T09:30:00.000, `A, 10, 20)
$ insert into trades3 values(2020.08.12T09:30:00.000 + 500, `B, 20, 10)
$ insert into trades3 values(2020.08.12T09:30:00.000 + 1000, `A, 10.1, 20)
$ insert into trades3 values(2020.08.12T09:30:00.000 + 2000, `B, 20.1, 30)
$ sleep(500)
$ insert into trades3 values(2020.08.12T09:30:00.000 + 2500, `B, 20.2, 40)
$ insert into trades3 values(2020.08.12T09:30:00.000 + 3000, `A, 10.2, 20);
$ sleep(500)

$ select * from outputTable;

time

avgPrice

volume

dollarVolume

count

2022.03.02T11:17:02.341

15.1

50

805

2

2022.03.02T11:17:02.850

15.2

60

1,012

2

The calculation is triggered every 500 ms based on the system time. Only the records with the latest timestamp participate in the calculation, even if there are multiple uncalculated records with the same key.

In the above example, the table returned by cross sectional engine is usually an intermediate result for the calculation. But it can also be a final result. For example, if you need to regularly refresh the latest trading price of a certain stock, the basic way is to filter the stocks by code from the real-time trading table and retrieve the last record. However, the amount of data in the trading table is growing rapidly over time. For frequent queries, it is not the best practice in terms of system resource consumption or query performance. The cross sectional table only saves the latest transaction data of all stocks, the data amount is stable. So it is very suitable for the timing polling scenario.

To use a cross sectional table as a final result, you need to set metrics and outputTable to be empty.

$ tradesCrossEngine=createCrossSectionalEngine(name="CrossSectionalDemo", dummyTable=trades, keyColumn=`sym, triggeringPattern=`perRow)