replay

Syntax

replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [absoluteRate=true], [parallelLevel=1])

Details

Replay one or more tables or data sources to one or more tables in chronological order to simulate real-time ingestion of streaming data. To replay historical data from a database, use function replayDS to generate data sources as the input of replay.

There are 3 mapping types between inputTables and outputTables: one-to-one, N-to-one, and N-to-N. Before version 1.30.17/2.00.5, N-to-one replay requires all input tables to have the same schema, which is called homogeneous replay. Starting from version 1.30.17/2.00.5, replay also accepts input tables with different schemata passed as a dictionary, which is called heterogeneous replay. In heterogeneous replay, the serialized records in the output table can be deserialized, filtered, and processed with streamFilter.
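Conceptually, N-to-one replay interleaves several time-sorted inputs into a single output ordered by the temporal column. The Python sketch below models that with `heapq.merge`. It assumes each input is already sorted by time and uses a made-up row layout of `(time, sym, volume)` tuples; it illustrates the ordering only and is not DolphinDB's implementation.

```python
import heapq
from operator import itemgetter


def merge_chronologically(*inputs):
    """Merge several time-sorted lists of (time, sym, volume) rows into one
    list ordered by time, mimicking N-to-one replay (illustrative only)."""
    # heapq.merge lazily merges already-sorted iterables; key selects the time field.
    # Rows with equal times keep the order of the inputs they came from.
    return list(heapq.merge(*inputs, key=itemgetter(0)))


t1 = [(1, "IBM", 10), (3, "IBM", 30)]
t2 = [(2, "GS", 20), (4, "GS", 40)]
merged = merge_chronologically(t1, t2)
print(merged)
```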

Note: In N-to-one replay, if inputTables are data sources, the data sources must be consistent in size and in the time range of dateColumn.

To terminate the replay, use command cancelJob or cancelConsoleJob.

Arguments

inputTables can be non-partitioned in-memory tables or data sources generated by function replayDS, specified as:

  • a scalar or tuple for one or more table objects or data sources with the same schema.

  • a dictionary for multiple table objects or data sources with different schemata. The key of the dictionary is a user-defined string indicating the unique identifier of the input table, and the value is the table object or data source.

outputTables can be in-memory tables or stream tables. Shared tables can also be specified by their shared names.

  • In one-to-one replay or homogeneous replay: outputTables is a table object or a string scalar (shared table name) with the same schema as the input table.

  • In N-to-N replay: outputTables is a string vector or a tuple of table objects with the same length as inputTables. outputTables and inputTables are mapped one-to-one, and each pair must have the same schema.

  • In heterogeneous replay: the outputTables is a table containing at least three columns:

    • The first column is of TIMESTAMP type indicating the timestamp specified by dateColumn/timeColumn;

    • The second column is of SYMBOL or STRING type indicating the key of the dictionary specified in the inputTables;

    • The third column must be of BLOB type that stores the serialized result of each replayed record.

    • Optionally, additional columns can follow; each must have the same name and data type as a column in the input tables.
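The rules above determine the mapping type from the shape of inputTables and outputTables. The hypothetical Python helper below sketches that classification, modeling tables as opaque values, shared names as strings, vectors and tuples as lists, and the dictionary form as a dict. It checks shapes only, not schemata.

```python
def replay_mode(input_tables, output_tables):
    """Classify the inputTables/outputTables mapping (hypothetical helper).

    A str stands for a shared table name; a list or tuple stands for a
    DolphinDB vector or tuple; a dict stands for the dictionary form.
    """
    if isinstance(input_tables, dict):
        # Dictionary input: tables with different schemata go to one output table.
        return "heterogeneous"
    inputs = input_tables if isinstance(input_tables, (list, tuple)) else [input_tables]
    outputs = output_tables if isinstance(output_tables, (list, tuple)) else [output_tables]
    if len(inputs) == 1 and len(outputs) == 1:
        return "one-to-one"
    if len(outputs) == 1:
        # Several inputs with the same schema into a single output.
        return "N-to-one (homogeneous)"
    if len(outputs) == len(inputs):
        # Inputs and outputs are paired by position.
        return "N-to-N"
    raise ValueError("outputTables must be one table or match inputTables in length")


print(replay_mode(["ds1", "ds2"], ["st1", "st2"]))  # N-to-N
```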

dateColumn / timeColumn is the column name of the temporal column. At least one of them must be specified.

  • In one-to-one replay or homogeneous replay: the time columns in inputTables and outputTables must have the same name, and the parameter is a string scalar.

  • In N-to-N replay: if the time columns in inputTables have different names, the parameter is a string vector with one name per input table; if they share one name, a string scalar can be used.

  • In N-to-one replay: if the time columns in inputTables have different names, the parameter is a dictionary. The key is the user-defined string identifying an input table (as in inputTables), and the value is that table's dateColumn/timeColumn.
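A hypothetical Python helper showing how the time column of one input table would be looked up under each of the three forms (scalar, vector, dictionary). The column name `trade_date` is made up for illustration.

```python
def time_column_for(column_arg, index=None, key=None):
    """Return the time-column name for one input table (hypothetical helper).

    column_arg may be a str (one name shared by all inputs), a list (one name
    per input, matched by position), or a dict keyed by the input identifier.
    """
    if isinstance(column_arg, str):
        return column_arg
    if isinstance(column_arg, dict):
        return column_arg[key]
    return column_arg[index]


date_dict = {"msg1": "myDate", "msg2": "date"}
print(time_column_for(date_dict, key="msg2"))  # date
```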

If dateColumn and timeColumn are specified as the same column, or only one of them is specified, there is no restriction on the type of the temporal column.

If dateColumn and timeColumn are specified as different columns, dateColumn must be of DATE type and timeColumn must be of SECOND, TIME, or NANOTIME type.
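Conceptually, when dateColumn and timeColumn are different columns, the ordering key of each record can be viewed as the date combined with the time of day. A Python sketch of that combination under the type rule above, with DolphinDB type names modeled as plain strings (an assumption for illustration). Python's datetime has microsecond precision, so NANOTIME values would lose precision in this model.

```python
from datetime import date, datetime, time

# Time-column types the rule above allows alongside a separate DATE column.
ALLOWED_TIME_TYPES = {"SECOND", "TIME", "NANOTIME"}


def combine(date_col_type, time_col_type, d, t):
    """Combine a DATE value and a time-of-day value into one timestamp.

    Type names are plain strings here, not DolphinDB type objects.
    """
    if date_col_type != "DATE" or time_col_type not in ALLOWED_TIME_TYPES:
        raise TypeError("dateColumn must be DATE; timeColumn SECOND, TIME or NANOTIME")
    return datetime.combine(d, t)


# 13:30:11.008 is written as 8000 microseconds.
ts = combine("DATE", "TIME", date(2012, 6, 12), time(13, 30, 11, 8000))
print(ts)  # 2012-06-12 13:30:11.008000
```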

replayRate is an integer. Together with the parameter absoluteRate it determines the speed of replaying data.

absoluteRate is a Boolean value. The default value is true.

  • If replayRate is a positive integer and absoluteRate = true, replay replayRate records per second. If the number of records in inputTables is total (the maximum record count among the tables when replaying multiple tables), the replay takes roughly total/replayRate seconds.

  • If replayRate is a positive integer and absoluteRate = false, replay at replayRate times the original speed of the data. For example, if the difference between the maximum and minimum values of dateColumn or timeColumn is n seconds and the number of records is s, the system replays replayRate * s / n records per second (at least 1 record per second), so the replay takes about n/replayRate seconds.

  • If replayRate is negative or unspecified, replay at the maximum speed. The elapsed time then depends on the performance of the DolphinDB server.

The elapsed times above are estimated for homogeneous replay; heterogeneous replay takes slightly longer.
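The three rate modes can be written as a small estimator. The Python function below is a sketch of the formulas above for homogeneous replay. Treating replayRate = 0 and a zero time span as errors is an assumption, since the rules do not cover those cases.

```python
def estimated_seconds(total, span_seconds, replay_rate=None, absolute_rate=True):
    """Rough replay duration following the rules above (homogeneous replay).

    total: number of records (the largest table when replaying several).
    span_seconds: max minus min of the temporal column, in seconds.
    Returns None for maximum-speed replay, whose duration depends on the server.
    """
    if replay_rate is None or replay_rate < 0:
        return None  # maximum speed
    if replay_rate == 0:
        raise ValueError("replayRate = 0 is not covered by the rules above")
    if absolute_rate:
        # replayRate records per second.
        return total / replay_rate
    if span_seconds <= 0:
        raise ValueError("relative rate needs a positive time span")
    # replayRate times the original speed, floored at 1 record per second.
    records_per_second = max(replay_rate * total / span_seconds, 1)
    return total / records_per_second


print(estimated_seconds(1000, 499, 100, True))   # 10.0
print(estimated_seconds(1000, 499, 100, False))  # about 4.99
```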

parallelLevel is a positive integer indicating the number of threads that load data from data sources into memory concurrently. The default value is 1. It does not need to be specified if inputTables are not data sources.
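parallelLevel only controls how many threads load data sources. A conceptual Python sketch with a hypothetical loader function: ThreadPoolExecutor.map runs up to parallel_level loads at once but returns results in input order, which keeps the loaded partitions usable for chronological replay. How DolphinDB schedules the loads internally is not modeled.

```python
from concurrent.futures import ThreadPoolExecutor


def load_all(data_sources, loader, parallel_level=1):
    """Load data sources with up to parallel_level worker threads.

    loader is a hypothetical function that materializes one data source.
    Results come back in the same order as data_sources.
    """
    if parallel_level < 1:
        raise ValueError("parallel_level must be a positive integer")
    with ThreadPoolExecutor(max_workers=parallel_level) as pool:
        return list(pool.map(loader, data_sources))


print(load_all([3, 1, 2], lambda ds: ds * 10, parallel_level=2))  # [30, 10, 20]
```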

Examples

Ex 1. Replay a table:

$ n=1000
$ sym = take(`IBM,n)
$ timestamp= take(temporalAdd(2012.12.06T09:30:12.000,1..500,'s'),n)
$ volume = rand(100,n)
$ trades=table(sym,timestamp,volume)
$ trades.sortBy!(`timestamp)
$ share streamTable(100:0,`sym`timestamp`volume,[SYMBOL,TIMESTAMP,INT]) as st

Replay 100 records per second:

$ timer replay(inputTables=trades, outputTables=st, dateColumn=`timestamp, timeColumn=`timestamp,replayRate=100, absoluteRate=true);
Time elapsed: 10001.195 ms

Replay at 100 times the original speed of the data:

$ timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp,replayRate=100,absoluteRate=false);
Time elapsed: 5001.909 ms

Replay at the maximum speed:

$ timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp);
Time elapsed: 2.026 ms
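The first two timings follow from the rate rules in the Arguments section. The Python sketch below rebuilds the timestamp column of this example, modeling take as cyclic repetition, and derives the expected durations: 1000 records at 100 records per second is about 10 s, and a 499-second span replayed at 100 times speed is about 5 s, in line with the measured 10001 ms and 5001 ms.

```python
from datetime import datetime, timedelta
from itertools import cycle, islice

n = 1000
base = datetime(2012, 12, 6, 9, 30, 12)
# temporalAdd(base, 1..500, 's') produces 500 timestamps one second apart.
offsets = [base + timedelta(seconds=s) for s in range(1, 501)]
# take(..., n) cycles those values up to n; sortBy! then orders them.
timestamps = sorted(islice(cycle(offsets), n))

total = len(timestamps)                                  # 1000 records
span = (timestamps[-1] - timestamps[0]).total_seconds()  # 499.0 seconds

print(total / 100)  # replayRate=100, absoluteRate=true: 10.0 s
print(span / 100)   # replayRate=100, absoluteRate=false: 4.99 s
```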

Ex 2. Replay multiple data sources to one or more output tables.

$ n=5000000
$ sym = rand(symbol(`IBM`APPL`MSFT`GOOG`GS),n)
$ date=take(2012.06.12..2012.06.16,n)
$ time=rand(13:30:11.008..13:30:11.012,n)
$ volume = rand(100,n)
$ t=table(sym,date,time,volume)
$ if(existsDatabase("dfs://test_stock")){
$     dropDatabase("dfs://test_stock")
$ }
$ db=database("dfs://test_stock",VALUE,2012.06.12..2012.06.16)
$ trades=db.createPartitionedTable(t,`trades,`date)
$ trades.append!(t)

$ share streamTable(100:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st1
$ share streamTable(100:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st2

$ ds1=replayDS(sqlObj=<select * from trades where date=2012.06.12>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:30:11.008,13:30:11.010,13:30:11.013])
$ ds2=replayDS(sqlObj=<select * from trades where date=2012.06.16>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:30:11.008,13:30:11.010,13:30:11.013])
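timeRepartitionSchema further splits each date's data by time. A Python sketch of that bucketing, assuming the boundaries [13:30:11.008, 13:30:11.010, 13:30:11.013] define half-open ranges [008, 010) and [010, 013); the interval closure is an assumption, not taken from the replayDS documentation.

```python
from bisect import bisect_right
from datetime import time

# Boundaries from timeRepartitionSchema above (milliseconds as microseconds).
boundaries = [time(13, 30, 11, 8000), time(13, 30, 11, 10000), time(13, 30, 11, 13000)]


def bucket(t):
    """Index of the [boundaries[i], boundaries[i+1]) range holding t, or None."""
    i = bisect_right(boundaries, t) - 1
    return i if 0 <= i < len(boundaries) - 1 else None


# The example's times 13:30:11.008 to 13:30:11.012 fall into two buckets.
print([bucket(time(13, 30, 11, ms * 1000)) for ms in range(8, 13)])  # [0, 0, 1, 1, 1]
```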

$ replay(inputTables=[ds1,ds2],outputTables=[st1,st2],dateColumn=`date,timeColumn=`time);       //Replay ds1 and ds2 to st1 and st2 respectively.
$ select * from st1;

sym  date       time         price
---- ---------- ------------ ---------
GOOG 2012.06.12 13:30:11.008 80.257105
MSFT 2012.06.12 13:30:11.008 19.509125
MSFT 2012.06.12 13:30:11.008 16.856134
GS   2012.06.12 13:30:11.008 77.784879
MSFT 2012.06.12 13:30:11.008 33.182977
GOOG 2012.06.12 13:30:11.008 0.411022
IBM  2012.06.12 13:30:11.008 64.201467
GS   2012.06.12 13:30:11.008 2.961122
GOOG 2012.06.12 13:30:11.008 19.06283
IBM  2012.06.12 13:30:11.008 9.793819

$ select * from st2;

sym  date       time         price
---- ---------- ------------ ---------
GS   2012.06.16 13:30:11.008 65.34449
GS   2012.06.16 13:30:11.008 69.313035
GOOG 2012.06.16 13:30:11.008 83.343602
IBM  2012.06.16 13:30:11.008 1.09428
APPL 2012.06.16 13:30:11.008 37.420217
APPL 2012.06.16 13:30:11.008 29.142734
IBM  2012.06.16 13:30:11.008 81.364092
GS   2012.06.16 13:30:11.008 94.248142
GS   2012.06.16 13:30:11.008 35.037498
GS   2012.06.16 13:30:11.008 19.514272

$ share streamTable(200:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st3
$ replay(inputTables=[ds1,ds2],outputTables=st3,dateColumn=`date,timeColumn=`time);     //Replay both ds1 and ds2 to st3.
$ select count(*) from st3
2000000
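The count follows from the data: take cycles the five dates, so each date holds n/5 = 1,000,000 rows, and ds1 plus ds2 cover two dates. A quick Python check of that arithmetic, modeling take as cyclic repetition:

```python
from collections import Counter
from itertools import cycle, islice

n = 5_000_000
dates = ["2012.06.12", "2012.06.13", "2012.06.14", "2012.06.15", "2012.06.16"]
# take(2012.06.12..2012.06.16, n) cycles the five dates up to n values.
per_date = Counter(islice(cycle(dates), n))
rows_in_st3 = per_date["2012.06.12"] + per_date["2012.06.16"]
print(rows_in_st3)  # 2000000
```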

Ex 3. Heterogeneous replay:

$ n=1000
$ sym = take(take(`IBM,n).join(take(`GS,n)), n)
$ myDate=take(2021.01.02..2021.01.06, n).sort!()
$ myTime=take(09:30:00..15:59:59,n)
$ vol = rand(100, n)
$ t=table(sym,myDate,myTime,vol)


$ n=1000
$ sym = take(take(`IBM,n).join(take(`GS,n)), n)
$ date=take(2021.01.02..2021.01.06, n).sort!()
$ time=take(09:30:00..15:59:59,n)
$ vol = rand(100, n)
$ t1=table(sym, date,time,vol)

$ if(existsDatabase("dfs://test_stock")){
$     dropDatabase("dfs://test_stock")
$ }
$ db1=database("",RANGE, 2021.01.01..2021.01.12)
$ db2=database("",VALUE,`IBM`GS)
$ db=database("dfs://test_stock",COMPO,[db1, db2])
$ trades=db.createPartitionedTable(t,`trades,`myDate`sym)
$ trades.append!(t);
$ trades1=db.createPartitionedTable(t1,`trades1,`date`sym)
$ trades1.append!(t1);

$ // Get the data sources

$ ds = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`myDate, timeColumn=`myTime)
$ ds.size();

$ ds1 = replayDS(sqlObj=<select * from loadTable(db, `trades1)>, dateColumn=`date, timeColumn=`time)
$ ds1.size();

$ input_dict  = dict(["msg1", "msg2"], [ds, ds1])
$ date_dict = dict(["msg1", "msg2"], [`myDate, `date])
$ time_dict = dict(["msg1", "msg2"], [`myTime, `time])
$ opt = streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT])
$ replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict,  replayRate=1000, absoluteRate=false);
$ select top 10 * from opt;
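Each row of opt carries the replay timestamp, the dictionary key that identifies the source table, the serialized record, and the shared vol column. The Python sketch below models that layout and the consumer side. Assumptions: json stands in for DolphinDB's own serialization, the sample rows are made up, and route is a hypothetical plain-Python stand-in for the role streamFilter plays in DolphinDB.

```python
import json
from datetime import datetime

# Two inputs with different schemata, keyed like input_dict (sample rows are made up).
inputs = {
    "msg1": [{"myDate": "2021-01-02", "myTime": "09:30:00", "sym": "IBM", "vol": 5}],
    "msg2": [{"date": "2021-01-02", "time": "09:30:01", "sym": "IBM", "vol": 7}],
}
date_dict = {"msg1": "myDate", "msg2": "date"}
time_dict = {"msg1": "myTime", "msg2": "time"}


def to_output_rows(inputs):
    """Build (timestamp, key, blob, vol) rows like the opt table, sorted by time."""
    rows = []
    for key, records in inputs.items():
        for rec in records:
            # Each input uses its own date/time column names, looked up by key.
            ts = datetime.fromisoformat(f"{rec[date_dict[key]]}T{rec[time_dict[key]]}")
            rows.append((ts, key, json.dumps(rec).encode(), rec["vol"]))
    rows.sort(key=lambda r: r[0])
    return rows


def route(rows):
    """Deserialize each blob and group the records by source key."""
    out = {}
    for _, key, blob, _ in rows:
        out.setdefault(key, []).append(json.loads(blob))
    return out


rows = to_output_rows(inputs)
print(route(rows)["msg2"][0]["time"])  # 09:30:01
```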