Comment synchroniser les pthreads manager / worker sans jointure?

Je connais le multithreading et j’ai développé avec succès de nombreux programmes multithreads en Java et Objective-C. Mais je ne pouvais pas réaliser les choses suivantes en C en utilisant pthreads sans utiliser une jointure à partir du thread principal:

#include  #include  #include  #define NUM_OF_THREADS 2 struct thread_data { int start; int end; int *arr; }; void print(int *ints, int n); void *processArray(void *args); int main(int argc, const char * argv[]) { int numOfInts = 10; int *ints = malloc(numOfInts * sizeof(int)); for (int i = 0; i < numOfInts; i++) { ints[i] = i; } print(ints, numOfInts); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] pthread_t threads[NUM_OF_THREADS]; struct thread_data thread_data[NUM_OF_THREADS]; // these vars are used to calculate the index ranges for each thread int remainingWork = numOfInts, amountOfWork; int startRange, endRange = -1; for (int i = 0; i arr; int start = data->start; int end = data->end; // 1. Wait for a signal to start from the main thread for (int i = start; i <= end; i++) { arr[i] = arr[i] + 1; } // 2. Signal to the main thread that you're done pthread_exit(NULL); } void print(int *ints, int n) { printf("["); for (int i = 0; i < n; i++) { printf("%d", ints[i]); if (i+1 != n) printf(", "); } printf("]\n"); } 

Je voudrais réaliser ce qui suit dans le code ci-dessus:

En main ():

  1. Signalez aux threads de commencer à travailler.
  2. Attendez que les threads d’arrière-plan soient terminés.

Dans processArray ():

  1. Attendez qu’un signal commence à partir du thread principal
  2. Signaler au fil principal que vous avez terminé

Je ne souhaite pas utiliser de jointure dans le fil principal, car dans l’application réelle , le fil principal créera les threads une fois, puis il indiquera aux threads d’arrière-plan de fonctionner plusieurs fois et je ne peux pas laisser le principal. thread continue sauf si tous les threads d’arrière-plan ont fini de fonctionner. Dans la fonction processArray , je vais mettre une boucle infinie comme suit:

 void *processArray(void *args) { struct thread_data *data = (struct thread_data *)args; while (1) { // 1. Wait for a signal to start from the main thread int *arr = data->arr; int start = data->start; int end = data->end; // Process for (int i = start; i <= end; i++) { arr[i] = arr[i] + 1; } // 2. Signal to the main thread that you're done } pthread_exit(NULL); } 

Notez que je suis nouveau en C et en l’API posix, alors excusez-moi s’il me manque quelque chose d’évident. Mais j’ai vraiment essayé beaucoup de choses, à commencer par utiliser un mutex et un tableau de sémaphores, et un mélange des deux, mais sans succès. Je pense qu’une variable de condition peut aider, mais je ne comprenais pas comment l’utiliser.

Merci pour votre temps.

Problème résolu:

Merci beaucoup les gars! J’ai finalement réussi à faire en sorte que cela fonctionne en toute sécurité et sans utiliser de jointure en suivant vos conseils. Bien que la solution soit un peu moche, elle fait le travail et les gains de performances en valent la peine (comme vous le verrez ci-dessous). Pour les personnes intéressées, il s’agit d’une simulation de l’application réelle sur laquelle je travaille, dans laquelle le fil principal continue de donner du travail aux fils d’arrière-plan:

  #include  #include  #include  #define NUM_OF_THREADS 5 struct thread_data { int id; int start; int end; int *arr; }; pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t currentlyIdleCond = PTHREAD_COND_INITIALIZER; int currentlyIdle; pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t workReadyCond = PTHREAD_COND_INITIALIZER; int workReady; pthread_cond_t currentlyWorkingCond = PTHREAD_COND_INITIALIZER; pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER; int currentlyWorking; pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t canFinishCond = PTHREAD_COND_INITIALIZER; int canFinish; void print(int *ints, int n); void *processArray(void *args); int validateResult(int *ints, int num, int start); int main(int argc, const char * argv[]) { int numOfInts = 10; int *ints = malloc(numOfInts * sizeof(int)); for (int i = 0; i < numOfInts; i++) { ints[i] = i; } // print(ints, numOfInts); pthread_t threads[NUM_OF_THREADS]; struct thread_data thread_data[NUM_OF_THREADS]; workReady = 0; canFinish = 0; currentlyIdle = 0; currentlyWorking = 0; // these vars are used to calculate the index ranges for each thread int remainingWork = numOfInts, amountOfWork; int startRange, endRange = -1; // Create the threads and give each one its data struct. for (int i = 0; i < NUM_OF_THREADS; i++) { amountOfWork = remainingWork / (NUM_OF_THREADS - i); startRange = endRange + 1; endRange = startRange + amountOfWork - 1; thread_data[i].id = i; thread_data[i].arr = ints; thread_data[i].start = startRange; thread_data[i].end = endRange; pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]); remainingWork -= amountOfWork; } int loops = 1111111; int expectedStartingValue = ints[0] + loops; // used to validate the results // The elements in ints[] should be incremented by 1 in each loop while (loops-- != 0) { // Make sure all of them are ready pthread_mutex_lock(&currentlyIdleMutex); while (currentlyIdle != NUM_OF_THREADS) { pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex); } pthread_mutex_unlock(&currentlyIdleMutex); // All threads are now blocked; it's safe to not lock the mutex. // Prevent them from finishing before authorized. canFinish = 0; // Reset the number of currentlyWorking threads currentlyWorking = NUM_OF_THREADS; // Signal to the threads to start pthread_mutex_lock(&workReadyMutex); workReady = 1; pthread_cond_broadcast(&workReadyCond ); pthread_mutex_unlock(&workReadyMutex); // Wait for them to finish pthread_mutex_lock(&currentlyWorkingMutex); while (currentlyWorking != 0) { pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex); } pthread_mutex_unlock(&currentlyWorkingMutex); // The threads are now waiting for permission to finish // Prevent them from starting again workReady = 0; currentlyIdle = 0; // Allow them to finish pthread_mutex_lock(&canFinishMutex); canFinish = 1; pthread_cond_broadcast(&canFinishCond); pthread_mutex_unlock(&canFinishMutex); } // print(ints, numOfInts); if (validateResult(ints, numOfInts, expectedStartingValue)) { printf("Result correct.\n"); } else { printf("Result invalid.\n"); } // clean up for (int i = 0; i arr; int start = data->start; int end = data->end; while (1) { // Set yourself as idle and signal to the main thread, when all threads are idle main will start pthread_mutex_lock(&currentlyIdleMutex); currentlyIdle++; pthread_cond_signal(&currentlyIdleCond); pthread_mutex_unlock(&currentlyIdleMutex); // wait for work from main pthread_mutex_lock(&workReadyMutex); while (!workReady) { pthread_cond_wait(&workReadyCond , &workReadyMutex); } pthread_mutex_unlock(&workReadyMutex); // Do the work for (int i = start; i <= end; i++) { arr[i] = arr[i] + 1; } // mark yourself as finished and signal to main pthread_mutex_lock(&currentlyWorkingMutex); currentlyWorking--; pthread_cond_signal(&currentlyWorkingCond); pthread_mutex_unlock(&currentlyWorkingMutex); // Wait for permission to finish pthread_mutex_lock(&canFinishMutex); while (!canFinish) { pthread_cond_wait(&canFinishCond , &canFinishMutex); } pthread_mutex_unlock(&canFinishMutex); } pthread_exit(NULL); } int validateResult(int *ints, int n, int start) { int tmp = start; for (int i = 0; i < n; i++, tmp++) { if (ints[i] != tmp) { return 0; } } return 1; } void print(int *ints, int n) { printf("["); for (int i = 0; i < n; i++) { printf("%d", ints[i]); if (i+1 != n) printf(", "); } printf("]\n"); } 

Je ne sais pas si pthread_cancel est suffisant pour nettoyer! En ce qui concerne la barrière, cela aurait été d’une grande aide s’il ne s’était pas limité à certains systèmes d’exploitation mentionnés par @Jeremy .

Benchmarks:

Je voulais m’assurer que ces nombreuses conditions ne ralentissent pas l’algorithme. J’ai donc configuré ce benchmark pour comparer les deux solutions:

  #include  #include  #include  #include  #include  #include  #define NUM_OF_THREADS 5 struct thread_data { int start; int end; int *arr; }; pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t currentlyIdleCond = PTHREAD_COND_INITIALIZER; int currentlyIdle; pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t workReadyCond = PTHREAD_COND_INITIALIZER; int workReady; pthread_cond_t currentlyWorkingCond = PTHREAD_COND_INITIALIZER; pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER; int currentlyWorking; pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t canFinishCond = PTHREAD_COND_INITIALIZER; int canFinish; void *processArrayMutex(void *args); void *processArrayJoin(void *args); double doItWithMutex(pthread_t *threads, struct thread_data *data, int loops); double doItWithJoin(pthread_t *threads, struct thread_data *data, int loops); int main(int argc, const char * argv[]) { int numOfInts = 10; int *join_ints = malloc(numOfInts * sizeof(int)); int *mutex_ints = malloc(numOfInts * sizeof(int)); for (int i = 0; i < numOfInts; i++) { join_ints[i] = i; mutex_ints[i] = i; } pthread_t join_threads[NUM_OF_THREADS]; pthread_t mutex_threads[NUM_OF_THREADS]; struct thread_data join_thread_data[NUM_OF_THREADS]; struct thread_data mutex_thread_data[NUM_OF_THREADS]; workReady = 0; canFinish = 0; currentlyIdle = 0; currentlyWorking = 0; int remainingWork = numOfInts, amountOfWork; int startRange, endRange = -1; for (int i = 0; i < NUM_OF_THREADS; i++) { amountOfWork = remainingWork / (NUM_OF_THREADS - i); startRange = endRange + 1; endRange = startRange + amountOfWork - 1; join_thread_data[i].arr = join_ints; join_thread_data[i].start = startRange; join_thread_data[i].end = endRange; mutex_thread_data[i].arr = mutex_ints; mutex_thread_data[i].start = startRange; mutex_thread_data[i].end = endRange; pthread_create(&mutex_threads[i], NULL, processArrayMutex, (void *)&mutex_thread_data[i]); remainingWork -= amountOfWork; } int numOfBenchmarkTests = 100; int numberOfLoopsPerTest= 1000; double join_sum = 0.0, mutex_sum = 0.0; for (int i = 0; i  0.0) printf("Mutex is %.0f%% faster.\n", 100 * diff / join_avg); else if (diff < 0.0) printf("Join is %.0f%% faster.\n", 100 * diff / mutex_avg); else printf("Both have the same performance."); free(join_ints); free(mutex_ints); return 0; } // From https://stackoverflow.com/a/2349941/408286 double get_time() { struct timeval t; struct timezone tzp; gettimeofday(&t, &tzp); return t.tv_sec + t.tv_usec*1e-6; } double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops) { double start = get_time(); int loops = num_loops; while (loops-- != 0) { // Make sure all of them are ready pthread_mutex_lock(&currentlyIdleMutex); while (currentlyIdle != NUM_OF_THREADS) { pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex); } pthread_mutex_unlock(&currentlyIdleMutex); // All threads are now blocked; it's safe to not lock the mutex. // Prevent them from finishing before authorized. canFinish = 0; // Reset the number of currentlyWorking threads currentlyWorking = NUM_OF_THREADS; // Signal to the threads to start pthread_mutex_lock(&workReadyMutex); workReady = 1; pthread_cond_broadcast(&workReadyCond ); pthread_mutex_unlock(&workReadyMutex); // Wait for them to finish pthread_mutex_lock(&currentlyWorkingMutex); while (currentlyWorking != 0) { pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex); } pthread_mutex_unlock(&currentlyWorkingMutex); // The threads are now waiting for permission to finish // Prevent them from starting again workReady = 0; currentlyIdle = 0; // Allow them to finish pthread_mutex_lock(&canFinishMutex); canFinish = 1; pthread_cond_broadcast(&canFinishCond); pthread_mutex_unlock(&canFinishMutex); } return get_time() - start; } double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops) { double start = get_time(); int loops = num_loops; while (loops-- != 0) { // create them for (int i = 0; i < NUM_OF_THREADS; i++) { pthread_create(&threads[i], NULL, processArrayJoin, (void *)&data[i]); } // wait for (int i = 0; i arr; int start = data->start; int end = data->end; while (1) { // Set yourself as idle and signal to the main thread, when all threads are idle main will start pthread_mutex_lock(&currentlyIdleMutex); currentlyIdle++; pthread_cond_signal(&currentlyIdleCond); pthread_mutex_unlock(&currentlyIdleMutex); // wait for work from main pthread_mutex_lock(&workReadyMutex); while (!workReady) { pthread_cond_wait(&workReadyCond , &workReadyMutex); } pthread_mutex_unlock(&workReadyMutex); // Do the work for (int i = start; i arr; int start = data->start; int end = data->end; // Do the work for (int i = start; i <= end; i++) { arr[i] = arr[i] + 1; } pthread_exit(NULL); } 

Et le résultat est:

 Join average : 0.153074 Mutex average: 0.071588 Mutex is 53% faster. 

Merci encore. J’apprécie vraiment votre aide!

Vous devez utiliser une technique de synchronisation différente de celle de la join , c’est clair.

Malheureusement, vous avez beaucoup d’options. L’une est une “barrière de synchronisation”, qui est fondamentalement une chose où chaque thread qui l’atteint bloque jusqu’à ce qu’ils l’aient tous atteint (vous spécifiez le nombre de threads à l’avance). Regardez pthread_barrier .

Une autre consiste à utiliser une paire condition-variable / mutex ( pthread_cond_* ). Quand chaque thread a fini, il prend le mutex, incrémente un compte et signale le condvar Le thread principal attend la variable condvar jusqu’à ce que le nombre atteigne la valeur attendue. Le code ressemble à ceci:

 // thread has finished mutex_lock ++global_count // optional optimization: only execute the next line when global_count >= N cond_signal mutex_unlock // main is waiting for N threads to finish mutex_lock while (global_count < N) { cond_wait } mutex_unlock 

Une autre consiste à utiliser un sémaphore par thread - lorsque le thread termine, il affiche son propre sémaphore et le thread principal attend tour à tour chaque sémaphore au lieu de les joindre tour à tour.

Vous avez également besoin d'une synchronisation pour redémarrer les threads pour le travail suivant. Il peut s'agir d'un deuxième object de synchronisation du même type que le premier, avec des détails modifiés pour le fait que vous avez 1 affiche et N serveurs plutôt que l'inverse. autour. Ou vous pourriez (avec soin) réutiliser le même object pour les deux buts.

Si vous avez essayé ces solutions et que votre code ne fonctionne pas, posez peut-être une nouvelle question spécifique sur le code que vous avez essayé. Tous sont adaptés à la tâche.

Vous pouvez utiliser plusieurs mécanismes de synchronisation (variables de condition, par exemple). Je pense que le plus simple serait d’utiliser un pthread_barrier pour synchroniser le début des threads.

En supposant que vous souhaitiez que tous les threads soient synchronisés à chaque itération de boucle, vous pouvez simplement réutiliser la barrière. Si vous avez besoin de quelque chose de plus flexible, une variable de condition peut être plus appropriée.

Lorsque vous décidez qu’il est temps que le thread se termine (vous n’avez pas indiqué comment les threads sauront sortir de la boucle infinie – une simple variable partagée peut être utilisée pour cela; la variable partagée peut être de type atomique ou protégée avec un mutex), le thread main() doit utiliser pthread_join() pour attendre que tous les threads soient terminés.

Vous travaillez au mauvais niveau d’abstraction. Ce problème a déjà été résolu. Vous réimplémentez une queue de travail + un pool de threads.

OpenMP semble être un bon choix pour votre problème. Il convertit les annotations #pragma en code threadé. Je pense que cela vous permettrait d’exprimer directement ce que vous essayez de faire.

Avec libdispatch , ce que vous essayez de faire serait exprimé comme un dispatch_apply ciblant une queue simultanée. Cela attend implicitement que toutes les tâches enfants soient terminées. Sous OS X, il est implémenté à l’aide d’une interface de file de travail pthread non portable. sous FreeBSD, je pense qu’il gère directement un groupe de pthreads.

S’il s’agit de problèmes de portabilité pour vous amener à utiliser des pthreads brutes, n’utilisez pas de barrières pthread. Les barrières constituent une extension supplémentaire par-dessus les threads POSIX de base. OS X par exemple ne le supporte pas. Pour plus, voir POSIX .

Le blocage du thread principal jusqu’à ce que tous les threads soient terminés peut être effectué à l’aide d’un décompte protégé par une variable de condition ou, plus simplement, à l’aide d’un tube et d’une lecture bloquante où le nombre d’octets à lire correspond au nombre de threads. Chaque thread écrit un octet à la fin du travail, puis dort jusqu’à ce qu’il ait un nouveau travail provenant du thread principal. Le thread principal se débloque une fois que chaque thread a écrit “J’ai terminé!” octet.

Le transfert de travail aux threads enfants peut être effectué à l’aide d’un mutex protégeant le descripteur de travail et d’une condition pour signaler un nouveau travail. Vous pouvez utiliser un seul tableau de descripteurs de travail à partir duquel tous les threads sont créés. Au signal, chacun tente de saisir le mutex. Lors de la saisie du mutex, il retirera du travail, signalera à nouveau si la file est non vide, puis traitera son travail, après quoi il signalera la fin du processus au thread principal.

Vous pouvez réutiliser cette “queue” pour débloquer le thread principal en mettant en queue les résultats. Le thread principal attend que la longueur de la queue corresponde au nombre de threads. L’approche par pipe utilise simplement une read bloquante pour faire ce compte pour vous.

Pour que tous les threads commencent à fonctionner, il suffit d’une variable entière globale initialisée à zéro. Les threads attendent simplement qu’elle soit différente de zéro. Ainsi, vous n’avez pas besoin de la boucle while (1) dans la fonction thread.

Pour attendre que tout soit terminé, pthread_join est le plus simple, car il bloquera jusqu’à ce que le thread auquel il est connecté soit terminé. Il est également nécessaire de nettoyer les éléments du système après le thread (sinon, la valeur de retour du thread sera stockée pour le rest du programme). Comme vous avez un tableau de tous les pthread_t pour les threads, passez-les en boucle un par un. Comme cette partie de votre programme ne fait rien d’autre et doit attendre que tous les threads soient terminés, il est normal de les attendre dans l’ordre.