replay

=====================================

Syntax

replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [absoluteRate=true], [parallelLevel=1], [sortColumns])

Details

Replay one or more tables or data sources (generated by replayDS) to table(s) in chronological order to simulate real-time ingestion of streaming data. It is commonly used for backtesting of high-frequency trading strategies.

  • Replay Types

Based on the mapping between the input table(s) and output table(s), there are three replay types: 1-to-1, N-to-1, and N-to-N. For N-to-1 replay before version 1.30.17, the schemata of the input tables must be identical; this is called homogeneous replay. Starting from version 1.30.17, replay supports input tables with different schemata; this is called heterogeneous replay. For heterogeneous replay, the serialized replayed records cannot be processed directly: the output table must be deserialized, and the data filtered and processed, with streamFilter.

  • Replay Rate

    • If replayRate is a positive integer and absoluteRate = true, replay replayRate records per second.

    • If replayRate is a positive integer and absoluteRate = false, replay at replayRate times the speed of the original time span of the data. Note that in this mode the same number of records is replayed each second.

    • If replayRate is negative or unspecified, replay at the maximum speed.

  • Replay Process

(1) Loading data

When the parameter inputTables is specified as a tuple of data sources, the data is first loaded from disk into memory.

Before version 1.30.21, only data sources with the same index were loaded together and then replayed in order. Starting from version 1.30.21, the loaded data is automatically sorted by timestamp before it is replayed.

Note:

  • To improve performance, specify the parallelLevel parameter to load data in parallel.

  • Data is loaded and replayed asynchronously.

(2) Replaying data in batches

Data to be replayed is loaded from memory. Only data in the same batch is replayed in order. Therefore, sort the input tables by the specified time column (as determined by the parameters dateColumn and timeColumn) before calling the replay function.

Replay one or more tables containing n records with a time span of t seconds:

Replay Rate: a specified number of records per second (absoluteRate = true)

  • Batch size: replayRate records from one or multiple table(s), sorted by timestamp
  • Elapsed time: n/replayRate seconds
  • Note: if fewer than replayRate records are loaded in one second, all loaded data is replayed as one batch.

Replay Rate: a specified multiple of the data's time span (absoluteRate = false)

  • Batch size: replayRate*n/t records from one or multiple table(s), sorted by timestamp (if replayRate*n/t < 1, take 1)
  • Elapsed time: t/replayRate seconds
  • Note: if fewer than replayRate*n/t records are loaded in one second, all loaded data is replayed as one batch.

Replay Rate: maximum speed (replayRate negative or unspecified)

  • Batch size: all loaded data
  • Elapsed time: determined by the performance of the system
  • Note: for N-to-N replay, each table is replayed one by one.

Note:

  • When inputTables is specified as data sources (i.e., loaded from disk), the replay speed is affected by disk I/O.

  • The elapsed time of heterogeneous replay is slightly longer than that of homogeneous replay.

(3) Writing data: Currently, the system supports writing to the output tables in a single thread only.

(4) Terminating the replay: Execute the command cancelJob or cancelConsoleJob from a new web session or console.
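
A minimal sketch of submitting a replay as a background job so it can be cancelled by job ID later (the job name "replayJob1", its description, and the tables "trades" and "st" are placeholder names, not from this manual):

$ // submit the replay as a background job; submitJob returns the actual job ID
$ jobId = submitJob("replayJob1", "replay trades to st", replay{trades, st, `timestamp, `timestamp, 100})
$ // from another session, terminate the replay by job ID
$ cancelJob(jobId)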

Arguments

inputTables can be:

  • for 1-to-1 replay, a non-partitioned in-memory table or data source;

  • for N-to-N/N-to-1 homogeneous replay, multiple non-partitioned in-memory tables or a tuple of data sources;

  • for N-to-1 heterogeneous replay, a dictionary. The key of the dictionary is a user-defined string indicating the unique identifier of the input table, and the value is the table object or data source.

outputTables can be:

  • for 1-to-1/N-to-1 homogeneous replay, a table object (a non-partitioned in-memory table/stream table) or a string scalar of a table name. The output table must have the same schema as the input table(s).

  • for N-to-N replay, a string vector or tuple of table objects (non-partitioned in-memory tables/stream tables) with the same length as that of inputTables. The outputTables and inputTables are mapped one-to-one, and each pair has the same schema.

  • for N-to-1 heterogeneous replay, a table object (a non-partitioned in-memory table/stream table) containing at least three columns:

    • The first column is of TIMESTAMP type indicating the timestamp specified by dateColumn/timeColumn;

    • The second column is of SYMBOL or STRING type indicating the key of the dictionary specified in the inputTables;

    • The third column must be of BLOB type that stores the serialized result of each replayed record.

    • In addition, columns that appear in the input tables with the same column names and data types can also be output.

dateColumn / timeColumn is a string indicating the name of the date/time column. At least one of them must be specified.

  • for 1-to-1/N-to-1 homogeneous replay: it is a string scalar, and the time columns in the inputTables and outputTables must use the same name.

  • for N-to-N replay: It is a string scalar if the time columns of the input tables have the same column name; otherwise, it is a string vector.

  • for N-to-1 replay: It is a string scalar if the time columns of the input tables have the same column name; otherwise, it is a dictionary. The key of the dictionary is a user-defined string indicating the unique identifier of the input table, and the value is dateColumn/timeColumn.

If dateColumn and timeColumn are specified as the same column, or only one of them is specified, there is no restriction on the type of the specified time column.

If dateColumn and timeColumn are specified as different columns, dateColumn must be of DATE type and timeColumn can only be of SECOND, TIME, or NANOTIME type.

replayRate is an integer. Together with the parameter absoluteRate, it determines the speed of replaying data.

absoluteRate is a Boolean value. The default value is true, indicating that the system replays replayRate records per second. If set to false, data is replayed at replayRate times the time span of the data.

parallelLevel is a positive integer indicating the number of threads that load data sources into memory concurrently. The default value is 1. If inputTables is not specified as data sources, this parameter does not need to be specified.
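
For instance, when replaying a tuple of data sources such as those generated by replayDS in Ex 2 below, loading can be parallelized (a sketch; the level of 4 is an arbitrary choice, and "out1"/"out2" stand for any matching output tables):

$ // load the data sources with 4 threads while replaying
$ replay(inputTables=[ds1, ds2], outputTables=[out1, out2], dateColumn=`dt, parallelLevel=4)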

sortColumns is a STRING scalar or a vector of length 2. Records with the same timestamp are sorted by the specified sortColumns. It is supported only for heterogeneous replay.

Note that any column in either of the input tables can be specified as a sort column. If one of the input tables does not contain the specified sort column, that column is filled with NULL values, which are treated as the minimum values when the data is sorted.
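
A sketch of a heterogeneous replay with a sort column, reusing the dictionaries from Ex 3 below (choosing the vol column as the sort column is an assumption for illustration only):

$ // records with identical timestamps are additionally ordered by vol
$ replay(inputTables=input_dict, outputTables=opt, dateColumn=date_dict, timeColumn=time_dict, sortColumns=`vol)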

Examples

Ex 1. 1-to-1 replay:

$ n=1000
$ sym = take(`IBM,n)
$ timestamp= take(temporalAdd(2012.12.06T09:30:12.000,1..500,'s'),n)
$ volume = rand(100,n)
$ trades=table(sym,timestamp,volume)
$ trades.sortBy!(`timestamp)
$ share streamTable(100:0,`sym`timestamp`volume,[SYMBOL,TIMESTAMP,INT]) as st
  • Replay 100 records per second. For 1000 records in table “trades”, it takes about 10 seconds.

$ timer replay(inputTables=trades, outputTables=st, dateColumn=`timestamp, timeColumn=`timestamp,replayRate=100, absoluteRate=true);
Time elapsed: 10001.195 ms
  • Replay at 100 times the time span of the data. The difference between the start timestamp and the end timestamp in table “trades” is 500 seconds, and it takes about 5 seconds to replay the table.

$ timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp,replayRate=100,absoluteRate=false);
Time elapsed: 5001.909 ms
  • Replay at the maximum speed:

$ timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp);
Time elapsed: 2.026 ms

Ex 2. N-to-N replay.

The following script replays two data sources to the join engine for asof join.

$ n=50000
$ sym = rand(symbol(`IBM`APPL`MSFT`GOOG`GS),n)
$ date=take(2012.06.12..2012.06.16,n)
$ time=rand(13:00:00.000..16:59:59.999,n)
$ volume = rand(100,n)
$ t1=table(sym,date,time,volume).sortBy!([`date, `time])

$ sym = rand(symbol(`IBM`APPL`MSFT`GOOG`GS),n)
$ date=take(2012.06.12..2012.06.16,n)
$ time=rand(13:00:00.000..16:59:59.999,n)
$ price = 100 + rand(10.0,n)
$ t2=table(sym,date,time,price).sortBy!([`date, `time])

$ if(existsDatabase("dfs://test_stock")){
$ dropDatabase("dfs://test_stock")
$ }
$ db=database("dfs://test_stock",VALUE,2012.06.12..2012.06.16)
$ pt1=db.createPartitionedTable(t1,`pt1,`date).append!(t1)
$ pt2=db.createPartitionedTable(t2,`pt2,`date).append!(t2)

$ left = table(100:0,`sym`dt`volume,[SYMBOL,TIMESTAMP,INT])
$ right = table(100:0,`sym`dt`price,[SYMBOL,TIMESTAMP,DOUBLE])

$ opt=table(100:0, `dt`sym`volume`price`total, [TIMESTAMP, SYMBOL, INT, DOUBLE, DOUBLE])
$ ajEngine=createAsofJoinEngine(name="ajEngine", leftTable=left, rightTable=right, outputTable=opt, metrics=<[volume, price, volume*price]>, matchingColumn=`sym, timeColumn=`dt, useSystemTime=false, delayedTime=1)

$ ds1=replayDS(sqlObj=<select sym, concatDateTime(date, time) as dt, volume from pt1>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:00:00.000, 14:00:00.000, 15:00:00.000, 16:00:00.000, 17:00:00.000])
$ ds2=replayDS(sqlObj=<select sym, concatDateTime(date, time) as dt, price from pt2>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:00:00.000, 14:00:00.000, 15:00:00.000, 16:00:00.000, 17:00:00.000])

$ replay(inputTables=[ds1,ds2], outputTables=[getLeftStream(ajEngine), getRightStream(ajEngine)], dateColumn=`dt);

$ select count(*) from opt
50000

Ex 3. N-to-1 heterogeneous replay. The output table needs to be deserialized, filtered and processed by streamFilter.

$ n=1000
$ sym = take(`IBM`GS,n)
$ myDate=take(2021.01.02..2021.01.06, n).sort!()
$ myTime=take(09:30:00..15:59:59,n)
$ vol = rand(100,n)
$ t=table(sym,myDate,myTime,vol)

$ sym = take(`IBM`GS,n)
$ date=take(2021.01.02..2021.01.06, n).sort!()
$ time=take(09:30:00..15:59:59,n)
$ vol = rand(100,n)
$ price = take(10,n)+rand(1.0,n)
$ t1=table(sym, date,time,vol,price)


$ if(existsDatabase("dfs://test_stock1")){
$ dropDatabase("dfs://test_stock1")
$ }
$ db1=database("",RANGE, 2021.01.02..2021.01.07)
$ db2=database("",VALUE,`IBM`GS)
$ db=database("dfs://test_stock1",COMPO,[db1, db2])
$ orders=db.createPartitionedTable(t,`orders,`myDate`sym)
$ orders.append!(t);
$ trades=db.createPartitionedTable(t1,`trades,`date`sym)
$ trades.append!(t1);
$ // load data sources
$ ds = replayDS(sqlObj=<select * from loadTable(db, `orders)>, dateColumn=`myDate, timeColumn=`myTime)
$ ds.size();
$ ds1 = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`date, timeColumn=`time)
$ ds1.size();

$ input_dict  = dict(["msg1", "msg2"], [ds, ds1])
$ date_dict = dict(["msg1", "msg2"], [`myDate, `date])
$ time_dict = dict(["msg1", "msg2"], [`myTime, `time])
$ //subscribe to the output table of replay to ingest the data to the stream filter
$ share streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT]) as opt

$ filterOrder=table(100:0, `sym`date`time`volume, [SYMBOL, DATE, SECOND, INT])
$ filterTrades=table(100:0, `sym`date`time`volume`price, [SYMBOL, DATE, SECOND, INT, DOUBLE])
$ //define the input table of stream filter
$ share streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT]) as streamFilter_input
$ // the stream filter splits the ingested data and distributes them to table “filterOrder“ and “filterTrades”
$ filter1=dict(STRING,ANY)
$ filter1['condition']=`msg1
$ filter1['handler']=filterOrder

$ filter2=dict(STRING,ANY)
$ filter2['condition']=`msg2
$ filter2['handler']=filterTrades
$ schema=dict(["msg1","msg2"], [filterOrder, filterTrades])
$ stEngine=streamFilter(name=`streamFilter, dummyTable=streamFilter_input, filter=[filter1,filter2], msgSchema=schema)
$ subscribeTable(tableName="opt", actionName="sub1", offset=0, handler=stEngine, msgAsTable=true)

$ replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict,  replayRate=100, absoluteRate=false);

$ select count(*) from filterOrder
1000