subscribeTable

Details

Subscribe to a stream table on a local or remote server from a client node. We can also specify a function to process the subscribed data.

Returns the subscription topic, which is a combination of the alias of the node where the stream table is located, the stream table name, and the subscription task name (if actionName is specified), separated by “_”. If the subscription topic already exists, the function throws an exception.

  • If batchSize is specified, handler will be triggered if either the number of unprocessed messages reaches batchSize or the duration of time since the last time handler was triggered reaches throttle seconds.

  • If the subscribed table is overwritten, to keep the subscription we need to cancel the subscription with command unsubscribeTable and then subscribe to the new table.

  • With the high availability subscription enabled, a leader switch or cluster restart in the raft group of subscribers may log off a user. Since guests have no privilege to write to a DFS table, the writing will be interrupted (the current user can be identified by function getCurrentSessionAndUser). If parameters userId and password are specified, the system will attempt to log in after the user is logged out accidentally to make sure the subscribed data can be written to a DFS table.

Please note that when subscribing to a stream table for the streaming join engine, the parameter handler of function subscribeTable must be appendForJoin, getLeftStream, or getRightStream.

Syntax

subscribeTable([server],tableName,[actionName],[offset=-1],handler,[msgAsTable=false],[batchSize=0],[throttle=1],[hash=-1],[reconnect=false],[filter],[persistOffset=false],[timeTrigger=false],[handlerNeedMsgId=false],[raftGroup],[userId],[password])

Arguments

Only tableName and handler are required. All the other parameters are optional.

server a string indicating the alias or the remote connection handle of a server where the stream table is located. If it is unspecified or an empty string (“”), it means the local instance.

tableName a string indicating the name of the shared stream table on the aforementioned server.

actionName a string indicating subscription task name. It can have letters, digits and underscores. It must be specified if multiple subscriptions on the same node subscribe to the same stream table.

offset an integer indicating the position of the first message where the subscription begins. A message is a row of the stream table. Offset is relative to the first row of the stream table when it is created. If offset is unspecified or -1, the subscription starts with the next new message. If offset=-2, the system will get the persisted offset on disk and start subscription from there. If some rows were cleared from memory due to cache size limit, they are still considered in determining where the subscription starts.
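For instance, the following sketch contrasts the common offset values (assuming a shared stream table st and target tables t1 and t2 already exist; all names here are hypothetical):

$ // offset=0: replay from the first row ever appended to st
$ subscribeTable(tableName="st", actionName="fromStart", offset=0, handler=append!{t1}, msgAsTable=true)
$ // offset=-1 (default): only receive messages appended after the subscription
$ subscribeTable(tableName="st", actionName="newOnly", offset=-1, handler=append!{t2}, msgAsTable=true)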

handler a unary function, a binary function, or a table used to process the subscribed data.

  • If handler is a unary function, the only parameter of the function is the subscribed data, which can be a table or a tuple of the subscribed table columns.

  • The handler must be specified as a binary function when handlerNeedMsgId = true. The parameters of the function are msgBody and msgId. For more details, see parameter handlerNeedMsgId.

  • If handler is a table, the subscribed data is inserted into the table directly. Supported targets include streaming engines, shared tables (including stream tables, in-memory tables, keyed tables, and indexed tables), and DFS tables.

msgAsTable a Boolean value indicating whether the subscribed data is ingested into handler as a table or as a tuple. If msgAsTable=true, the subscribed data is ingested into handler as a table and can be processed with SQL statements. The default value is false, which means the subscribed data is ingested into handler as a tuple of columns.
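As an illustrative sketch (resultTable is a hypothetical shared table; trades_stream is the table used in the Examples section), msgAsTable=true lets the handler process each message batch with SQL:

$ def sumQtyBySym(mutable resultTable, msg){
$     // msg arrives as a table because msgAsTable=true
$     resultTable.append!(select sym, sum(qty) as totalQty from msg group by sym)
$ }
$ subscribeTable(tableName="trades_stream", actionName="aggQty", offset=0, handler=sumQtyBySym{resultTable}, msgAsTable=true)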

batchSize an integer indicating the number of unprocessed messages to trigger the handler. If it is positive, the handler does not process messages until the number of unprocessed messages reaches batchSize. If it is unspecified or non-positive, the handler processes incoming messages as soon as they come in.

throttle a floating-point number in seconds indicating the maximum waiting time before the handler processes the incoming messages if the batchSize condition has not been reached. The default value is 1. This optional parameter has no effect if batchSize is not specified. To set throttle to less than 1 second, you need to modify the configuration parameter subThrottle first.
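For example, the following sketch (t1 is a hypothetical target table) triggers the handler once 10,000 unprocessed messages accumulate or 5 seconds pass since the last trigger, whichever comes first:

$ subscribeTable(tableName="trades_stream", actionName="batched", offset=0, handler=append!{t1}, msgAsTable=true, batchSize=10000, throttle=5)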

hash a hash value (a nonnegative integer) indicating which subscription executor will process the incoming messages for this subscription. If it is unspecified, the system automatically assigns an executor. When we need to keep the messages synchronized from multiple subscriptions, we can set hash of all these subscriptions to be the same, so that the same executor is used to synchronize multiple data sources.
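For instance, to have the same executor process two subscriptions so their messages stay synchronized (a sketch; the table names are hypothetical):

$ subscribeTable(tableName="trades_stream", actionName="syncTrades", offset=0, handler=append!{tA}, msgAsTable=true, hash=3)
$ subscribeTable(tableName="quotes_stream", actionName="syncQuotes", offset=0, handler=append!{tB}, msgAsTable=true, hash=3)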

reconnect a Boolean value indicating whether the subscription will be automatically resumed if it is interrupted. With a successful resubscription, the subscriber receives all streaming data since the interruption. The default value is false. If reconnect=true, depending on how the subscription is interrupted, we have the following 3 scenarios:

  • If the network is disconnected while both the publisher and the subscriber node remain on, the subscription will be automatically reconnected if the network connection is resumed.

  • If the publisher node crashes, the subscriber node will keep attempting to resubscribe after the publisher node restarts.

    • If the publisher node adopts data persistence mode for the stream table, the publisher will first load persisted data into memory after restarting. The subscriber won’t be able to successfully resubscribe until the publisher reaches the row of data where the subscription was interrupted.

    • If the publisher node does not adopt data persistence mode for the stream table, the automatic resubscription will fail.

  • If the subscriber node crashes, even after the subscriber node restarts, it won’t automatically resubscribe. For this case we need to execute subscribeTable again.

Note: To subscribe to a high-availability stream table, the parameter reconnect should be set to true to ensure that the new leader node can be successfully connected to in case of a leader node change. When both the publisher and the subscriber nodes are relatively stable with routine tasks, such as in IoT applications, we recommend setting reconnect=true. On the other hand, if the subscriber node is tasked with complicated queries with large amounts of data, we recommend setting reconnect=false.
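A minimal sketch of a resumable remote subscription (the server alias follows the Examples section; t1 is a hypothetical target table):

$ subscribeTable(server="DFS_NODE1", tableName="trades_stream", actionName="resumable", offset=0, handler=append!{t1}, msgAsTable=true, reconnect=true)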

filter selected values in the filtering column. The filtering column is set with function setStreamTableFilterColumn. Only the messages with filtering column values in filter are subscribed. filter does not support Boolean types.

The filter parameter can be specified in the following 3 ways.

  • value filtering: a vector.

  • range filtering: a pair. The range includes the lower bound but excludes the upper bound.

  • hash filtering: a tuple. The first element is the number of buckets. The second element is a scalar meaning the bucket index (starting from 0) or a pair meaning the bucket index range (including the lower bound but excluding the upper bound).
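The three forms can be sketched as follows (t1 and t2 are hypothetical target tables; trades_stream has a SYMBOL column sym as in the Examples section):

$ // set the filtering column before subscribing
$ setStreamTableFilterColumn(trades_stream, `sym)
$ // value filtering: only messages for IBM or MSFT are subscribed
$ subscribeTable(tableName="trades_stream", actionName="bySym", offset=0, handler=append!{t1}, msgAsTable=true, filter=`IBM`MSFT)
$ // hash filtering: 3 buckets, subscribe to bucket 0 only
$ subscribeTable(tableName="trades_stream", actionName="byBucket", offset=0, handler=append!{t2}, msgAsTable=true, filter=(3, 0))
$ // range filtering on a numeric filtering column would use a pair, e.g. filter=pair(100, 200)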

persistOffset a Boolean value indicating whether to persist the offset of the last processed message in the current subscription. It is used for resubscription and can be obtained with function getTopicProcessedOffset. The default value is false.

Note:

  • To subscribe to a high-availability stream table, the parameter persistOffset should be set to true to prevent data loss on the subscriber.

  • To resubscribe from the persisted offset, set persistOffset to true and removeOffset of function unsubscribeTable to false.
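A sketch of a durable subscription (the topic string assumes a node alias of NODE1; t1 is a hypothetical target table):

$ subscribeTable(tableName="trades_stream", actionName="durable", offset=0, handler=append!{t1}, msgAsTable=true, persistOffset=true)
$ // check the last persisted position
$ getTopicProcessedOffset("NODE1_trades_stream_durable")
$ // keep the offset when unsubscribing so a later subscription with offset=-2 can resume
$ unsubscribeTable(tableName="trades_stream", actionName="durable", removeOffset=false)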

timeTrigger a Boolean value. If it is set to true, handler is triggered at the intervals specified by parameter throttle even if no new messages arrive. The default value is false.

handlerNeedMsgId is a Boolean value. The default value is false.

  • If it is true, the parameter handler must support 2 parameters: msgBody (the messages to be ingested into the streaming engine) and msgId (the ID of the last message that has been ingested into the streaming engine). You can use partial application to fix the engine parameter of function appendMsg and pass the result as the handler.

  • If it is false, handler must support just 1 parameter: msgBody.
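As a sketch (engine is a hypothetical streaming engine handle created elsewhere), partial application fixes the engine parameter of appendMsg so the resulting binary function fits this contract:

$ subscribeTable(tableName="trades_stream", actionName="withMsgId", offset=0, handler=appendMsg{engine}, handlerNeedMsgId=true)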

raftGroup the ID of raft group used to enable high availability on the subscriber. If the leader node of a raft group is changed after setting the parameter raftGroup, the new leader will resubscribe to the stream table.

Note: The function subscribeTable can only be executed on a leader node if the parameter raftGroup is set. If handlerNeedMsgId is also set to true while raftGroup is specified, the handler must be the handler of a streaming engine (which you can obtain when you create the engine, or with the function getStreamEngine(engineName)).

userId a string indicating a user name.

password a string indicating the password.

Examples

The following is an example of streaming. A cluster has 2 nodes: DFS_NODE1 and DFS_NODE2. We need to specify maxPubConnections and subPort in cluster.cfg to enable the publish/subscribe functionality. For example:

$ maxPubConnections=32
$ DFS_NODE1.subPort=9010
$ DFS_NODE1.persistenceDir=C:/DolphinDB/Data
$ DFS_NODE2.subPort=9011

Execute the following script on DFS_NODE1 for the following tasks:

1. Create a shared stream table trades_stream with persistence in synchronous mode. At this stage table trades_stream has 0 rows.

$ n=20000000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ enableTableShareAndPersistence(table=streamTable(n:0, colNames, colTypes), tableName="trades_stream", asynWrite=false, cacheSize=n)
$ go

2. Create a distributed table trades. At this stage table trades has 0 rows.

$ if(existsDatabase("dfs://STREAM_TEST")){
$      dropDatabase("dfs://STREAM_TEST")
$ }
$ dbDate = database("", VALUE, temporalAdd(date(today()),0..30,'d'))
$ dbSym= database("", RANGE, string('A'..'Z') join "ZZZZ")
$ db = database("dfs://STREAM_TEST", COMPO, [dbDate, dbSym])
$ colNames = `date`time`sym`qty`price
$ colTypes = [DATE,TIME,SYMBOL,INT,DOUBLE]
$ trades = db.createPartitionedTable(table(1:0, colNames, colTypes), "trades", `date`sym)

3. Create a local subscription to table trades_stream. Use a function saveTradesToDFS to save streaming data from trades_stream and today’s date to table trades.

$ def saveTradesToDFS(mutable dfsTrades, msg): dfsTrades.append!(select today() as date,* from msg)
$ subscribeTable(tableName="trades_stream", actionName="trades", offset=0, handler=saveTradesToDFS{trades}, msgAsTable=true, batchSize=100000, throttle=60)

4. Create another local subscription to table trades_stream. Calculate volume-weighted average price (vwap) with streaming data for each minute and save the result to a shared stream table vwap_stream with persistence in asynchronous mode.

$ n=1000000
$ // tmpTrades caches raw trades, so it must use the schema of trades_stream
$ tmpTrades = table(n:0, `time`sym`qty`price, [TIME,SYMBOL,INT,DOUBLE])
$ lastMinute = [00:00:00.000]
$ colNames = `time`sym`vwap
$ colTypes = [MINUTE,SYMBOL,DOUBLE]
$ enableTableShareAndPersistence(table=streamTable(n:0, colNames, colTypes), tableName="vwap_stream")
$ go

$ def calcVwap(mutable vwap, mutable tmpTrades, mutable lastMinute, msg){
$     tmpTrades.append!(msg)
$     curMinute = time(msg.time.last().minute()*60000l)
$     t = select wavg(price, qty) as vwap from tmpTrades where time < curMinute, time >= lastMinute[0] group by time.minute(), sym
$     if(t.size() == 0) return
$     vwap.append!(t)
$     t = select * from tmpTrades where time >= curMinute
$     tmpTrades.clear!()
$     lastMinute[0] = curMinute
$     if(t.size() > 0) tmpTrades.append!(t)
$ }
$ subscribeTable(tableName="trades_stream", actionName="vwap", offset=0, handler=calcVwap{vwap_stream, tmpTrades, lastMinute}, msgAsTable=true, batchSize=100000, throttle=60)

Execute the following script on DFS_NODE2 to create a remote subscription to table trades_stream. This subscription saves streaming data to a shared stream table trades_stream_slave with persistence in asynchronous mode.

$ n=20000000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ enableTableShareAndPersistence(table=streamTable(n:0, colNames, colTypes), tableName="trades_stream_slave", cacheSize=n)
$ go
$ subscribeTable(server="DFS_NODE1", tableName="trades_stream", actionName="slave", offset=0, handler=trades_stream_slave)

Execute the following script on DFS_NODE1 to simulate the streaming data for 3 stocks and 10 minutes. Generate 2,000,000 records for each stock in each minute. Data in one minute are inserted into the stream table trades_stream in 600 blocks. There is a 100 millisecond interval between 2 blocks.

$ n=10
$ ticks = 2000000
$ rows = ticks*3
$ startMinute = 09:30:00.000
$ blocks=600
$ for(x in 0:n){
$     time = startMinute + x*60000 + rand(60000, rows)
$     indices = isort(time)
$     time = time[indices]
$     sym = array(SYMBOL,0,rows).append!(take(`IBM,ticks)).append!(take(`MSFT,ticks)).append!(take(`GOOG,ticks))[indices]
$     price = array(DOUBLE,0,rows).append!(norm(153,1,ticks)).append!(norm(91,1,ticks)).append!(norm(1106,20,ticks))[indices]
$     indices = NULL
$     blockSize = rows / blocks
$     for(y in 0:blocks){
$         range = pair(y * blockSize, (y+1) * blockSize)
$         insert into trades_stream values(subarray(time,range), subarray(sym,range), 10+ rand(100, blockSize), subarray(price,range))
$         sleep(100)
$     }

$     blockSize = rows % blocks
$     if(blockSize > 0){
$         range = pair(rows - blockSize, rows)
$         insert into trades_stream values(subarray(time,range), subarray(sym,range), 10+ rand(100, blockSize), subarray(price,range))
$     }
$ }

To check the results, run the following script on DFS_NODE1:

$ trades=loadTable("dfs://STREAM_TEST", `trades)
$ select count(*) from trades

We expect to see a result of 60,000,000.

$ select * from vwap_stream

We expect table vwap_stream to have 27 rows.

Run the following script on DFS_NODE2:

$ select count(*) from trades_stream_slave

We expect to see a result of less than 60,000,000 as part of the table has been persisted to disk.