appendMsg

Syntax

appendMsg(engine, msgBody, msgId)

Arguments

engine is a built-in streaming engine, i.e., the abstract table object returned by functions such as createReactiveStateEngine.

msgBody contains the messages to be ingested into the streaming engine.

msgId is the ID of the last message that has been ingested into the streaming engine. Message IDs are counted from the beginning of the subscription.

Details

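If snapshot is enabled and RaftGroup is disabled for a streaming engine, the handler parameter of subscribeTable must be appendMsg so that each batch of data is injected into the engine together with its message ID. Because appendMsg takes msgId as an argument, handlerNeedMsgId must also be set to true in subscribeTable, as in the example below.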

Examples

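$ // Shared stream table that publishes the trades, and a table to receive the engine output
$ share streamTable(10000:0,`time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE]) as trades
$ output1 = table(10000:0, `time`sym`avgPrice, [TIMESTAMP,SYMBOL,DOUBLE])

$ // Snapshot is enabled through snapshotDir; a snapshot is saved every 100 messages
$ engine1 = createTimeSeriesEngine(name=`engine1, windowSize=100, step=50, metrics=<avg(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, keyColumn=`sym, snapshotDir="C:/DolphinDB/Data/snapshotDir", snapshotIntervalInMsgCount=100)
$ // appendMsg{engine1} fixes the engine argument; the subscription supplies msgBody and msgId
$ subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=appendMsg{engine1}, msgAsTable=true, handlerNeedMsgId=true)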

$ n=500
$ timev=2021.03.12T15:00:00.000 + (1..n join 1..n)
$ symv = take(`A, n) join take(`B, n)
$ pricev = (100+cumsum(rand(1.0,n)-0.5)) join (200+cumsum(rand(1.0,n)-0.5))
$ t=table(timev as time, symv as sym, pricev as price).sortBy!(`time)
$ trades.append!(t)

$ select * from output1
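
The message ID passed to appendMsg is what allows a snapshot-enabled engine to resume from where it stopped. The lines below are a minimal sketch of that, not a definitive recipe. They assume that getSnapshotMsgId, unsubscribeTable and dropStreamEngine are available in your server version, that the snapshot directory from the example above still exists, and that the shared table trades is still in memory. The variable name ofst is hypothetical.

$ // Simulate an interruption: remove the subscription and the engine
$ unsubscribeTable(tableName="trades", actionName="engine1")
$ dropStreamEngine("engine1")

$ // Recreate the engine with the same name and snapshotDir so that the saved snapshot is loaded
$ engine1 = createTimeSeriesEngine(name=`engine1, windowSize=100, step=50, metrics=<avg(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, keyColumn=`sym, snapshotDir="C:/DolphinDB/Data/snapshotDir", snapshotIntervalInMsgCount=100)

$ // ID of the last message covered by the snapshot; resubscribe from the message after it
$ ofst = getSnapshotMsgId(engine1)
$ subscribeTable(tableName="trades", actionName="engine1", offset=ofst+1, handler=appendMsg{engine1}, msgAsTable=true, handlerNeedMsgId=true)

Subscribing from ofst+1 rather than 0 is intended to avoid ingesting again the messages already reflected in the restored engine state.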