Batch Job Management

Some jobs can take a very long time to execute. DolphinDB provides a scheme to run these jobs (called batch jobs) in a separate worker pool that is isolated from regular interactive jobs such as ad hoc queries. The maximum number of batch job workers is set by the configuration parameter maxBatchJobWorker. If the number of batch jobs exceeds this limit, incoming batch jobs are queued until a worker becomes available. A batch job worker is destroyed automatically after it has been idle for more than 60 seconds.

To manage the output of batch jobs, DolphinDB creates a directory specified by the configuration parameter batchJobDir. If it is not specified, the default directory is <HomeDir>/batchJobs. Each batch job generates two files, <job_id>.msg and <job_id>.obj, which store its intermediate messages and return object, respectively. In addition, each batch job adds three entries to the batch job log <BatchJobDir>/batchJob.log: when the system receives, starts, and completes the job.
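For reference, both parameters can be set in the node's configuration file. A minimal sketch (the value and path below are illustrative, not defaults):

$ maxBatchJobWorker=4
$ batchJobDir=/home/dolphindb/batchJobs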

Related functions:

  • submitJob (jobId, jobDesc, jobDef, args…)

  • submitJobEx (jobId, jobDesc, priority, parallelism, jobDef, args…)

Submit a batch job to the local node and return the job ID for future reference. To submit a batch job to a remote node, use rpc, remoteRun, or remoteRunWithCompression. jobDesc is optional; if it is not specified, the function name is used as the job description. A sketch of submitJobEx, together with getRecentJobs and cancelJob, follows this list.

  • getJobStatus (jobId)

Retrieve the status of a batch job.

  • getRecentJobs (n)

Retrieve the status of the n most recent batch jobs on the local node.

  • getJobMessage (jobId)

Retrieve the intermediate messages of a batch job.

  • getJobReturn (jobId)

Retrieve the result of a batch job.

  • cancelJob (jobId)

Cancel a submitted but unfinished batch job.
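
A minimal sketch of submitJobEx, getRecentJobs, and cancelJob (the function, job ID, description, and priority/parallelism values are illustrative, not recommendations):

$ def heavyTask(n){
$     return sum(sin(rand(1.0, n)) - 0.5)
$ };
$ // submit with priority 4 and parallelism 2 (illustrative values)
$ id = submitJobEx("heavyTask1", "heavy task demo", 4, 2, heavyTask, 100000000);
$ getRecentJobs(5);   // status of the 5 most recent batch jobs on this node
$ cancelJob(id);      // cancel the job if it has not finished yet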

Examples:

  • submit a job to the local node:

$ // CPU-intensive demo job: each iteration sums sin(x)-0.5 over 100 million random values
$ def job1(n){
$     s = 0
$     for (x in 1 : n) {
$         s += sum(sin(rand(1.0, 100000000)) - 0.5)
$         print("iteration " + x + " " + s)
$     }
$     return s
$ };
$ job1_ID = submitJob("job1_ID", "", job1, 100);

$ getJobStatus(job1_ID);

node      userID jobId   jobDesc receivedTime            startTime               endTime errorMsg
--------- ------ ------- ------- ----------------------- ----------------------- ------- --------
local8848 root   job1_ID job1    2018.06.16T10:44:34.066 2018.06.16T10:44:34.066

endTime is empty, which means the job is still running. After the job completes, rerun getJobStatus:

$ getJobStatus(job1_ID);

node      userID jobId   jobDesc receivedTime            startTime               endTime                 errorMsg
--------- ------ ------- ------- ----------------------- ----------------------- ----------------------- --------
local8848 root   job1_ID job1    2018.06.16T10:44:34.066 2018.06.16T10:44:34.066 2018.06.16T10:46:10.389

$ getJobMessage(job1_ID);
$ 2018-06-16 10:44:34.066064 Start the job [job1_ID]: job1
$ 2018-06-16 10:44:35.377095 iteration 1 1412.431451
$ 2018-06-16 10:44:36.624907 iteration 2 2328.917258
$ 2018-06-16 10:44:37.577107 iteration 3 5491.651822
$ 2018-06-16 10:44:38.530187 iteration 4 6332.907608
$ 2018-06-16 10:44:39.488295 iteration 5 8404.393113
$ ......
$ 2018-06-16 10:46:06.655519 iteration 95 -13124.624482
$ 2018-06-16 10:46:07.562855 iteration 96 -14878.269863
$ 2018-06-16 10:46:08.520555 iteration 97 -9842.451424
$ 2018-06-16 10:46:09.456576 iteration 98 -8149.660675
$ 2018-06-16 10:46:10.373218 iteration 99 -10639.329897
$ 2018-06-16 10:46:10.389147 The job is done.

$ getJobReturn(job1_ID);
$ -4291.91147
  • submit a job to a remote node:

With function rpc ("DFS_NODE2" is a node in the same cluster as the local node):

$ // same demo job, used for the remote examples
$ def jobDemo(n){
$     s = 0
$     for (x in 1 : n) {
$         s += sum(sin(rand(1.0, 100000000)) - 0.5)
$         print("iteration " + x + " " + s)
$     }
$     return s
$ };

$ rpc("DFS_NODE2", submitJob, "job1_1", "", job1{10});

$ rpc("DFS_NODE2", getJobReturn, "jobDemo3")
$ Output: -3426.577521
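
The other batch job functions can be invoked on the remote node through rpc in the same way. A short sketch, checking the status and intermediate messages of the job submitted above:

$ rpc("DFS_NODE2", getJobStatus, "jobDemo3");
$ rpc("DFS_NODE2", getJobMessage, "jobDemo3");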

With function remoteRun:

$ conn = xdb("DFS_NODE2")
$ conn.remoteRun(submitJob, "jobDemo_2", "", jobDemo, 10);

$ conn.remoteRun(getJobReturn, "jobDemo_2");
$ Output: 4238.832005
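
remoteRunWithCompression, mentioned above as another way to reach a remote node, can be used the same way as remoteRun; a minimal sketch under that assumption, reusing the connection from the previous example (the job ID is illustrative):

$ conn.remoteRunWithCompression(submitJob, "jobDemo_4", "", jobDemo, 10);
$ conn.remoteRunWithCompression(getJobReturn, "jobDemo_4");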