createAnomalyDetectionEngine

Syntax

createAnomalyDetectionEngine(name, metrics, dummyTable, outputTable, timeColumn, [keyColumn], [windowSize], [step], [garbageSize], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup])

Arguments

name is a string indicating the name of the engine. It is the unique identifier of the engine. It can contain letters, digits and underscores ("_"), and it must start with a letter.
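The naming rule above can be checked before calling the function. The following is a minimal Python sketch; the helper name is_valid_engine_name is hypothetical, and the pattern assumes that "letter" means an ASCII letter, which the rule above does not state explicitly.

```python
import re

# Assumed pattern derived from the rule above: a letter first,
# then any number of letters, digits or underscores.
_ENGINE_NAME = re.compile(r"[A-Za-z][A-Za-z0-9_]*")

def is_valid_engine_name(name: str) -> bool:
    """Return True if `name` satisfies the naming rule (hypothetical helper)."""
    return _ENGINE_NAME.fullmatch(name) is not None
```

For instance, the name anomalyDetection1 used in the example on this page passes the check, while a name starting with a digit or an underscore does not.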

metrics is metacode or tuple specifying the formula for anomaly detection. It uses functions or expressions such as <[sum(qty)>5, avg(qty)>qty, qty<4]> that indicate anomaly conditions and return Boolean values.

dummyTable is a table object whose schema must be the same as the subscribed stream table. The system uses the schema of dummyTable to determine the data type of each column in the messages. Whether dummyTable contains data does not affect the aggregation result.

outputTable is the output table of detected anomalies. The first column is of temporal data type and stores the time when an anomaly is detected. If keyColumn is specified, the second column is keyColumn. The next 2 columns are of INT and STRING/SYMBOL data type. They indicate the category of the anomaly (the zero-based position of the condition in metrics) and the content of the anomaly condition.

timeColumn is a string scalar indicating the name of the temporal column of the input stream table.

keyColumn is a string scalar indicating the grouping column name. The anomaly detection engine conducts the calculations within each group specified by keyColumn. It is optional.

windowSize is a positive integer indicating the length of the data window for calculation. If aggregate functions are used in metrics, windowSize is required. The unit of windowSize is the smallest unit of time of timeColumn.

step is a positive integer indicating the duration between 2 adjacent calculations. The unit of step is the smallest unit of time of timeColumn. The value of windowSize must be a multiple of step, otherwise an exception is thrown. The system automatically adjusts the left bound of the first window. For details, see createTimeSeriesEngine.
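The constraint between windowSize and step can be modeled as follows. This is a minimal Python sketch, not the engine's implementation: the function names are hypothetical, the left bound of the first window is passed in rather than derived (the adjustment rule is documented under createTimeSeriesEngine), and only the case windowSize = step used in the example on this page is covered. Windows are written as half-open intervals [left, right).

```python
def check_window_args(window_size: int, step: int) -> None:
    """Raise ValueError if the arguments violate the rules stated above."""
    if window_size <= 0 or step <= 0:
        raise ValueError("windowSize and step must be positive integers")
    if window_size % step != 0:
        raise ValueError("windowSize must be a multiple of step")

def tumbling_windows(first_left: int, window_size: int, count: int):
    """List the first `count` windows for the case windowSize == step.

    `first_left` is the (already adjusted) left bound of the first window,
    in the smallest time unit of timeColumn.
    """
    check_window_args(window_size, window_size)
    return [(first_left + k * window_size, first_left + (k + 1) * window_size)
            for k in range(count)]
```

With first_left = 0 and windowSize = 3 (milliseconds after 2018.10.08T01:01:01.000), the first two windows are [0, 3) and [3, 6), which correspond to the windows .000 to .002 and .003 to .005 in the example.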

garbageSize is a positive integer. It is optional and the default value is 2000 (in number of records).

  • If keyColumn is not specified, when the number of historical records in memory exceeds garbageSize, the system clears the records that are no longer needed.

  • If keyColumn is specified, record clearing is conducted independently in each group. When the number of historical records in a group exceeds garbageSize, the system clears the records within the group that are no longer needed.

  • garbageSize has no effect if no aggregate function is specified in the parameter metrics.

snapshotDir is a string indicating the directory where the streaming engine snapshot is saved. The snapshot is used to restore the streaming engine after system interruption.

  • The directory must already exist, otherwise an exception is thrown.

  • When creating a streaming engine, if snapshotDir is specified, the system will check whether there is a snapshot in the directory already. If it exists, the snapshot will be loaded to restore the state of the engine.

  • Multiple streaming engines can share a directory.

  • A snapshot may use 3 names. When the snapshot is being generated, the file name is <engineName>.tmp; after the snapshot is generated and flushed to the disk, it is renamed to <engineName>.snapshot; if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.
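The three file names can be read as a write-then-rename sequence. The following Python sketch models only that naming scheme; it is not how the engine itself is implemented, and the function name save_snapshot and the payload argument are hypothetical.

```python
import os

def save_snapshot(snapshot_dir: str, engine_name: str, payload: bytes) -> str:
    """Model of the .tmp -> .snapshot -> .old naming scheme described above."""
    # The directory must already exist, as required for snapshotDir.
    if not os.path.isdir(snapshot_dir):
        raise FileNotFoundError("snapshot directory does not exist: " + snapshot_dir)

    base = os.path.join(snapshot_dir, engine_name)
    tmp_path, snap_path, old_path = base + ".tmp", base + ".snapshot", base + ".old"

    # 1. While the snapshot is being generated it is named <engineName>.tmp.
    with open(tmp_path, "wb") as f:
        f.write(payload)
        f.flush()
        os.fsync(f.fileno())  # make sure the data reaches the disk

    # 2. An existing snapshot with the same name becomes <engineName>.old.
    if os.path.exists(snap_path):
        os.replace(snap_path, old_path)

    # 3. The finished file is renamed to <engineName>.snapshot.
    os.replace(tmp_path, snap_path)
    return snap_path
```

Because the file names are derived from the engine name, several engines with different names can write to the same directory without overwriting each other's files.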

snapshotIntervalInMsgCount is a positive integer indicating the number of messages between 2 streaming engine snapshots.

To enable snapshots in streaming engines, snapshotDir and snapshotIntervalInMsgCount must be specified.

raftGroup is the ID of the raft group on the high-availability streaming subscriber specified by the configuration parameter streamingRaftGroups. It is an integer greater than 1. Specify raftGroup to enable high availability on the streaming engine. When a streaming engine is created on the leader, it is also created on each follower. Once a snapshot is generated, it is also synced to all followers. When a leader is down, the raft group automatically switches to a new leader, which resubscribes to the stream table. Note that snapshotDir must also be specified when specifying a raft group.

Details

Create an anomaly detection engine. Return an abstract table object. Data inserted into this table is used to calculate specified metrics to detect anomalies. Generally it is used in conjunction with subscribeTable.

We can use the following 3 types of expressions as the metrics:

  • Comparison between a column and a constant, or between columns. Nonaggregate functions may be used, but aggregate functions are not used. For example: qty < 4, qty > price, lt(qty, prev(qty)), isNull(qty) == false, etc. For these metrics, the engine conducts calculations for each row and determines whether to output the result.

  • Comparison between an aggregate function result and a constant, or between aggregate function results. Nonaggregate functions may be used, but their arguments may only include aggregate functions and/or constants, not columns. For example: avg(qty - price) > 10, percentile(qty, 90) < 100, max(qty) < avg(qty) * 2, le(sum(qty), 5), etc. For these metrics, the engine conducts calculations at the frequency determined by the parameter step and determines whether to output the result.

  • Comparison between an aggregate function result and a column, or nonaggregate functions whose arguments include both aggregate functions and columns. For example: avg(qty) > qty, le(med(qty), price), etc. For these metrics, the engine updates the aggregate results at the frequency determined by the parameter step, compares each arriving row with the most recent aggregate result, and determines whether to output the result.

Examples

$ share streamTable(1000:0, `time`sym`qty, [TIMESTAMP, SYMBOL, INT]) as trades
$ outputTable = table(1000:0, `time`sym`type`metric, [TIMESTAMP, SYMBOL, INT, STRING])
$ engine = createAnomalyDetectionEngine(name="anomalyDetection1", metrics=<[sum(qty) > 5, avg(qty) > qty, qty < 4]>, dummyTable=trades, outputTable=outputTable, timeColumn=`time, keyColumn=`sym, windowSize=3, step=3)
$ subscribeTable(tableName="trades", actionName="anomalyDetectionSub1", offset=0, handler=append!{engine}, msgAsTable=true)

$ def writeData(n){
$      timev = 2018.10.08T01:01:01.001 + 1..n
$      symv =take(`A`B, n)
$      qtyv = n..1
$      insert into trades values(timev, symv, qtyv)
$ }

$ writeData(6);

$ select * from trades;

time                    sym qty
----------------------- --- ---
2018.10.08T01:01:01.002 A   6
2018.10.08T01:01:01.003 B   5
2018.10.08T01:01:01.004 A   4
2018.10.08T01:01:01.005 B   3
2018.10.08T01:01:01.006 A   2
2018.10.08T01:01:01.007 B   1

$ select * from outputTable;

time                    sym type metric
----------------------- --- ---- --------------
2018.10.08T01:01:01.003 A   0    sum(qty) > 5
2018.10.08T01:01:01.004 A   1    avg(qty) > qty
2018.10.08T01:01:01.005 B   2    qty < 4
2018.10.08T01:01:01.006 A   1    avg(qty) > qty
2018.10.08T01:01:01.006 A   2    qty < 4
2018.10.08T01:01:01.006 B   0    sum(qty) > 5
2018.10.08T01:01:01.007 B   1    avg(qty) > qty
2018.10.08T01:01:01.007 B   2    qty < 4

The calculation process of the anomaly detection engine is explained in detail below:

(1) The metric sum(qty)>5 compares an aggregate result with a constant, so the anomaly detection engine checks this metric when the calculation for each window is triggered.

  • The first window ranges from 2018.10.08T01:01:01.000 to 2018.10.08T01:01:01.002, and sum(qty) is calculated separately for A and B. At 2018.10.08T01:01:01.003, the engine checks whether each result meets the condition sum(qty)>5.

  • The second window ranges from 2018.10.08T01:01:01.003 to 2018.10.08T01:01:01.005. At 2018.10.08T01:01:01.006, the engine checks whether each result meets the condition sum(qty)>5, and so on.

(2) The metric avg(qty)>qty compares an aggregate result with a column. Therefore, whenever a record arrives, the anomaly detection engine compares it with the aggregate result of the previous window and outputs a result if the condition is met. This continues until the next aggregate calculation is triggered.

  • The first window ranges from 2018.10.08T01:01:01.000 to 2018.10.08T01:01:01.002, and avg(qty) is calculated separately for A and B. Each qty between 2018.10.08T01:01:01.003 and 2018.10.08T01:01:01.005 is compared with the avg(qty) of the previous window.

  • The window moves at 2018.10.08T01:01:01.005.

  • The second window ranges from 2018.10.08T01:01:01.003 to 2018.10.08T01:01:01.005. The avg(qty) of A and B is calculated for this window, and each qty between 2018.10.08T01:01:01.006 and 2018.10.08T01:01:01.008 is compared with it, and so on.

(3) The metric qty<4 compares the qty column with a constant, so the anomaly detection engine checks this metric each time a record arrives.
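The three rules above can be traced with a small model. The following Python sketch is not the engine's implementation; it is a simplified simulation under stated assumptions: times are milliseconds after 2018.10.08T01:01:01.000, windowSize equals step, the first window starts at 0, a window of a group is closed when the first record of that group at or after the window's right bound arrives, and a group keeps its last computed average if a window contains no records. The function name detect is hypothetical.

```python
WINDOW = 3      # windowSize == step == 3 ms, as in the example
FIRST_LEFT = 0  # assumed left bound of the first window

def detect(rows):
    """rows: (time_ms, sym, qty) tuples in arrival order.

    Returns (time_ms, sym, type, metric) tuples in output order.
    """
    out = []
    state = {}  # sym -> open window start, buffered qty values, last avg
    for t, sym, qty in rows:
        st = state.setdefault(sym, {"start": FIRST_LEFT, "buf": [], "avg": None})

        # Close every window of this group that ended at or before time t.
        while t >= st["start"] + WINDOW:
            end = st["start"] + WINDOW
            if st["buf"]:
                total = sum(st["buf"])
                st["avg"] = total / len(st["buf"])
                if total > 5:                      # metric 0: aggregate vs constant
                    out.append((end, sym, 0, "sum(qty) > 5"))
                st["buf"] = []
            st["start"] = end

        st["buf"].append(qty)
        if st["avg"] is not None and st["avg"] > qty:  # metric 1: aggregate vs column
            out.append((t, sym, 1, "avg(qty) > qty"))
        if qty < 4:                                    # metric 2: column vs constant
            out.append((t, sym, 2, "qty < 4"))
    return out

# The six records written by writeData(6) in the example.
trades = [(2, "A", 6), (3, "B", 5), (4, "A", 4), (5, "B", 3), (6, "A", 2), (7, "B", 1)]
result = detect(trades)
```

Under these assumptions, result contains the same eight rows, in the same order, as the outputTable shown in the example, with the time column expressed as millisecond offsets (3 for .003, 4 for .004, and so on).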