## Make Quicksort fast by using threads and avoiding branch misprediction

This is not possible for every algorithm. But Quicksort for example is a Divide and Conquer algorithm. This type of algorithm is usually well suited for parallelization.

Avoiding branch misprediction is another way to speed up programs. The best technique for reducing branch misprediction is to eliminate the branches altogether.

``````for (int i = 0, j = 0; i < 1000; i++) {
if (numbers[i] < 500) {
small_numbers[j] = numbers[i];
j += 1;
}
}
``````

If the array contains random numbers between 0 and 1000, there is a 50% probability of branching misprediction.

This can be avoided by eliminating the branching.

``````for (int i = 0, j = 0; i < 1000; i++) {
small_numbers[j] = numbers[i];
j += (numbers[i] < 500);
}
``````

The execution times are hardware-dependent. They were measured on an Intel Celeron dual-core processor.

This is an iterative implementation with the fast Hoare’s partition scheme. We use the median of three fixed elements for the pivot element. This prevents the O(n²) worst case from occurring with random or sorted data, but not with specially prepared input data. In this case, a good random number generator should be used for the selection of the pivot element.

``````#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <sys/time.h>

// Quicksort

#define LOWER(a, b) ((a) < (b))
#define TYPE int

#define swap(a, b) do { TYPE h = a; a = b; b = h; } while (0)

void insert_sort(TYPE* left, TYPE* right) {

// put minimum to left position, so we can save
// one inner loop comparsion for insert sort
for (TYPE* pi = left + 1; pi <= right; pi++) {
if (LOWER(*pi, *left)) {
swap(*pi, *left);
}
}
for (TYPE* pi = left + 2; pi <= right; pi++) {
TYPE h = *pi;
TYPE* pj = pi - 1;
while (LOWER(h, *pj)) {
*(pj + 1) = *pj;
pj -= 1;
}
*(pj + 1) = h;
}
}

#define sort3fast(a, b, c)              \
if (LOWER(b, a)) {                  \
if (LOWER(c, a)) {              \
if (LOWER(c, b)) swap(a, c);\
else {                      \
TYPE h = a; a = b;      \
b = c; c = h;           \
}                           \
}                               \
else swap(a, b);                \
}                                   \
else {                              \
if (LOWER(c, b)) {              \
if (LOWER(c, a)) {          \
TYPE h = c; c = b;      \
b = a; a = h;           \
}                           \
else swap(b, c);            \
}                               \
}                                   \

TYPE* partition(TYPE* left, TYPE* right) {

TYPE* left0 = left;
TYPE pivl = *left;
left += 1;

TYPE* mid = left0 + (right - left0) / 2;
TYPE piv = *mid;

TYPE pivr = *right;
sort3fast(pivl, piv, pivr);
*right = pivr;
*mid = *left;
*left0 = pivl;
*left = piv;

while (1) {
do left += 1; while (LOWER(*left, piv));
do right -= 1; while (LOWER(piv, *right));
if (left >= right) break;
swap(*left, *right);
}

*(left0 + 1) = *right;
*right = piv;
return right;
}

void sort(TYPE* data, int len) {

TYPE* stack[64]; // for max 2^32 data items
int sp = 0;
TYPE* left = data;
TYPE* right = data + len - 1;

while (1) {
// for arrays smaller than 50 use insert sort
if (right - left < 50) {
insert_sort(left, right);
if (sp == 0) break;
sp -= 2;
left = stack[sp];
right = stack[sp + 1];
}
else {
TYPE* mid = partition(left, right);
if (mid < left + (right - left) / 2) {
stack[sp] = mid + 1;
stack[sp + 1] = right;
right = mid - 1;
}
else {
stack[sp] = left;
stack[sp + 1] = mid - 1;
left = mid + 1;
}
sp += 2;
}
}
}

// ----------------------------------------------------------------------------------

double t(void) {
static double t0;
struct timeval tv;
gettimeofday(&tv, NULL);
double h = t0;
t0 = tv.tv_sec + tv.tv_usec / 1000000.0;
return t0 - h;
}

void init(TYPE* data, int len) {
for (int i = 0; i < len; i++) {
data[i] = rand();
}
}

void test(TYPE* data, int len) {
for (int i = 1; i < len; i++) {
if (data[i] < data[i - 1]) {
printf("ERROR\n");
break;
}
}
}

#define SIZE (50 * 1000000)
TYPE data[SIZE];

int main(void) {

init(data, SIZE);
printf("Sorting %d million numbers with Quicksort ...\n", SIZE / 1000000);
t();
sort(data, SIZE);
printf("%.2f s\n", t());
test(data, SIZE);
return 0;
}
``````
``````Sorting 50 million numbers ...
5.51s
``````

For comparison C++ std::sort:

``````.
.
#include <algorithm>

void sort(int* data, int len) {
std::sort(data, data + len);
}
.
.
``````
``````Sorting 50 million numbers with std::sort ...
5.98s
``````

### BlockQuicksort in C

The great paper https://arxiv.org/abs/1604.06697 by Edelkamp and A. Weiß shows how the performance of partitioning can be improved by avoiding conditional branches.

``````// add the missing functions from the examples above
.
.
#define BSZ 256
TYPE* partition(TYPE* left, TYPE* right) {

TYPE* left_piv = left + 1;
TYPE* p = left + (right - left) / 2;

TYPE pivl = *left;
TYPE piv = *p;
TYPE pivr = *right;
*p = *left_piv;

sort3fast(pivl, piv, pivr);

*right = pivr;
*left_piv = piv;
*left = pivl;
left += 1;

if (right - left > 2 * BSZ + 2) {
left += 1;
right -= 1;
TYPE* offl[BSZ];
TYPE* offr[BSZ];
TYPE** ol = offl;
TYPE** or = offr;
do {
if (ol == offl) {
TYPE* pd = left;
do {
*ol = pd;
ol += LOWER(piv, *pd);
pd += 1;
}
while (pd < left + BSZ);
}
if (or == offr) {
TYPE* pd = right;
do {
*or = pd;
or += LOWER(*pd, piv);
pd -= 1;
}
while (pd > right - BSZ);
}

int min = (ol - offl < or - offr) ? ol - offl : or - offr;

ol -= min;
or -= min;

for (int i = 0; i < min; i++) {
swap(**(ol + i), **(or + i));
}
if (ol == offl) left += BSZ;
if (or == offr) right -= BSZ;
}
while (right - left > 2 * BSZ);

left -= 1;
right += 1;
}

while (1) {
do left += 1; while (LOWER(*left, piv));
do right -= 1; while (LOWER(piv, *right));
if (left >= right) break;
swap(*left, *right);
}

*left_piv = *right;
*right = piv;
return right;
}
.
.
``````
``````Sorting 50 million numbers with BlockQuicksort ...
4.57s
``````

Now we also use the other processors for sorting. We start a new thread only if the array to be sorted is big enough and if there are not too many threads running.

``````// add the missing functions from the examples above
.
.
#include <unistd.h>

void* sort_thr(void *arg) {

TYPE** stack = (TYPE**)arg;
int sp = 0;
TYPE* left = stack[sp];
TYPE* right = stack[sp + 1];

while (1) {
if (right - left < 50) {
insert_sort(left, right);
if (sp == 0) break;
sp -= 2;
left = stack[sp];
right = stack[sp + 1];
}
else {
TYPE* l;
TYPE* r;
TYPE* mid = partition(left, right);
if (mid < left + (right - left) / 2) {
l = mid + 1;
r = right;
right = mid - 1;
}
else {
l = left;
r = mid - 1;
left = mid + 1;
}
TYPE** stack_new = malloc(64 * sizeof(TYPE*));
stack_new[0] = l;
stack_new[1] = r;
}
else {
stack[sp] = l;
stack[sp + 1] = r;
sp += 2;
}
}
}
free(stack);

return NULL;
}

void sort(TYPE* data, int len) {

int n_cpus = sysconf(_SC_NPROCESSORS_ONLN);
//  printf("%d\n", n_cpus);
if (n_cpus > 0) max_threads = n_cpus * 2;

TYPE** stack = malloc(64 * sizeof(TYPE*));
stack[0] = data;
stack[1] = data + len - 1;

}

// todo: check if 'malloc' and 'pthread_create' are successful
.
.
``````
``````Sorting 50 million numbers with threaded BlockQuicksort ...
2.42s
``````

Multithreading and the avoidance of branch mispredictions can significantly improve the application performance of programs.

### Appendix: How to prevent a DoS attack

If you know how Quicksort selects the pivot element, you can make the input so that Quicksort has a runtime of O(n²). This can be prevented by randomly selecting the pivot element.

``````
TYPE* partition(TYPE* left, TYPE* right) {

.
.
//  TYPE* mid = left0 + (right - left0) / 2;
TYPE* mid = left + rand() % (right - left);
.
.
}

#include <sys/random.h>

void sort(TYPE* data, int len) {

unsigned seed;
getrandom(&seed, sizeof(seed), 0);
srand(seed);
.
.
}
``````

easylang.online - easy programming online