mr

Syntax

mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true])

Arguments

ds the list of data sources. This required parameter must be a tuple and each element of the tuple is a data source object. Even if there is only one data source, we still need a tuple to wrap the data source.

mapFunc the map function. It accepts exactly one argument, which is the materialized data entity from a data source. If the map function needs more parameters in addition to the materialized data source, we can use partial application to convert a multiple-parameter function into a unary function. The number of map function calls is the same as the number of data sources. The map function returns a regular object (scalar, pair, array, matrix, table, set, or dictionary) or a tuple (containing multiple regular objects).

reduceFunc the binary reduce function that combines two map function call results. The reduce function in most cases is trivial. An example is the addition function. The reduce function is optional. If the reduce function is not specified, the system returns all individual map call results to the final function.

finalFunc the final function, which accepts exactly one parameter. The output of the last reduce function call is the input of the final function. The final function is optional. If it is not specified, the system returns the individual map function call results.

parallel an optional boolean flag indicating whether to execute the map function in parallel locally. The default value is true, i.e., enabling parallel computing. When there is very limited available memory and each map call needs a large amount of memory, we can disable parallel computing to prevent the out-of-memory problem. We may also want to disable the parallel option in other scenarios. For example, we may need to disable the parallel option to prevent multiple threads from writing to the same partition simultaneously.

Details

The Map-Reduce function is the core function of DolphinDB’s generic distributed computing framework.
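To make the roles of the arguments concrete, the following is a toy, single-process model of the control flow, written in Python rather than DolphinDB script. The names mr_model, scaled_sum, and unary_map are hypothetical and exist only for this illustration. The sketch ignores data sources, remote execution, and the parallel flag, and its handling of an omitted final function (returning whatever the previous stage produced) is an assumption rather than a statement about the engine.

```python
from functools import partial, reduce

def mr_model(sources, map_func, reduce_func=None, final_func=None):
    """Toy model of the map -> reduce -> final flow (not DolphinDB's implementation)."""
    # One map call per data source.
    mapped = [map_func(src) for src in sources]
    # Without a reduce function, the individual map results are passed on unchanged.
    if reduce_func is None:
        combined = mapped
    else:
        # The binary reduce function folds the map results pairwise.
        combined = reduce(reduce_func, mapped)
    # Assumption: without a final function, return what the previous stage produced.
    return combined if final_func is None else final_func(combined)

def scaled_sum(rows, factor):
    """A two-parameter function that must be made unary before it can act as a map function."""
    return factor * sum(rows)

# Three stand-in "data sources", each already materialized as a list.
sources = [[1, 2, 3], [4, 5], [6]]

# Fixing `factor` plays the role that partial application plays for mapFunc.
unary_map = partial(scaled_sum, factor=10)

total = mr_model(sources, unary_map, lambda a, b: a + b)   # map, then reduce with addition
per_source = mr_model(sources, unary_map)                   # map only
largest = mr_model(sources, unary_map, final_func=max)      # map, then final on all results
```

Here total is 210, per_source is [60, 90, 60], and largest is 90, which shows how omitting the reduce function hands every map result to the final function.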

Examples

The following is an example of distributed linear regression. Suppose X is the matrix of independent variables and y is the dependent variable. X and y are stored in multiple data sources. To estimate the ordinary least squares parameters, we need to calculate XᵀX and Xᵀy. We can calculate the tuple (XᵀX, Xᵀy) from each data source, then aggregate the results from all data sources to get XᵀX and Xᵀy for the entire dataset.
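The aggregation works because both products decompose into sums over row blocks. As a short derivation, assume the rows are split across K data sources, with X_k and y_k denoting the rows held by source k (K and the subscript k are notation introduced here for illustration):

```latex
X = \begin{bmatrix} X_1 \\ \vdots \\ X_K \end{bmatrix}, \quad
y = \begin{bmatrix} y_1 \\ \vdots \\ y_K \end{bmatrix}
\;\Longrightarrow\;
X^{\mathsf T} X = \sum_{k=1}^{K} X_k^{\mathsf T} X_k, \qquad
X^{\mathsf T} y = \sum_{k=1}^{K} X_k^{\mathsf T} y_k,
\qquad
\hat{\beta} = \left( X^{\mathsf T} X \right)^{-1} X^{\mathsf T} y .
```

Each map call therefore only needs to return its own pair of partial products, the reduce step is plain addition, and the final step solves for the coefficient vector once.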

def myOLSMap(table, yColName, xColNames, intercept){
    if(intercept)
        x = matrix(take(1.0, table.rows()), table[xColNames])
    else
        x = matrix(table[xColNames])
    xt = x.transpose();
    return xt.dot(x), xt.dot(table[yColName])
}

def myOLSFinal(result){
    xtx = result[0]
    xty = result[1]
    return xtx.inv().dot(xty)[0]
}

def myOLSEx(ds, yColName, xColNames, intercept){
    return mr(ds, myOLSMap{, yColName, xColNames, intercept}, +, myOLSFinal)
}

In the example above, we define the map function and final function. In practice, we may define transformation functions for data sources as well. These functions only need to be defined in the local instance. Users don’t need to compile them or deploy them to the remote instances. The distributed computing framework in DolphinDB handles these complicated issues for end users on the fly.

As a frequently used analytics tool, distributed ordinary least squares linear regression is already implemented in the core library of DolphinDB. The built-in version (olsEx) provides more features.
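To check the arithmetic behind the map, reduce, and final steps of the regression example by hand, the following is a small Python sketch for the special case of one regressor plus an intercept. It is not DolphinDB code and does not call mr; the names ols_map, ols_reduce, ols_final, and chunks are hypothetical, and the three chunks stand in for three data sources holding rows of the line y = 1 + 2x.

```python
from functools import reduce

def ols_map(rows):
    """Return (X^T X, X^T y) for one chunk of (x, y) rows, with an intercept column."""
    xtx = [[0.0, 0.0], [0.0, 0.0]]
    xty = [0.0, 0.0]
    for x, y in rows:
        design = (1.0, x)  # intercept term plus the single regressor
        for i in range(2):
            xty[i] += design[i] * y
            for j in range(2):
                xtx[i][j] += design[i] * design[j]
    return xtx, xty

def ols_reduce(a, b):
    """Add two (X^T X, X^T y) tuples element by element."""
    xtx = [[a[0][i][j] + b[0][i][j] for j in range(2)] for i in range(2)]
    xty = [a[1][i] + b[1][i] for i in range(2)]
    return xtx, xty

def ols_final(result):
    """Solve (X^T X) beta = X^T y using the closed-form inverse of a 2x2 matrix."""
    (a, b), (c, d) = result[0]
    p, q = result[1]
    det = a * d - b * c
    return [(d * p - b * q) / det, (a * q - c * p) / det]

# Three stand-in data sources; every row satisfies y = 1 + 2x exactly.
chunks = [
    [(0.0, 1.0), (1.0, 3.0)],
    [(2.0, 5.0), (3.0, 7.0)],
    [(4.0, 9.0)],
]

# map each chunk, fold the partial results with addition, then solve once.
beta = ols_final(reduce(ols_reduce, map(ols_map, chunks)))

# The same estimate computed from all rows at once, for comparison.
pooled = ols_final(ols_map([row for chunk in chunks for row in chunk]))
```

Both beta and pooled come out as intercept 1 and slope 2, so splitting the rows across sources does not change the estimate.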