# Parallel Computation in C++

--

Often we consider the application we often think about performance, meaning how fast to machine performs certain operations in our program or how fast our application computes the final result.

As we know the computation time depends on many factors like the type of CPU, memory size, how we design applications (type of algorithms), etc.

Since the modern CPU consists of several cores (4, 8, 16, 32, etc) we can arrange our application to perform the same computation however this time running in parallel. The allocation of a certain part of our application can be run on separate cores (thread) affecting the reduction of computation time (boosting performance).

In this article, I will depict simple mechanisms which allow you to run your application in parallel.

The source code for all programs you will find on my **GitHub**.

I use Ubuntu 20.04, C++ 20 and g++ 11.0.

All the programs can be compiled as follows (in my case):

`g++-11 -std=c++2a atomic_thread.cpp -o atomic -lpthread -O3`

# Atomic operations

Atomic operations play a significant role when our application uses threads and threads access the same part of memory.

Threads are not synchronized and can access the same location of memory at the same time. For instance, we can imagine, that two threads want to update a certain part of memory (value += 1). It can happen that at the same time one thread will write the value but the other read it (both threads operate on the same memory). Such a situation creates a conflict called data race.

The atomic operation solves the problem, where (+=) — read/write operation or other (see the link) is considered as a **single operation**. The application reads and writes as one instruction.

The following example gives you an impression of how it works.

We run the application with two threads. Each thread updated the common memory value **M**. If the variable is not atomic so the final value can not be as expected (in our case 6000). The data race causes one thread reads while the other writes. If the value M is atomic the updated operation (+=) or other is considered as a single and the final results can be as expected 6000.

`int M; `

Eigen::Matrix<int, 3, 1> M_1 = Eigen::Matrix<int, 3, 1>::Ones();

auto work = []()

{

for (auto ii = 0; ii < 10; ii++) // 10-OK //100-NO

{

M += M_1.dot(M_1);

}

};

In this non-atomic example, results vary (in my case ) between 5341–5787.

`std::atomic<int> M; //only one diffrence`

Eigen::Matrix<int, 3, 1> M_1 = Eigen::Matrix<int, 3, 1>::Ones();

auto work = []()

{

for (auto ii = 0; ii < 10; ii++) // 10-OK //100-NO

{

M += M_1.dot(M_1);

}

};

In this atomic example, the result is always 6000.

# Dynamic Load Distribution

We discussed that we can split our application across threads and run our application ”in parallel”. Then we can assume that some of the threads can be heavily loaded (perform the complex operation) and some can finish allocated work very fast.

Dynamic load distribution can contribute to balancing the work across the application threads. We simply engage again the thread which finished the allocated work and take the work from the thread which is overloaded (we balance the workload across available threads).

In this case, when the work exists (the application has something to do) the thread instead of going to idle mode, takes on new work if it is ready. There is no particular order, regarding the number of a particular thread. Only the final result is required.

In order to benchmark our dynamic load distribution we check the performance of the static distribution where threads after completing their work are not “re-used”. The thread goes to idle mode, instead.

For static and dynamic load distribution our work is defined as follows,

` Eigen::Matrix<double, 100, 100> M1 = Eigen::Matrix<double, 100, 100>::Random();`

std::vector<std::vector<Eigen::Matrix<double, 100, 100>>> V;

std::vector<Eigen::Matrix<double, 100, 100>> vec0(2000, Eigen::Matrix<double, 100, 100>::Random());

std::vector<Eigen::Matrix<double, 100, 100>> vec1(4, Eigen::Matrix<double, 100, 100>::Random());

std::vector<Eigen::Matrix<double, 100, 100>> vec2(2, Eigen::Matrix<double, 100, 100>::Random());

std::vector<Eigen::Matrix<double, 100, 100>> vec3(1, Eigen::Matrix<double, 100, 100>::Random());

std::vector<Eigen::Matrix<double, 100, 100>> vec4(10000, Eigen::Matrix<double, 100, 100>::Random());

std::vector<Eigen::Matrix<double, 100, 100>> vec5(3, Eigen::Matrix<double, 100, 100>::Random());

std::vector<Eigen::Matrix<double, 100, 100>> vec6(2, Eigen::Matrix<double, 100, 100>::Random());

std::vector<Eigen::Matrix<double, 100, 100>> vec7(10000, Eigen::Matrix<double, 100, 100>::Random());

std::vector<Eigen::Matrix<double, 100, 100>> vec8(10000, Eigen::Matrix<double, 100, 100>::Random());

std::vector<Eigen::Matrix<double, 100, 100>> vec9(5, Eigen::Matrix<double, 100, 100>::Random());

V.push_back(vec0);

V.push_back(vec1);

V.push_back(vec2);

V.push_back(vec3);

V.push_back(vec4);

V.push_back(vec5);

V.push_back(vec6);

V.push_back(vec7);

V.push_back(vec8);

V.push_back(vec9);

auto work = [&](std::vector<Eigen::Matrix<double, 100, 100>> robotMat)

{

Eigen::Matrix<double, 100, 100> S = Eigen::Matrix<double, 100, 100>::Ones();

for (auto &ii : robotMat)

{

S *= ii;

}

};

There are 10 vectors with the type of Matrix (100x100). Each matrice of the vector is initiated by random numbers. The length of vectors varies from 1 to 10000. The work is connected with the multiplication of these matrices. As the length of these vectors is different the computation type is different.

In the static load distribution, the thread which finished the computation of the vector of length 4, 2, or 1** does not take the next work**. It simply goes idle mode.

When we measure the time computation so we receive results,

`time ./static`

real 0m3,493s

user 0m4,713s

sys 0m1,760s

The application needs 3.493s to finish the work.

Running **dynamic allocation** requires an atomic operation. The only difference between static and dynamic allocation is the mechanism that allows grabbing the remaining job by an idle thread (which finished previous work).

We are using the atomic index to keep track of the next work which will be performed by the thread. The idle thread grabs the new work using the atomic fetch_add() — take the current index and increment by 1. The for loop as you can see (see below) is exhausted when all work is done.

` std::atomic<int> index = 0;`

auto work = [&]()

{

Eigen::Matrix<double, 100, 100> S = Eigen::Matrix<double, 100, 100>::Ones();

for (int ii = index.fetch_add(1); ii < V.size(); ii = index.fetch_add(1))

{

for (auto &jj : V[ii])

{

S *= jj;

}

}

};

When we run the application performs the same work but the work is dynamically allocated so we can boost the performance (reduce the time).

`time ./dynamic`

real 0m2,741s

user 0m4,484s

sys 0m1,141s

This time the application is approx 0.7s faster!

# Double Buffering

While running applications we often use functions that generate data, for example, we capture sensor data and if the data is collected we use the other function to process this sensor data.

Similarly to our discussion about the atomic operation, we understand that we can not fill up the part of memory (here we call buffer) and at the same time process this data. However, there is a parallel mechanism that gives us the possibility to perform both operations (filling the buffer and draining) across the same iteration at the same time.

The mechanism is simple. We have to define two buffers (in my case A, B) in which access is protected by semaphores. Data is filled to buffer A. Processed data (read operation) is performed from buffer B. Captured data is swapped between buffer A and buffer B (swap operation swaps only the pointer to the memory; the data is not copied) so both threads can at the same time capture the data and process.

We can be depicted discussed mechanism in the following figure.

In our example, our capturing thread does nothing more than fill up the Marices with random values. The process thread at the same time multiplies the inverse with transpose Matrice (it is a dummy process, nothing special).

We can compare the performance of running the same task where only one buffer is engaged. As we can assume the buffer can be filled and drained asynchronously (one operation at a time).

While running double buffering we define two buffers. Synchronization across buffer A and buffer B is performed by binary semaphores and operations for “open” and “close” semaphores, release(), and acquire() operations respectively.

` std::binary_semaphore signalCapture{1}; //init to "1" => non-block`

std::binary_semaphore signalCompute{0}; //init to "0" => block

Eigen::Matrix<double, 100, 100> A;

Eigen::Matrix<double, 100, 100> B;

auto capture = [&]()

{

for (int ii = 0; ii < N; ii++)

{

captureData(A);

signalCapture.acquire(); // capturing Blocked. A is Blocked

A.swap(B); //B->A

signalCompute.release(); // computing Free. B

}

};

auto compute = [&]()

{

for (int ii = 0; ii < N; ii++)

{

signalCompute.acquire(); // computing Blocked. B is blocked

computeData(B); // Load B

signalCapture.release(); // capturing Free. A is free

}

};

The performance can be verified empirically while running the same work for applications utilizing double buffering can be verified running time. In this case, our program is approximately 1s faster.

Here is an example (see my GitHub),

`./non_buffer`

real 0m6,785s

user 0m6,780s

sys 0m0,004s

./buffer

real 0m6,785s

user 0m6,780s

sys 0m0,004s

# Blocking and non-blocking algorithms

During the parallel application, we can spawn a number of threads. Normally, threads protect access to a particular part of memory or process using a mutex. It can happen that some of the thread which grabs the lock can be canceled or killed (while holding the lock). It affects that the remaining threads can not access the lock (which is holed by a canceled thread) and the whole application can not be continued and finally crashed.

On the other hand (similarly to load distribution) we can imagine that the work (problem) is distributed among threads (all threads participate in finding the final solution) and some thread is slower than other (threads is overloaded). When the slower thread grabs the lock, it decreases the total computation time since the other (faster threads) can not perform the work (threads wait for the lock).

We use a non-blocking mechanism (from atomic) to avoid similar situations when the slower or canceled thread decreases the performance of the whole application. We are using the compare_exchange_strong() to implement the non-blocking mechanism, which enables continuing to run all other threads even if one or more threads are slower or canceled/killed.

The threads are totally independent of one another, despite they contributed to solving the same problem.

The blocking algorithm uses mutex. In order to simulate slower computation time by a thread, we use sleep_for method. As we can expect when the slower thread grabs the lock, the other threads are blocked (threads wait for the lock), and the performance of the whole application drops.

`std::mutex mtx;`

auto work_fast = []()

{

std::lock_guard<std::mutex> lock(mtx);

Eigen::Matrix<double, 10, 1> R = Eigen::Matrix<double, 10, 1>::Random();

M += R.dot(R);

std::this_thread::sleep_for(std::chrono::milliseconds(100));

};

auto work_slow = []()

{

std::lock_guard<std::mutex> lock(mtx);

Eigen::Matrix<double, 10, 1> R = Eigen::Matrix<double, 10, 1>::Random();

M += R.dot(R);

std::this_thread::sleep_for(std::chrono::milliseconds(1000));

};

In the non-blocking mechanism, we do not use mutex. Instead, we use the atomic compare_exchange_strong(), which compares the atomic variable with (in my case) the expected value. If both values are equal the atomic comparison return true and the atomic variable is replaced with (in my case) with desired.

`std::atomic<int> ai;`

exchanged= ai.compare_exchange_strong( expected, desired);

Output:

if ( ai == expected):

exchagned = true

ai = desired

Based on atomic we can arrange the thread's work as follows,

`std::atomic<double> M;`

auto work_fast = []()

{

double desired;

// https://cplusplus.com/reference/atomic/atomic/load/

// current value of M is loaded to expected from atomi

double expected = M.load();

Eigen::Matrix<double, 10, 1> R = Eigen::Matrix<double, 10, 1>::Random();

do

{

desired = expected + R.dot(R);

} while (!M.compare_exchange_strong(expected, desired));

std::this_thread::sleep_for(std::chrono::milliseconds(100));

};

auto work_slow = []()

{

double desired;

double expected = M.load();

Eigen::Matrix<double, 10, 1> R = Eigen::Matrix<double, 10, 1>::Random();

do

{

desired = expected + R.dot(R);

} while (!M.compare_exchange_strong(expected, desired));

std::this_thread::sleep_for(std::chrono::milliseconds(1000));

};

Now the magic happens. Running a non-blocking implementation of the given problem we can see that the slower thread does not block the other threads (threads perform work independently)!

`./blocking`

real 0m1,707s

user 0m0,001s

sys 0m0,007s

./non_blocking

real 0m1,006s

user 0m0,001s

sys 0m0,007s

If you would like to capture more information about parallelism and general software development, C++, and architecture I really recommend following **CoffeeBeforeArch** YouTube channel!

Thank you for reading.