imr

Syntax

imr(ds, initValue, mapFunc, [reduceFunc], [finalFunc], terminateFunc, [carryover=false])

Arguments

ds the list of data sources. It must be a tuple in which each element is a data source object. A single data source must still be wrapped in a tuple. In iterative computing, data sources are automatically cached, and the cache is cleared after the last iteration.

initValue the initial values of model parameter estimates. The format of the initial values must be the same as the output of the final function.

mapFunc the map function. It takes 2 arguments, or 3 if carryover is true. The first argument is the data entity represented by the corresponding data source. The second argument is the output of the final function in the previous iteration, which is the updated estimate of the model parameters. For the first iteration, it is the initial values given by the user. The third argument, if present, is the carryover object. See the description of the carryover parameter for details.

reduceFunc the binary reduce function, which combines the results of two map function calls. If there are M map calls, the reduce function is called M-1 times. In most cases the reduce function is trivial; addition is a typical example. The reduce function is optional.

finalFunc the final function in each iteration. It is optional. It accepts two arguments. The first argument is the output of the final function in the previous iteration. For the first iteration, it is the initial values given by the user. The second argument is the output of the reduce function call. If the reduce function is not specified, the second argument is a tuple holding the individual map call results.

terminateFunc either a function that determines whether the iterations should stop, or a specified number of iterations. The termination function accepts two parameters. The first is the output of the final function in the previous iteration and the second is the output of the final function in the current iteration, as in the example below, where both are ranges. If the function returns a true value, the iterations end.

carryover a Boolean value indicating whether a map function call produces a carryover object to be passed to the next iteration of the map function call. The default value is false. If it is set to true, the map function has 3 arguments and the last argument is the carryover object, and the map function output is a tuple whose last element is the carryover object. In the first iteration, the carryover object is the NULL object.
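
As an illustration of the carryover mechanism, here is a small Python model of one map phase. It is a sketch under assumptions rather than DolphinDB code: the names run_map_phase and map_with_carryover are hypothetical, each map call is assumed to return a two-element tuple (result, carryover), and one carryover object is assumed to be kept per data source. Python's None stands in for the NULL object.

```python
def map_with_carryover(data, estimate, carry):
    # First iteration: carry is None, so compute the partition size once
    # and hand it to the next iteration through the carryover object.
    if carry is None:
        carry = {"n": len(data)}
    residual = sum(x - estimate for x in data)
    # The last element of the returned tuple is the carryover object.
    return (residual, carry["n"]), carry

def run_map_phase(ds, estimate, carries, map_func=map_with_carryover):
    """Run one map call per data source and collect the new carryover objects."""
    results, next_carries = [], []
    for data, carry in zip(ds, carries):
        result, new_carry = map_func(data, estimate, carry)
        results.append(result)
        next_carries.append(new_carry)
    return results, next_carries

ds = ([1.0, 2.0, 3.0], [4.0, 5.0])
# Iteration 1 starts with no carryover; iteration 2 reuses what iteration 1 produced.
first_results, first_carries = run_map_phase(ds, 0.0, [None, None])
second_results, second_carries = run_map_phase(ds, 1.0, first_carries)
```

In this model the partition sizes are computed only in the first iteration; later iterations read them from the carryover object.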

Details

DolphinDB offers the imr function for iterative computing based on the map-reduce methodology. Each iteration uses the result of the previous iteration together with the input dataset. The input dataset is the same in every iteration, so it can be cached. Iterative computing requires initial values for the model parameters and a termination criterion.
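
The control flow described above can be sketched in plain Python. This is a single-process model for illustration, not DolphinDB code: the name imr_model and the toy helper functions are hypothetical, data sources are modelled as in-memory lists, and carryover is left out. It assumes that the termination function receives the previous and the current estimate, as termFunc does in the median example, and that the reduce output becomes the estimate when no final function is given.

```python
from functools import reduce

def imr_model(ds, init_value, map_func, reduce_func, final_func, terminate_func):
    """Plain-Python model of the map / reduce / final / terminate loop."""
    estimate = init_value
    iteration = 0
    while True:
        # One map call per data source, each seeing the current estimate.
        mapped = [map_func(data, estimate) for data in ds]
        # M map results need M-1 reduce calls; without a reduce function
        # the tuple of map results is passed on unchanged.
        combined = reduce(reduce_func, mapped) if reduce_func else tuple(mapped)
        # Assumption: without a final function the combined result is the estimate.
        updated = final_func(estimate, combined) if final_func else combined
        iteration += 1
        if callable(terminate_func):
            done = terminate_func(estimate, updated)
        else:
            done = iteration >= terminate_func  # fixed number of iterations
        estimate = updated
        if done:
            return estimate

# Toy problem: move the estimate halfway towards the mean in each iteration.
def mean_map(data, estimate):
    return (sum(x - estimate for x in data), len(data))

def pair_add(a, b):
    return (a[0] + b[0], a[1] + b[1])

def mean_final(estimate, combined):
    total, count = combined
    return estimate + 0.5 * total / count

def small_step(prev, cur):
    return abs(cur - prev) <= 1e-9

partitions = ([1.0, 2.0, 3.0], [4.0, 5.0])
mean_estimate = imr_model(partitions, 0.0, mean_map, pair_add, mean_final, small_step)
one_step = imr_model(partitions, 0.0, mean_map, pair_add, mean_final, 1)
```

The last call passes an integer instead of a function, which corresponds to specifying a number of iterations.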

Examples

The following is an example of distributed median calculation. The data are distributed on multiple nodes and we would like to calculate the median of a variable. First, for each data source, the map function puts the data into buckets and counts the number of data points in each bucket. The reduce function then merges the bucket counts from all data sources, and the final function locates the bucket that contains the median. In the next iteration, the chosen bucket is divided into smaller buckets. The iterations finish when the width of the chosen bucket is no more than the specified precision.

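// Map: count the values of column colName in each of 1024 equal-width buckets over range.
// The last argument (true) adds one bucket below and one bucket above the range.
def medMap(data, range, colName){
   return bucketCount(data[colName], double(range), 1024, true)
}

// Final: choose the range for the next iteration from the merged bucket counts.
def medFinal(range, result){
   x = result.cumsum()
   // x[1025] is the total count; look up the position of the halfway point
   index = x.asof(x[1025]/2.0)
   ranges = range[1] - range[0]
   // index -1: the median is below the range, so extend the lower bound
   // index 1024: the median is above the range, so extend the upper bound
   // otherwise: narrow the range to a single bucket
   if(index == -1)
      return (range[0] - ranges*32):range[1]
   else if(index == 1024)
      return range[0]:(range[1] + ranges*32)
   else{
      interval = ranges / 1024.0
      startValue = range[0] + (index - 1) * interval
      return startValue : (startValue + interval)
   }
}

// Iterate until the range is no wider than precision, then return its midpoint.
def medEx(ds, colName, range, precision){
   termFunc = def(prev, cur): cur[1] - cur[0] <= precision
   return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg()
}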
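
The bucket refinement used in this example can also be modelled in plain Python, which may help when reasoning about how the range shrinks. This is a sketch under assumptions, not a translation of the DolphinDB code: the function names are hypothetical, partitions are in-memory lists, and the bucket is chosen as the first one whose cumulative count reaches the median rank, which is a simpler rule than the asof lookup above.

```python
def bucket_count(values, lo, hi, n):
    """Counts for: below lo, n equal-width buckets over [lo, hi), at or above hi."""
    counts = [0] * (n + 2)
    width = (hi - lo) / n
    for v in values:
        if v < lo:
            counts[0] += 1
        elif v >= hi:
            counts[n + 1] += 1
        else:
            counts[min(int((v - lo) / width), n - 1) + 1] += 1
    return counts

def refine(lo, hi, counts, n):
    """Return the next range: one bucket, or a widened range if the median is outside."""
    target = (sum(counts) + 1) // 2  # 1-based rank of the (lower) median
    running = 0
    for i, c in enumerate(counts):
        running += c
        if running >= target:
            break
    span = hi - lo
    if i == 0:                       # median below the range
        return lo - 32 * span, hi
    if i == n + 1:                   # median at or above the upper bound
        return lo, hi + 32 * span
    width = span / n
    start = lo + (i - 1) * width
    return start, start + width

def distributed_median(partitions, lo, hi, precision, n=1024):
    while True:
        per_partition = [bucket_count(p, lo, hi, n) for p in partitions]  # map
        merged = [sum(col) for col in zip(*per_partition)]                # reduce
        lo, hi = refine(lo, hi, merged, n)                                # final
        if hi - lo <= precision:                                          # terminate
            return (lo + hi) / 2

# Values 1..101 split over two partitions; the median is 51.
parts = [list(range(1, 40)), list(range(40, 102))]
estimate = distributed_median(parts, 0.0, 200.0, 1e-3)
estimate_from_narrow_range = distributed_median(parts, 0.0, 10.0, 1e-3)
```

The second call starts from a range that does not contain the median, so the first iteration only widens the range before the refinement begins.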