Courses

Programmation sur architercture à mémoire partagée avec C++11

Dans cette série d'exercices, vous apprendrez à créer et exécuter des threads en C++11 afin de résoudre des problèmes de façon parallèle sur une architecture à mémoire distribuée.

 

Threads matériel supplémentaire: part 1, part 2, part 3, part 4, part 5

 

Vous verrez aussi des problématiques de programmation concurrente qui surviennent lorsque l'on travaille sur ce type d'architecture.

Considérations techniques

Nous utilisierons les fonctionnalités de programmation parallèle et concurrentes définies dans la librairie standard de C++11. Dès lors, il est nécessaire de travailler avec une version de g++ >= 4.8.1.

De plus, avec g++, le compilateur utilise les threads posix pour implémenter les fonctionalités de threading de C++11. Il est donc nécessaire de compiler les programmes avec l'option -pthread.

Une compilation ressemble donc à :

g++ monprog.cpp -o monprog -std=c++11 -pthread

Démarrer et stopper un thread

Un thread est un flux d'exécution léger. Plusieurs threads peuvent s'exécuter en concurence ou en parallèle sur une même machine. La différence principale avec un processus est que les threads d'un programme partagent le même espace mémoire, les threads peuvent donc échanger des données directement via des écritures et lectures mémoire. Ceci implique cependant d'être confronté aux problèmes de la programmation concurrente.

Le programme suivant montre un exemple qui créé et exécute deux threads (en plus du thread principal) :

#include <thread>
#include <iostream>
 
void hellothread(int p){
  std::thread::id this_id = std::this_thread::get_id();
  std::cout << "Hello, I'm a thread, my id is " << this_id << " and my parameter is " << p << std::endl;
}
 
int main(int argc, char** argv){
 
  std::thread t1(hellothread, 44);
  std::thread t2(hellothread, 55);
 
  std::cout << "Hello, I'm the main thread, my id is " << std::this_thread::get_id() << std::endl;
 
  t1.join();
  t2.join();
 
  return 0;
}

Lors de leur création, les threads t1 et t2 commencent tout de suite à exécuter la fonction hellothread. On voit que cette fonction prends un paramètre. Sa valeur est elle-même passée en paramètre lors de la création du thread. Il est possible d'utiliser un nombre arbitraire de paramètres. L'appel à thread.join() à pour effet de suspendre l'exécution du thread appelant jusqu'à ce que le thread appelé se termine. Ce type d'appel est donc dit potentiellement bloquant.

En c++, un thread est un objet à qui l'on passe un objet callable. Un callable peut être une fonction, une fonction objet ou encore une fonction lambda.

Les deux programmes suivants permettent d'illustrer la différence au niveau des espaces mémoires entre la programmation parallèle en mémoire partagée (threads) et en mémoire ditribuée (MPI). Exécutez les et assurez-vous d'avoir bien compris leur comportement.

#include <mpi.h>
#include <iostream>
#include <stdlib.h> 
 
int a;
 
int main(int argc, char **argv) {
  int myRank, nProc;
 
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
  MPI_Comm_size(MPI_COMM_WORLD, &nProc);
 
  a = 2;
  if(myRank == 0) a = 3;
 
  MPI_Barrier();
 
  std::cout << "Je suis le processeur " << myRank << " et la valeur de a est " << a << std::endl;   
 
  MPI_Finalize();
}
#include <thread>
#include <iostream>
#include <unistd.h>
 
int a = 2;
 
void hellothread(int p){
  sleep(p);
  std::thread::id this_id = std::this_thread::get_id();
  std::cout << "Hello, I'm a thread, my id is " << this_id << " and my parameter is " << p << " and the value of a is " << a <<std::endl;
}
 
int main(int argc, char** argv){
 
  std::thread t1(hellothread, 1);
  std::thread t2(hellothread, 2);
 
  a = 3;
 
  sleep(3);
  std::cout << "Hello, I'm the main thread, my id is " << std::this_thread::get_id() << " and the value of a is " << a <<std::endl;
 
  t1.join();
  t2.join();
 
  return 0;
}

Accès concurrent à des variables partagées

Maintenant que l'on sait créer des threads, on peut imaginer utiliser des threads pour résoudre un problème en parallèle. Prenons le programme suivant qui calcule la somme d'un vecteur :

#include <iostream>
#include <vector>
 
std::vector<double> vec(1e8,3);
double sum;
 
int main(){
 
  double sum = 0.0;
 
  for(int j=0; j<vec.size(); j++) {
    sum += vec[j];
  }
 
  std::cout << sum << std::endl;
 
  return 0;
}

Et parallélisons le naïvement avec des threads (le nombre de threads est passé en paramètre du programme) :

#include <iostream>
#include <vector>
#include <thread>
 
double sum;
std::vector<double> vec(1e8,3);
 
void sumvec(int low, int up){
  for(int j=low; j<up; j++) {
    sum += vec[j];
  }
}
 
int main(int argc, char** argv){
 
  int nt = atoi(argv[1]);
  sum = 0.0;
  std::vector<std::thread> threads;
 
  for(int i = 1; i<nt; i++){
    threads.push_back(std::thread(sumvec, i*vec.size()/nt, i*vec.size()/nt+vec.size()/nt));
  }
 
  sumvec(0, vec.size()/nt);
 
  for(auto& t : threads) t.join();
 
  std::cout << sum << std::endl;
 
  return 0;
}

Si on exécute ce code, on est confronté à un problème bien connu de la programmation concurrente : l'écriture concurrente dans une variable partagée (ici sum). Le problème vient du fait que l'opération sum += vec[j] n'est pas atomique. Plus spécifiquement, ce genre de séquence d'événements peut se produire :

t1 copie la valeur de sum dans un registre               sum = 0, r1 = 0, r2 = 0
t2 copie la valeur de sum dans un registre               sum = 0, r1 = 0, r2 = 0
t1 additionne une valeur dans son registre               sum = 0, r1 = 1, r2 = 0
t2 additionne une valeur dans son registre               sum = 0, r1 = 1, r2 = 1
t1 copie la valeur de son registre dans sum              sum = 1, r1 = 1, r2 = 1
t2 copie la valeur de son registre dans sum              sum = 1, r1 = 1, r2 = 1

On constate alors à la fin de cette séquence d'instructions que la valeur stockée dans “sum” est 1 alors que l'on s'attendrait à trouver 2.

Atomicité et exclusion mutuelle

Comme on l'a dit, le problème vient du fait que l'opération d'addition sur “sum” n'est pas atomique. Afin de rendre l'addition indivisible, nous allons utiliser un mutex, ou sémaphore d'exclusion mutuelle. Un mutex est un verrou permettant de rendre une portion de code exécutable par un et un seul thread à un moment donné. On appelle alors cette portion de code la section critique. Notre code précédent serait alors transformé comme suit :

#include <mutex>
 
std::mutex mut;
double sum;
 
void sumvec(int low, int up){
  for(int j=low; j<up; j++) {
    mut.lock();
    sum += vec[j];
    mut.unlock();
  }
}

Maintenant que notre code donne le résultat attendu, regardons si nous avons diminué le temps d'exécution grâce à la parallélisation. Pour cela, on peut utiliser le code suivant :

#include <chrono>
 
int main(){
  auto start = std::chrono::steady_clock::now();
 
  // do stuff
 
  auto end = std::chrono::steady_clock::now();
  auto diff = end - start;
  std::cout << std::chrono::duration <double, std::milli> (diff).count() << " ms" << std::endl;
 
  return 0;
}

Sur PC69240, on obtient les temps d'exécution suivants (avec quatre threads pour les versions parallèles) :

Type de somme Temps
Somme séquentielle ~1  [s]
Somme parallèle avec sémaphore d'exclusion mutuelle ~45[s]

 

 

 

Ce qui n'est pas réellement un succès.

Ces mauvaises performances peuvent s'expliquer par le fait que la version avec mutex implique la sérialisation de la portion de code à l'intérieur de la section critique. En plus, l'utilisation de verrou est un mécanisme lourd.

La solution est donc de minimiser au maximum l'accès aux variables partagées et la portion du code se trouvant en section critique en modifiant légérement la solution comme suit :

void sumvec(int low, int up){
  double sumlocal = 0.0;
  for(int i=low; i<up; i++) {
    sumlocal += vec[i];
  }
  mut.lock();
  sum += sumlocal;
  mut.unlock();
}

Cette fois-ci, les temps d'exécution deviennent les suivants :

Type de somme Temps
Somme séquentielle ~1     [s]
Somme parallèle avec sémaphore d'exclusion mutuelle ~0.28[s]

 

 

 

Qui est bien un speedup proche du nombre de coeur utilisé.

Future

Un future est une structure de données permettant la récupération asynchrone d'une donnée, soit par le biais d'une fonction à exécuter, soit par le biais d'un autre flux d'exécution définissant cette donnée. En c++, on peut crééer un future de trois façons différentes : via un promise qui permet à un flux d'exécution de donner une valeur à un autre flux bloqué en attente de cette valeur, via un packaged task qui permet de récupérer la valeur retournée par une fonction exécutée de façon asynchrone, ou via un async qui fait la même chose mais avec une interface de plus haut niveau. Des exemples sont donnés ici: 

http://www.cplusplus.com/reference/future/promise/get_future/

http://www.cplusplus.com/reference/future/packaged_task/get_future/

http://www.cplusplus.com/reference/future/async/ .

Nous allon réécrire la somme en parallèle à l'aide d'un future, tout en évitant les effets de bord et donc tout problème d'écriture concurrente. Cela est possible à l'aide d'un future, au contraire d'un thread qui ne peut produire un résultat qu'à l'aide d'effet de bord. En d'autres termes, un thread doit écrire un résultat dans une variable partagée tandis qu'un future retourne un résultat.

Cela se traduit par le code suivant :

#include <iostream>
#include <vector>
#include <future>
 
double sumvec( double* v, int low, int up){
  double sumlocal = 0.0;
  for(int i=low; i<up; i++) {
    sumlocal += v[i];
  }
  return sumlocal;
}
 
int main(int argc, char** argv){
  int nf = atoi(argv[1]);
  std::vector<double> vec(1e8,3);
  double sum=0.0;
 
  std::vector<std::future<double>> futures;
  for(int i=0; i<nf; i++){
    futures.push_back(std::async(std::launch::async, sumvec, vec.data(), i*vec.size()/nf, i*vec.size()/nf + vec.size()/nf));
  }
 
  for(auto& f : futures){
    sum += f.get();
  }
 
  std::cout << sum << std::endl;
 
  return 0;
}

Qui donne également un temps d'exécution d'environ 280[ms].

Vous remarquez que le premier paramètre passé lors de la construction du future est std::launch::async. Ceci correspond à la règle d'exécution du future ou policy. Cette policy peut être async comme ici, l'exécution de la fonction liée au future commence alors lors de la construction de celui-ci, ou deferred. Le calcul ne commence alors que lors de l'appel à get().

Exercice

Ecrivez un programme qui cherche le nombre le plus grand et le plus petit dans un vecteur de nombres entiers. Parallélisez ensuite ce code à l'aide de threads. Vous écrirez une version qui utilise un verrou de type mutex et une version utilisant des future. Ecrivez ces codes en gardant la performance comme objectif. Faites ensuite la comparaison des temps d'exécution des différents codes, d'abord sans activer les optimisations du compilateur, puis en les activant (en ajoutant l'option -O3 à la ligne de compilation).