Courses

Remarque sur les verrous

La semaine passée, nous avons utilisé des mutex pour protéger l'accès à une section critique. Il faut noter qu'il existe une version nommée recursive mutex . Un tel mutex permet à un thread ayant déjà acquis un mutex via sa primitive lock de réappeler la primitive lock sans être bloqué.

Par exemple, si vous essayez d'exécuter ce code :

#include <iostream>
#include <mutex>
 
std::mutex mut;
 
void function1(){
  mut.lock();
  std::cout << "je fais quelque chose" << std::endl;
  mut.unlock();
}
 
void function2(){
  mut.lock();
  std::cout << "je fais autre chose" << std::endl;
  mut.unlock();
}
 
void functionboth(){
  mut.lock();
  std::cout << "et je combine les deux choses" << std::endl;
  function1();
  function2();
  mut.unlock();
}
 
int main(){
  function1();
  function2();
  functionboth();
}

vous vous rendrez compte que le code est bloqué sur l'appel à function1 depuis la fonction functionboth. Ceci est du au fait que le verrou à déjà été acquis. Si vous remplacez le mutex par un recursive_mutex, les appels se dérouleront correctement puisque c'est le même thread qui fait tout les appels à lock.

Un autre danger est le cas où un thread ne libère par correctement un verrou. Ceci peut arriver dans un cas comme celui là :

#include <iostream>
#include <mutex>
 
std::mutex mut;
 
int resultat;
 
void division(int a, int b){
  mut.lock();
  if(b==0) throw std::string("division par 0");
  resultat = a/b;
  mut.unlock();
}
 
int main(){
  try{
    division(10,0);
    std::cout << resultat << std::endl;
  }
  catch(std::string const& err){
    std::cout << err << std::endl;
  }
 
  try{
    division(10,2);
    std::cout << resultat << std::endl;
  }
  catch(std::string const& err){
    std::cout << err << std::endl;
  }
}

Un solution est alors de penser à libérer le mutex lors de la levée de l'exception. Une solution plus simple est d'utiliser un lock_guard. Un lock_guard est un objet qui tente d'acquérir un verrou lors de sa construction et qui le libère lors de sa destruction. Il permet donc de protéger un bloc de code sans se soucier de la libération du verrou. Le fonction division devient alors :

void division(int a, int b){
  std::lock_guard<std::mutex> lock(mut);
  if(b==0) throw std::string("division par 0");
  resultat = a/b;
}

Opérations atomiques

Une opération atomique est un ensemble d'opérations pouvant être combinées afin qu'elles apparaissent comme une unique opération au système. Ces opérations ne retournent que deux types de résulats : succès ou échec.

Parmis les opérations atomiques, on peut citer le test-and-set et le fetch-and-add qui seront vus en cours ou encore le compare-and-swap que nous allons utiliser ici.

Le compare-and-swap ou compare-exchange en C++ compare le contenu d'une position en mémoire à une valeur donnée et, si ces deux dernières sont identiques, le contenu en mémoire est remplacé par une nouvelle valeur donnée. Ces opérations se font sans qu'un autre processus ne puisse agir sur le conteneur.

Un exemple d'utilisation pour un compteur concurrent est donné ici :

#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
 
std::atomic<int> value;
 
void add(int val){
  // value = value + val; // Ne marche pas
  value += val;
}
 
void threadSafeAdd(int val){
  bool done = false;
  while(!done){
    int expected = value;
    int newval = expected+val;
    done = atomic_compare_exchange_weak(&value, &expected, newval);
  }
}
 
void adder(int val){
  for(int i=0; i<val; i++){
    add(1);
  }
}
 
void threadSafeAdder(int val){
  for(int i=0; i<val; i++){
    threadSafeAdd(1);
  }
}
 
int main(){
 
  value = 0;
  std::vector<std::thread> threads;
  for(int i=0; i<10; i++) threads.push_back(std::thread(adder, 10000));
  for(int i=0; i<10; i++) threads[i].join();
  std::cout << value << std::endl;
 
  threads.clear();
  value = 0;
  for(int i=0; i<10; i++) threads.push_back(std::thread(threadSafeAdder, 10000));
  for(int i=0; i<10; i++) threads[i].join();
  std::cout << value << std::endl;
 
}

Exercice

Ecrivez un programme qui cherche le nombre le plus grand et le plus petit dans un vecteur de double. Parallélisez ensuite ce code à l'aide de threads et utilisez l'opération compare-exchange pour protéger l'assignation du résultat.

Variables condition

Une variable condition est une structure permettant de mettre un ou plusieurs threads en attente jusqu'à ce qu'une condition soit réalisée. 

En résumé, une variable condition permet à un thread de se mettre en attente jusqu'à ce qu'un autre thread appelle notify_all (réveille tous les threads en attente) ou notify_one (réveille un thread en attente). Lors d'un appel à wait, il faut passer un unique_lock. Un unique_lock est comparable à un lock_guard, sauf qu'il est possible d'appeler les primitives lock et unlock sur le unique_lock, au contraire du lock_guard qui vérouille le mutex lors de la construction de l'objet et le libère lors de la destruction.

Lors de l'appel à wait, le mutex lié au verrou est libéré, et le mutex est réacquis lors du réveil du thread, il faut donc veiller à libérer le verrou après le réveil.

L'appel à wait doit également se faire dans une boucle qui teste la condition, cela à cause d'un effet appelé spurious wakeup. Pour faire simple, un thread peut être réveillé alors qu'aucun appel à notify n'a été effectué.

Il faut noter qu'en C++, l'attente :

while(!condition) cv.wait(lk);
 
// Peut être remplacé par :
 
cv.wait(lk, []{return condition;});

On présente également ici une proposition d'implémentation de barrière de synchronisation à l'aide d'une variable condition ainsi qu'un exemple d'utilisation :

#include <condition_variable>
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <random>
#include <chrono>
 
class Barrier{
 
private:
 
  std::mutex m;
  std::condition_variable cv;
  int generation;
  int waiting;
  int threshold;
 
public:
 
  Barrier(int n){
    generation = 0;
    waiting = 0;
    threshold = n;
  }
 
  Barrier(){
    generation = 0;
    waiting = 0;
    threshold = 0;
  }
 
  void init(int n){
    waiting = 0;
    threshold = n;
  }
 
  void wait(){
    auto myGeneration = generation;
    std::unique_lock<std::mutex> lk(m);
    waiting++;
    if(waiting == threshold){
      generation++;
      waiting = 0;
      cv.notify_all();
    }
    else{
      while(myGeneration == generation) cv.wait(lk);
    }
  }
 
};
 
Barrier barrier;
 
void func(){
 
  std::mt19937_64 eng{std::random_device{}()};
  std::uniform_int_distribution<> dist{500, 2000};
 
  for(int i=0; i<10; i++){
    std::cout << "doing some stuff" << std::endl;
    // attendre un temps aleatoire
    std::this_thread::sleep_for(std::chrono::milliseconds{dist(eng)});
    std::cout << "waiting on barrier" << std::endl;
    barrier.wait();
    std::cout << "released" << std::endl;
  }
 
}
 
int main(int argc, char** argv){
  const int numthreads = 5;
  barrier.init(numthreads);
  std::vector<std::thread> threads;
  for(int i=0; i<numthreads; i++) threads.push_back(std::thread(func));
  for(int i=0; i<numthreads; i++) threads[i].join();
}

Exercice - Le producteur-consommateur

Le problème du producteur-consommateur est un problème de synchronisation classique en informatique. Dans ce problème, deux entités, les producteurs et les consommateurs, partagent un tampon commun de taille fixe. La tâche des producteurs est de remplir le tampon avec des données, alors que la tâche des consommateurs est de vider le tampon et de traiter ces données. Le problème est que les producteurs ne doivent pas ajouter de données lorsque le tampon est plein et les consommateurs ne doivent pas accéder à un tampon vide.

Votre travail consiste à résoudre le problème du producteur-consommateur avec un ou plusieurs producteurs qui ajoutent simplement des entiers dans le tampon et un ou plusieurs consommateurs qui retirent ces entiers du tampon.

Vous devez implémenter deux méthodes (ou fonction) pour votre tampon : void put(int e) qui ajoute un élément dans le tampon et int take() qui retourne un élément du tampon. Vous devez utiliser des variables condition pour résoudre le problème.