Part B - Languages

OpenMP - Map Reduce

Describe the two most common parallel patterns
Implement the map and reduce algorithms using OpenMP directives



The two most common patterns in parallel programming are map and reduce.  The map pattern is often combined with another pattern to form a compound solution.  Hadoop technology is based on the MapReduce model.  Some writers associate the success of Google's search engine with this compound pattern. 

This chapter describes algorithms that implement the map and reduce patterns.  In each case, the serial version of the code is presented and followed by its parallel implementation. 


The Map Pattern

Introduction

The Map pattern is the simplest parallel pattern.  It describes the same task executing on a set of different data elements.  The execution of each task is independent of the execution of any other task.  Since tasks are independent of one another, they can execute in parallel without further analysis.

Serial versions of map algorithms have growth rates of O(n).  Parallel versions reduce the elapsed time from O(n) to O(n/p), where p is the number of processors available.



A Serial Version of the Map Pattern


A Parallel Version of the Map Pattern

Elemental Function

The task or set of instructions executed on each element of a collection is called the elemental function of the pattern.  The elemental function operates on each data element separately.  The data elements are completely unrelated.  Due to their independence the tasks do not communicate with one another.

The data that an elemental function accesses are classified into two groups:

  • uniform - data that are the same across all instances of the elemental function
  • varying - data that may differ from one instance to another of the elemental function

The map pattern is deterministic as long as the solution does not depend on the order of execution of the elemental function.

Assumptions

The map pattern assumes that the elemental function

  • has no side effects
  • performs no random writes to memory

The elemental function may read data in memory other than its assigned data. 

Example - saxpy

The saxpy operation is the scaling of a vector by a constant factor, followed by the addition of a second vector to the scaled result and finally followed by the storing of the final result in the second vector.  This operation takes its name from the corresponding function in the BLAS (Basic Linear Algebra Subprograms) specification and is the conventional prototype for implementing the map pattern.  The s in the operation's name identifies a single precision operation.  The a denotes alpha - the constant factor.  Note that a is a uniform variable.  The x refers to the original input vector.  The p denotes the plus operation.  The y refers to the augmenting input vector.  Note that the elements of both vectors are varying.

The saxpy operation is defined by the following statement

 y[i] := a * x[i] + y[i]

This prototype exhibits a flow dependence, but this flow dependence is not loop-carried.

Serial Version

The serial version of saxpy is listed below:

 // Map Pattern
 // saxpy.cpp

 #include <iostream>

 void saxpy(float a, const float* x, float* y,
  int n) { 
     for (int i = 0; i < n; i++)
         y[i] = a * x[i] + y[i];
 }

 int main() {
     const int N = 10000;
     float x[N], y[N], a = 2.0f;

     // initialize
     float f = 1.0f / N * N;
     for (int i = 0; i < N; i++)
         x[i] = y[i] = f * (float)i;

     saxpy(a, x, y, N);

     // sum y[i]
     float s = 0.0f;
     for (int i = 0; i < N; i++)
         s += y[i];

     std::cout << "Sum y[i] = " << s << std::endl; 
 }

The program reports the sum of the elements of y:

 Sum y[i] = 7.49996e+009 

OpenMP Implementation Options

OpenMP implementation options for the map pattern include:

  • SPMD - the original implementation
  • Work Sharing - Multi-threading with optional scheduling of threads
  • SIMD - vectorization
  • Work Sharing with Vectorization - each thread may include execution vectorization

SPMD Implementation

The SPMD implementation distributes the work across a team of threads under programmer control.  We assign the iterations to the threads explicitly, in a round-robin (cyclic) fashion. 

 void saxpy(float a, const float* x, float* y, int n) { 
     #pragma omp parallel // start a parallel region
     {
         int tid = omp_get_thread_num();
         int nt = omp_get_num_threads();
         for (int i = tid; i < n; i += nt)
             y[i] = a * x[i] + y[i];
     } // end of the parallel region
 }

The loop controls i = tid and i += nt in the for statement explicitly schedule the iterations across the different threads. 
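
This function calls omp_get_thread_num and omp_get_num_threads, so the source file must include <omp.h> and the compiler must be invoked with OpenMP enabled.  A minimal sketch of a complete driver around this version, assuming GCC or Clang with the -fopenmp option and the hypothetical file name saxpy_spmd.cpp (MSVC uses /openmp instead), follows:

 // saxpy_spmd.cpp - SPMD version of the map pattern (hypothetical file name)
 // compile: g++ -fopenmp saxpy_spmd.cpp -o saxpy_spmd

 #include <iostream>
 #include <vector>
 #include <omp.h>

 void saxpy(float a, const float* x, float* y, int n) { 
     #pragma omp parallel // start a parallel region
     {
         int tid = omp_get_thread_num();
         int nt  = omp_get_num_threads();
         for (int i = tid; i < n; i += nt)
             y[i] = a * x[i] + y[i];
     } // end of the parallel region
 }

 int main() {
     const int N = 10000;
     std::vector<float> x(N, 1.0f), y(N, 1.0f);

     saxpy(2.0f, x.data(), y.data(), N); // y[i] becomes 2 * 1 + 1 = 3

     std::cout << "y[0] = " << y[0]
               << ", y[N-1] = " << y[N - 1] << std::endl;
 }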

Work-Sharing Implementation

The work-sharing implementation distributes the tasks across the team of threads according to a scheduling option.  We accept the default or specify the scheduling option ourselves.  The for construct implements work sharing. 


 void saxpy(float a, const float* x, float* y, int n) { 
     #pragma omp parallel // start a parallel region
     {
         #pragma omp for // accept the default scheduling option 
         for(int i = 0; i < n; i++)
             y[i] = a * x[i] + y[i];
     } // end of the parallel region
 }

We can combine the separate constructs into a single parallel for construct in the same directive.
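
Combined, the directive and the loop for saxpy read as follows (one possible form):

 void saxpy(float a, const float* x, float* y, int n) { 
     #pragma omp parallel for // combined parallel and work-sharing constructs
     for (int i = 0; i < n; i++)
         y[i] = a * x[i] + y[i];
 }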

The optional scheduling clause on a for construct specifies the partitioning of the n iterations into chunks.  Each chunk is executed on a single thread. 

For example, to partition the iterations statically into chunks of 4 iterations, assigned to the threads in round-robin order, we write:

 void saxpy(float a, const float* x, float* y, int n) { 
     #pragma omp parallel for schedule(static, 4)
     for(int i = 0; i < n; i++)
         y[i] = a * x[i] + y[i];
 }

OpenMP supports five scheduling strategies for work sharing:

  • schedule(static, chunk_size) - designed for iterations of more or less equal length - chunk_size defaults to approximately n/p - chunks are assigned to threads in a round-robin fashion

  • schedule(dynamic, chunk_size) - designed for iterations of variable length - chunk_size defaults to 1 - each thread executes a chunk and then requests the next chunk

  • schedule(guided, chunk_size) - designed for iterations of variable length - chunk sizes shrink as the number of unassigned iterations diminishes - chunk_size (defaulting to 1) sets the minimum chunk size

  • schedule(auto) - leaves the scheduling to the compiler or to the OpenMP runtime

  • schedule(runtime) - determined at run time by an environment variable setting - useful for testing

OpenMP selects the schedule in priority order from specific to general:

  1. compiler directive clause schedule(static|dynamic|guided|auto[, chunk_size])
  2. runtime library call omp_set_schedule(omp_sched_t kind, int chunk_size) kind=omp_sched_static|omp_sched_dynamic|omp_sched_guided|omp_sched_auto
  3. environment variable OMP_SCHEDULE=static|dynamic|guided|auto[,chunk_size]
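
As a brief illustration of the runtime selection mechanisms, the following sketch pairs schedule(runtime) with a call to omp_set_schedule; setting the environment variable OMP_SCHEDULE (for example, OMP_SCHEDULE="dynamic,2") before launching the program achieves the same effect without recompiling.  The chunk size of 2 is an arbitrary choice for illustration.

 #include <omp.h>

 void saxpy(float a, const float* x, float* y, int n) { 
     // select dynamic scheduling with a chunk size of 2 at run time
     omp_set_schedule(omp_sched_dynamic, 2);

     #pragma omp parallel for schedule(runtime)
     for (int i = 0; i < n; i++)
         y[i] = a * x[i] + y[i];
 }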

Chunking the collection into sub-collections of a prescribed chunk size is also called tiling.

OpenMP SIMD Implementation

The SIMD implementation distributes the work across the lanes of a vector register within a single thread.  The simd construct implements SIMD processing. 

 void saxpy(float a, const float* x, float* y, int n) { 
     #pragma omp simd // vectorize the loop on a single thread
     for(int i = 0; i < n; i++)
         y[i] = a * x[i] + y[i];
 }

OpenMP Work-Sharing with SIMD Implementation

The work-sharing SIMD implementation distributes the tasks across a team of threads and across vector registers in each thread.  The parallel for simd construct implements this option.

 void saxpy(float a, const float* x, float* y, int n) { 
     #pragma omp parallel for simd
     for(int i = 0; i < n; i++)
         y[i] = a * x[i] + y[i];
 }

The Reduce Pattern

Introduction

The reduce pattern is the second most popular pattern in parallel programming.  This pattern applies a function to two elements of data to produce a single result.  Applied successively to an entire collection of elements, this pattern returns a single result for the collection.  The following formula expresses the combination operation:

 f(a, b) = a op b

f denotes the function that performs the operation op on data elements a and b

Reduction Operations

The reduction operations that OpenMP implements include (initialization constants are in parentheses):

  • + - addition - (0)
  • * - multiplication - (1)
  • - - subtraction - (0)
  • & - bit-wise and - (~0)
  • | - bit-wise or - (0)
  • ^ - bit-wise exclusive or - (0)
  • && - logical and - (1)
  • || - logical or - (0)
  • max - maximum - (least representable number of the type)
  • min - minimum - (largest representable number of the type)
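
For example, in a max reduction each thread starts with a private copy of the reduction variable initialized to the identity listed above and the private copies are combined at the end of the loop.  A minimal sketch, assuming single-precision data and a hypothetical max_value function:

 #include <cfloat>

 // each thread's private copy of m starts at the identity for max
 // (the least representable float); the private maxima are combined
 // into m at the end of the loop
 float max_value(const float* x, int n) {
     float m = -FLT_MAX;
     #pragma omp parallel for reduction(max:m)
     for (int i = 0; i < n; i++)
         if (x[i] > m) m = x[i];
     return m;
 }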

Serial Version

The symbolic representation for a serial algorithm of the reduce pattern is shown below:



A Serial Version of the Reduce Pattern

The algorithm applies the specified function to each element of the data set in order from first to last using the accumulated result for each succeeding operation.  The templated implementation of this version is

 template <class T>
 T reduce(
     T (*f)(const T&, const T&), // function that performs the operation
     T identity,                 // identity value for an empty set
     const T* a,                 // points to the data set
     int n                       // number of elements in the data set
     ) {

     T accum = identity;
     for (int i = 0; i < n; i++)
         accum = f(accum, a[i]);
     return accum;
 }

A code snippet that invokes this function is:

 int a[] = {1,2,3,4,5,6,7,8};                                  // data set
 auto add = [](const int& a, const int& b) { return a + b; };  // lambda expression
 int sum = reduce<int>(add, 0, a, 8);                          // call to templated function

sum receives the final result of the reduction using the combination operation defined by the lambda expression operating on the data stored in a.

Assumptions

The reduce pattern assumes that

  • the data elements can be combined in any order (associative law holds)
  • an identity element exists - guarantees meaning if the number of data elements is zero

Since a reduction algorithm is free to group the data elements in any way, the reduction operator must be associative.  Associativity permits regrouping of the combination operations.  Operations on floating-point data are only approximately associative. 

A reordering of the data elements may improve reduction efficiency with vector processors.  Reordering requires that the reduction operator exhibit not only associativity but also commutativity. 

The two mathematical properties that are important in implementing the reduce pattern are:

  • the operation's associativity - examples: + * && || ^ max min, matrix multiplication, quaternion multiplication
  • the operation's commutativity - examples: + * && (commutative); - / (non-commutative); saturating addition is commutative but not associative
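
Floating-point non-associativity is easy to demonstrate.  In the following minimal sketch (the operand values are arbitrary choices for illustration), two groupings of the same three float values produce different sums:

 #include <iostream>

 int main() {
     float a = 1.0e8f, b = -1.0e8f, c = 1.0f;

     float left  = (a + b) + c; // (a + b) is exactly 0, so the sum is 1
     float right = a + (b + c); // (b + c) rounds back to -1.0e8f, so the sum is 0

     std::cout << "(a + b) + c = " << left  << std::endl; // prints 1
     std::cout << "a + (b + c) = " << right << std::endl; // prints 0
 }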

Implementations

Common algorithms that implement the reduce pattern include:

  • single-stage tiling using the SPMD programming model
  • multi-stage tiling with strides
  • work sharing

The tiling algorithms divide the data into chunks and operate on each chunk separately.  Each chunk is called a tile or a block.

Tiling with SPMD

A single-stage tiling algorithm is illustrated in the figure below.  Each tile performs a serial reduction on its chunk of the data and generates a partial result for the final reduction. 



Reduce Using Single-Stage Tiling with SPMD

The tiling_with_SPMD_reduce function below implements this algorithm using a tile size of 4:

 template <typename T, typename C>
 T reduce(
     const T* in, // points to the data set
     int n,       // number of elements in the data set
     C combine,   // combine operation
     T initial    // initial value
     ) {

     for (int i = 0; i < n; i++)
         initial = combine(initial, in[i]);
     return initial;
 }

 template <typename T, typename C>
 T tiling_with_SPMD_reduce(
     C combine,   // function that performs the operation
     T identity,  // identity value for an empty set
     const T* in, // points to the data set
     int n        // number of elements in the data set
     ) {

     const int tile_size = 4;
     // calculate maximum number of tiles
     int mtiles = (n - 1) / tile_size + 1;
     // allocate sufficient space for maximum number of tiles
     T* partial = new T[mtiles];
     // request one thread per tile for the parallel region
     omp_set_num_threads(mtiles);

     #pragma omp parallel
     {
         int itile = omp_get_thread_num();
         int ntiles = omp_get_num_threads();
         if (itile == 0) mtiles = ntiles;
         int last_tile = ntiles - 1;
         int last_tile_size = n - last_tile * tile_size;
         partial[itile] = reduce(in + itile * tile_size,
          itile == last_tile ? last_tile_size : tile_size, combine, identity); 
     }

     // serially reduce the partial results from the tiles
     T accum = reduce(partial, mtiles, combine, identity);

     delete [] partial;
     return accum;
 }

A code snippet that invokes this function set is:

 int a[] = {1,2,3,4,5,6,7,8};                      // data set
 auto add = [](int a, int b) { return a + b; };    // lambda expression
 int sum = tiling_with_SPMD_reduce(add, 0, a, 8);  // call to templated function

In this snippet sum contains the result of the reduction defined by lambda expression add operating on the data stored in a.

Multi-Stage Tiling with Strides

A serial multi-stage tiling algorithm is illustrated in the figure below.  At each stage, the amount of data to be processed is halved with respect to the previous stage.  Input data is no longer required once it has been processed, so each intermediate result can overwrite one of its input elements, leaving the other input element untouched thereafter. 



Reduce Using Multi-Stage Tiling with Strides

The first stage on 8 data values produces the first set of 4 intermediate results.  The second stage reduces these intermediate results to 2 intermediate results.  The third stage reduces these intermediate results to the final result.  Synchronization is necessary between adjacent stages to ensure that the data for the next stage has been determined and stored. 

Note that synchronization of threads is expensive. 

The serial_reduce_with_strides function below implements a serial version of this algorithm with a stride that doubles at each stage:

 template <typename T, typename C>
 T serial_reduce_with_strides(
     C combine,   // function that performs the operation
     T identity,  // identity value for an empty set
     T* in,       // points to the data set (modified in place)
     int n        // number of elements in the data set
     ) {

     for (int stride = 1; stride < n; stride *= 2) {
         for (int i = 2 * stride - 1; i < n; i += 2 * stride)
             in[i] = combine(in[i - stride], in[i]);
     }

     return in[n - 1];
 }

This tree reduction algorithm provides better precision than the serial algorithm since intermediate results tend to be of the same magnitude when the inputs are of the same magnitude.  We can improve precision further by using higher-precision accumulators.
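
A minimal sketch of the second idea, assuming single-precision data: the running sum accumulates in a double, which absorbs most of the rounding error, and rounds to float only once at the end.  The function name sum_with_wide_accumulator is ours.

 // accumulate single-precision data in a double-precision accumulator
 float sum_with_wide_accumulator(const float* in, int n) {
     double accum = 0.0;
     for (int i = 0; i < n; i++)
         accum += in[i];
     return (float)accum; // round once, at the end
 }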

A code snippet that invokes this function is:

 int a[] = {1,2,3,4,5,6,7,8};                         // data set
 auto add = [](int a, int b) { return a + b; };       // lambda expression
 int sum = serial_reduce_with_strides(add, 0, a, 8);  // call to templated function

In this snippet, sum contains the result of the reduction defined by lambda expression add operating on the data stored in a.

Parallel Work-Sharing

A parallel work-sharing implementation of the reduce pattern is straightforward and leaves any tiling to the compiler.  This option is implemented using the reduction clause. 

The templated version of this implementation for the addition operation is

 template <typename T>
 T reduce(
     const T* in, // points to the data set
     int n,       // number of elements in the data set
     T identity   // initial value
     ) {

     T accum = identity;
     #pragma omp parallel for reduction(+:accum)
     for (int i = 0; i < n; i++)
         accum += in[i];
     return accum;
 }

A code snippet that invokes this templated function is:

 int a[] = {1,2,3,4,5,6,7,8};  // data set
 int sum = reduce(a, 8, 0);    // call to templated function

In this snippet, sum contains the result of the addition reduction operating on the data stored in a.
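
The reduction clause also combines with the vectorizing constructs introduced under the map pattern.  A minimal sketch, assuming a compiler with OpenMP 4.0 simd support (the name reduce_simd is ours):

 template <typename T>
 T reduce_simd(
     const T* in, // points to the data set
     int n,       // number of elements in the data set
     T identity   // initial value
     ) {

     T accum = identity;
     #pragma omp parallel for simd reduction(+:accum)
     for (int i = 0; i < n; i++)
         accum += in[i];
     return accum;
 }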

Removing Loop-Carried Data Dependencies

Sources: Barlas, G. (2015), Jordan, H.F., Alaghband, G. (2003).

The three examples below show how to use the work-sharing construct for with the reduction clause to manage loop-carried dependencies.  In each example, the source code containing the loop-carried dependency is listed first, followed by the work-sharing solution that uses the reduction clause.

  • A loop-carried flow dependency caused by a reduction variable.  The for construct partitions the loop and localizes the loop-carried dependency into separate dependencies within each thread.  Each thread executes its loop-carried flow dependency serially.  The team of threads executes in parallel and assembles the result subsequently.

     // original - loop-carried flow dependency on sum
     float sum = 0.0f;
     for (int i = 0; i < n; i++)
         sum = sum + f(i);

     // work-sharing solution
     float sum = 0.0f;
     #pragma omp for reduction(+:sum)
     for (int i = 0; i < n; i++)
         sum = sum + f(i);
  • A loop-carried output dependency where a unique value results from the iteration.  The for construct partitions the loop and localizes the loop-carried dependency into separate dependencies within each thread.  Each thread executes its loop-carried dependency serially.  The team of threads executes in parallel and assembles the result subsequently.

     // original - loop-carried output dependency on w
     w = x[0];
     for (int i = 1; i < n; i++)
         if (w < x[i])
             w = x[i];

     // work-sharing solution
     w = x[0];
     #pragma omp for reduction(max:w)
     for (int i = 1; i < n; i++)
         if (w < x[i])
             w = x[i];
  • Two loop-carried flow dependencies, where one is caused by consumption of a value produced in a previous iteration.  One of the dependencies can be parallelized, the other cannot.  The parallelizable dependency is separated from the non-parallelizable one.  This refactoring is called loop fission.

     // original - two loop-carried flow dependencies
     w = y[0];
     for (int i = 1; i < n; i++) {
         x[i] = x[i] + x[i - 1];
         w = w + y[i];
     }

     // after loop fission - the first loop stays serial,
     // the second loop parallelizes with a reduction
     for (int i = 1; i < n; i++)
         x[i] = x[i] + x[i - 1];
     w = y[0];
     #pragma omp parallel for reduction(+:w)
     for (int i = 1; i < n; i++)
         w = w + y[i];

Exercises

  • Barlas, G. (2015). Multicore and GPU Programming - An Integrated Approach. Morgan-Kaufmann. 978-0-12-417137-4. pp.179-192.
  • Jordan, H. F., Alaghband, G. (2003). Fundamentals of Parallel Processing. Prentice-Hall. 978-0-13-901158-7. pp. 269-273.
  • Mattson, T. (2013). Tutorial Overview | Introduction to OpenMP



