replay
Syntax
replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [absoluteRate=true], [parallelLevel=1])
Details
Replay one or more tables or data sources to a table in chronological order to simulate real-time ingestion of streaming data. To replay historical data, use function replayDS to generate a data source as an input of replay.
There are 3 mapping types between the inputTables and outputTables: one-to-one, N-to-one, and N-to-N. For N-to-one replay before version 1.30.17/2.00.5, the input tables must share the same schema; this is called homogeneous replay. Starting from version 1.30.17/2.00.5, replay also accepts input tables with different schemata, passed in the form of a dictionary; this is called heterogeneous replay. In heterogeneous replay, the output table can be deserialized, and its data filtered and processed, with streamFilter.
Note: In N-to-one replay, if inputTables are data sources, the size of each data source and the time range of dateColumn must be consistent.
To terminate the replay, use commands cancelJob or cancelConsoleJob.
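For example, a long replay can be submitted as a background job and then cancelled by its job ID. A minimal sketch, assuming the trades and st tables defined in Ex 1 below (the job name and description are illustrative):
$ jobId = submitJob("replayTrades", "background replay demo", replay{trades, st, `timestamp, `timestamp, 100})
$ cancelJob(jobId) //terminate the background replay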
Arguments
inputTables are non-partitioned in-memory tables or data sources generated by function replayDS. It can be:
a scalar or tuple for one or more table objects or data sources with the same schema.
a dictionary for multiple table objects or data sources with different schemata. The key of the dictionary is a user-defined string indicating the unique identifier of the input table, and the value is the table object or data source.
outputTables are in-memory tables or stream tables. If outputTables are shared tables, they can also be specified by their shared names.
In one-to-one replay or homogeneous replay: outputTables is a table object or a string scalar, with the same schema as the input table(s).
In N-to-N replay: outputTables is a string vector or a tuple of table objects with the same length as inputTables. The output tables and input tables are mapped one-to-one, and each pair must have the same schema.
In heterogeneous replay: the outputTables is a table containing at least three columns:
The first column is of TIMESTAMP type indicating the timestamp specified by dateColumn/timeColumn;
The second column is of SYMBOL or STRING type indicating the key of the dictionary specified in the inputTables;
The third column must be of BLOB type that stores the serialized result of each replayed record.
In addition, the output table can contain extra columns that match columns (with the same names and data types) in the input tables.
dateColumn / timeColumn is the column name of the temporal column. At least one of them must be specified.
In one-to-one replay or homogeneous replay: The time columns in the inputTables and outputTables must use the same name and the parameter is a string scalar.
In N-to-N replay: The time columns in the inputTables and outputTables can use different names, and the parameter is a string vector.
In N-to-one replay: The time columns in the inputTables and outputTables use different names and the parameter is a dictionary. The key of the dictionary is a user-defined string indicating the unique identifier of the input table, and the value is dateColumn/timeColumn.
If dateColumn and timeColumn are specified as the same column, or only one of them is specified, there is no restriction on the type of the specified temporal column.
If dateColumn and timeColumn are specified as different columns, dateColumn must be of DATE type and timeColumn must be of SECOND, TIME, or NANOTIME type.
replayRate is an integer. Together with the parameter absoluteRate it determines the speed of replaying data.
absoluteRate is a Boolean value. The default value is true.
If replayRate is a positive integer and absoluteRate = true, replayRate records are replayed per second. Assuming the number of records in inputTables is "total" (for multiple tables, the maximum record count among them), the elapsed time of the replay is roughly total/replayRate seconds.
If replayRate is a positive integer and absoluteRate = false, data is replayed at replayRate times its original speed. For example, if the difference between the maximum and minimum values of dateColumn or timeColumn is n seconds and the number of records is s, the system replays at replayRate * s / n records per second (at least 1 record per second), i.e., it takes n/replayRate seconds to finish the replay.
If replayRate is negative or unspecified, replay at the maximum speed. The elapsed time for replay is related to the performance of the DolphinDB server.
The elapsed time of the above replay modes is estimated based on homogeneous replay; heterogeneous replay takes slightly longer.
parallelLevel is a positive integer indicating the number of threads that load data into memory from data sources simultaneously. The default value is 1. If inputTables does not contain data sources, this parameter does not need to be specified.
Examples
Ex 1. Replay a table:
$ n=1000
$ sym = take(`IBM,n)
$ timestamp= take(temporalAdd(2012.12.06T09:30:12.000,1..500,'s'),n)
$ volume = rand(100,n)
$ trades=table(sym,timestamp,volume)
$ trades.sortBy!(`timestamp)
$ share streamTable(100:0,`sym`timestamp`volume,[SYMBOL,TIMESTAMP,INT]) as st
Replay 100 records per second:
$ timer replay(inputTables=trades, outputTables=st, dateColumn=`timestamp, timeColumn=`timestamp,replayRate=100, absoluteRate=true);
Time elapsed: 10001.195 ms
Replay at 100 times the original speed of the data:
$ timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp,replayRate=100,absoluteRate=false);
Time elapsed: 5001.909 ms
Replay at the maximum speed:
$ timer replay(inputTables=trades,outputTables=st,dateColumn=`timestamp,timeColumn=`timestamp);
Time elapsed: 2.026 ms
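These timings follow from the formulas above: the table has 1,000 records spanning roughly 500 seconds, so replaying 100 records per second takes about 1000/100 = 10 seconds, while replaying at 100 times the original speed takes about 500/100 = 5 seconds.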
Ex 2. Replay multiple data sources to one or more output tables.
$ n=5000000
$ sym = rand(symbol(`IBM`APPL`MSFT`GOOG`GS),n)
$ date=take(2012.06.12..2012.06.16,n)
$ time=rand(13:30:11.008..13:30:11.012,n)
$ volume = rand(100,n)
$ t=table(sym,date,time,volume)
$ if(existsDatabase("dfs://test_stock")){
$ dropDatabase("dfs://test_stock")
$ }
$ db=database("dfs://test_stock",VALUE,2012.06.12..2012.06.16)
$ trades=db.createPartitionedTable(t,`trades,`date)
$ trades.append!(t)
$ share streamTable(100:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st1
$ share streamTable(100:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st2
$ ds1=replayDS(sqlObj=<select * from trades where date=2012.06.12>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:30:11.008,13:30:11.010,13:30:11.013])
$ ds2=replayDS(sqlObj=<select * from trades where date=2012.06.16>,dateColumn=`date,timeColumn=`time,timeRepartitionSchema=[13:30:11.008,13:30:11.010,13:30:11.013])
$ replay(inputTables=[ds1,ds2],outputTables=[st1,st2],dateColumn=`date,timeColumn=`time); //Replay ds1 and ds2 to st1 and st2 respectively.
$ select * from st1;
sym | date | time | price
--- | --- | --- | ---
GOOG | 2012.06.12 | 13:30:11.008 | 80.257105
MSFT | 2012.06.12 | 13:30:11.008 | 19.509125
MSFT | 2012.06.12 | 13:30:11.008 | 16.856134
GS | 2012.06.12 | 13:30:11.008 | 77.784879
MSFT | 2012.06.12 | 13:30:11.008 | 33.182977
GOOG | 2012.06.12 | 13:30:11.008 | 0.411022
IBM | 2012.06.12 | 13:30:11.008 | 64.201467
GS | 2012.06.12 | 13:30:11.008 | 2.961122
GOOG | 2012.06.12 | 13:30:11.008 | 19.06283
IBM | 2012.06.12 | 13:30:11.008 | 9.793819
…
$ select * from st2;
sym | date | time | price
--- | --- | --- | ---
GS | 2012.06.16 | 13:30:11.008 | 65.34449
GS | 2012.06.16 | 13:30:11.008 | 69.313035
GOOG | 2012.06.16 | 13:30:11.008 | 83.343602
IBM | 2012.06.16 | 13:30:11.008 | 1.09428
APPL | 2012.06.16 | 13:30:11.008 | 37.420217
APPL | 2012.06.16 | 13:30:11.008 | 29.142734
IBM | 2012.06.16 | 13:30:11.008 | 81.364092
GS | 2012.06.16 | 13:30:11.008 | 94.248142
GS | 2012.06.16 | 13:30:11.008 | 35.037498
GS | 2012.06.16 | 13:30:11.008 | 19.514272
…
$ share streamTable(200:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st3
$ replay(inputTables=[ds1,ds2],outputTables=st3,dateColumn=`date,timeColumn=`time); //Replay both ds1 and ds2 to st3.
$ select count(*) from st3
2000000
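When inputTables are data sources, as in this example, parallelLevel controls how many data sources are loaded into memory concurrently. A minimal sketch (the level of 2 is illustrative; regenerate ds1 and ds2 with replayDS first if they have already been consumed):
$ share streamTable(200:0,`sym`date`time`volume,[SYMBOL,DATE,TIME,INT]) as st4
$ replay(inputTables=[ds1,ds2],outputTables=st4,dateColumn=`date,timeColumn=`time,parallelLevel=2); //load up to 2 data sources at a time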
Ex 3. Heterogeneous replay:
$ n=1000
$ sym = take(take(`IBM,n).join(take(`GS,n)), n)
$ myDate=take(2021.01.02..2021.01.06, n).sort!()
$ myTime=take(09:30:00..15:59:59,n)
$ vol = rand(100, n)
$ t=table(sym,myDate,myTime,vol)
$ n=1000
$ sym = take(take(`IBM,n).join(take(`GS,n)), n)
$ date=take(2021.01.02..2021.01.06, n).sort!()
$ time=take(09:30:00..15:59:59,n)
$ vol = rand(100, n)
$ t1=table(sym, date,time,vol)
$ if(existsDatabase("dfs://test_stock")){
$ dropDatabase("dfs://test_stock")
$ }
$ db1=database("",RANGE, 2021.01.01..2021.01.12)
$ db2=database("",VALUE,`IBM`GS)
$ db=database("dfs://test_stock",COMPO,[db1, db2])
$ trades=db.createPartitionedTable(t,`trades,`myDate`sym)
$ trades.append!(t);
$ trades1=db.createPartitionedTable(t1,`trades1,`date`sym)
$ trades1.append!(t1);
//get the data source
$ ds = replayDS(sqlObj=<select * from loadTable(db, `trades)>, dateColumn=`myDate, timeColumn=`myTime)
$ ds.size();
$ ds1 = replayDS(sqlObj=<select * from loadTable(db, `trades1)>, dateColumn=`date, timeColumn=`time)
$ ds1.size();
$ input_dict = dict(["msg1", "msg2"], [ds, ds1])
$ date_dict = dict(["msg1", "msg2"], [`myDate, `date])
$ time_dict = dict(["msg1", "msg2"], [`myTime, `time])
$ opt = streamTable(100:0,`timestamp`sym`blob`vol, [DATETIME,SYMBOL, BLOB, INT])
$ replay(inputTables=input_dict, outputTables=opt, dateColumn = date_dict, timeColumn=time_dict, replayRate=1000, absoluteRate=false);
$ select top 10 * from opt;
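To check how many records each input table contributed, you can group on the key column of the output table (named sym here, holding the identifiers "msg1" and "msg2"):
$ select count(*) from opt group by sym;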