Comment bien suspendre les discussions?

Dans le contexte d’une application multithread existante, je souhaite la modifier pour pouvoir suspendre les threads. L’application est composée de 3 threads de travail qui fonctionnent par “étapes de locking” en utilisant un pthread_barrier comme suit:

 Thread 1 Thread 2 Thread 3 while(1) while(1) while(1) | | | | | | | | | | | | barrier barrier barrier 

Tout va bien avec ce code. J’ajoute maintenant un 4ème thread utilisé pour contrôler les 3 autres, et j’ai besoin de suspendre / reprendre les 3 threads de travail. Pour l’instant, j’ai essayé d’utiliser un indicateur d’ stop global et une variable de condition écrite par le thread de contrôle et lue par le thread de travail après la barrière.

 Thread 1 Thread 2 Thread 3 Thread 4 while(1) while(1) while(1) wait for user input to suspend | | | mutex_lock | | | stop = 1 | | | mutex_unlock | | | wait for user input to resume | | | mutex_lock | | | stop = 0 | | | cond_broadcast() | | | mutex_unlock barrier barrier barrier mutex_lock mutex_lock mutex_lock if(stop) if(stop) if(stop) cond_wait() cond_wait() cond_wait() mutex_unlock mutex_unlock mutex_unlock 

Le problème de cette solution est qu’elle peut parfois se bloquer en fonction de la planification des threads et de la longueur de travail des threads 1, 2 et 3. Je me demande donc comment synchroniser avec succès ces 4 threads pour pouvoir suspendre / reprendre la les fils de travail de celui de contrôle?

Je pense que la réponse de gmch devrait résoudre la question initiale. Cependant, toutes les implémentations de pthread n’incluent pas pthread_barrier_t et les fonctions associées (car elles constituent une partie optionnelle des spécifications de threads POSIX). Voici donc l’implémentation de barrière personnalisée que j’ai mentionnée dans un commentaire sur la question d’origine.

(Notez qu’il existe d’autres moyens de suspendre / reprendre les threads de manière asynchrone, en fonctionnement normal et sans coopération des threads eux-mêmes. Un moyen d’implémentation consiste à utiliser un ou deux signaux en temps réel et un gestionnaire de signaux qui bloque dans sigsuspend() , en attente du signal complémentaire “continue”. Le thread de contrôle devra utiliser pthread_kill() ou pthread_sigqueue() pour envoyer les signaux en pause et en continu à chaque thread impliqué. Les threads sont affectés de manière minimale; mis à part d’ EINTR erreurs EINTR de blocage des appels système (car la transmission du signal interrompt le blocage), les threads ne progressent tout simplement pas – comme s’ils n’étaient pas planifiés depuis un certain temps. De ce fait, il ne devrait pas y avoir de problèmes concernant l’obtention des threads. interrompu et poursuivi à des moments légèrement différents Si vous êtes intéressé par cette méthode, laissez un commentaire et je pourrais essayer de vous montrer un exemple de mise en oeuvre de celle-ci également.)

Cela sera peut-être utile pour quelqu’un d’autre ayant besoin d’une implémentation de barrière personnalisée pouvant être suspendue, ou peut-être comme base de leur propre barrière personnalisée.

Modifié pour append le mode DRAINING , lorsque les threads doivent quitter. Dans votre boucle de travail, utilisez do { ... } while (!barrier_wait(&barrier));

barrière.h :

 #ifndef BARRIER_H #define BARRIER_H #include  #include  typedef enum { INVALID = -1, RUNNING = 0, PAUSED = 1, DRAINING = 2 } barrier_state; typedef struct { pthread_mutex_t mutex; pthread_cond_t cond; barrier_state state; int threads; /* Number of participants */ int waiting; /* Number of participants waiting */ } barrier; /** barrier_drain() - Mark barrier so that threads will know to exit * @b: pointer to barrier * @ids: pthread_t's for the threads to wait on, or NULL * @retvals: return values from the threads, or NULL * This function marks the barrier such that all threads arriving * at it will return ETIMEDOUT. * If @ids is specified, the threads will be joined. * Returns 0 if successful, errno error code otherwise. */ static int barrier_drain(barrier *const b, pthread_t *const ids, void **const retvals) { int result, threads; void *retval; if (!b || b->threads < 0) return errno = EINVAL; result = pthread_mutex_lock(&b->mutex); if (result) return errno = result; b->state = DRAINING; pthread_cond_broadcast(&b->cond); threads = b->threads; b->threads = 0; pthread_mutex_unlock(&b->mutex); while (threads-->0) { result = pthread_join(ids[threads], &retval); if (result) return errno = result; if (retvals) retvals[threads] = retval; } return errno = 0; } /** barrier_pause() - Mark barrier to pause threads in the barrier * @b: pointer to barrier * This function marks the barrier such that all threads arriving * in it will wait in the barrier, until barrier_continue() is * called on it. If barrier_continue() is called before all threads * have arrived on the barrier, the barrier will operate normally; * ie the threads will continue only when all threads have arrived * at the barrier. * Returns 0 if successful, errno error code otherwise. */ static int barrier_pause(barrier *const b) { int result; if (!b || b->threads < 1) return errno = EINVAL; result = pthread_mutex_lock(&b->mutex); if (result) return errno = result; if (b->state != PAUSED && b->state != RUNNING) { pthread_mutex_unlock(&b->mutex); return errno = EPERM; } b->state = PAUSED; pthread_mutex_unlock(&b->mutex); return errno = 0; } /** barrier_continue() - Unpause barrier * @b: Pointer to barrier * This function lets the barrier operate normally. * If all threads are already waiting in the barrier, * it lets them proceed immediately. Otherwise, the * threads will continue when all threads have arrived * at the barrier. * Returns 0 if success, errno error code otherwise. */ static int barrier_continue(barrier *const b) { int result; if (!b || b->threads < 0) return errno = EINVAL; result = pthread_mutex_lock(&b->mutex); if (result) return errno = result; if (b->state != PAUSED) { pthread_mutex_unlock(&b->mutex); return errno = EPERM; } b->state = RUNNING; if (b->waiting >= b->threads) pthread_cond_broadcast(&b->cond); pthread_mutex_unlock(&b->mutex); return errno = 0; } /** barrier_wait() - Wait on the barrier * @b: Pointer to barrier * Each thread participating in the barrier * must call this function. * Callers will block (wait) in this function, * until all threads have arrived. * If the barrier is paused, the threads will * wait until barrier_continue() is called on * the barrier, otherwise they will continue * when the final thread arrives to the barrier. * Returns 0 if success, errno error code otherwise. * Returns ETIMEDOUT if the thread should exit. */ static int barrier_wait(barrier *const b) { int result; if (!b || b->threads < 0) return errno = EINVAL; result = pthread_mutex_lock(&b->mutex); if (result) return errno =result; if (b->state == INVALID) { pthread_mutex_unlock(&b->mutex); return errno = EPERM; } else if (b->state == DRAINING) { pthread_mutex_unlock(&b->mutex); return errno = ETIMEDOUT; } b->waiting++; if (b->state == RUNNING && b->waiting >= b->threads) pthread_cond_broadcast(&b->cond); else pthread_cond_wait(&b->cond, &b->mutex); b->waiting--; pthread_mutex_unlock(&b->mutex); return errno = 0; } /** barrier_destroy() - Destroy a previously initialized barrier * @b: Pointer to barrier * Returns zero if success, errno error code otherwise. */ static int barrier_destroy(barrier *const b) { int result; if (!b || b->threads < 0) return errno = EINVAL; b->state = INVALID; b->threads = -1; b->waiting = -1; result = pthread_cond_destroy(&b->cond); if (result) return errno = result; result = pthread_mutex_destroy(&b->mutex); if (result) return errno = result; return errno = 0; } /** barrier_init() - Initialize a barrier * @b: Pointer to barrier * @threads: Number of threads to participate in barrier * Returns 0 if success, errno error code otherwise. */ static int barrier_init(barrier *const b, const int threads) { int result; if (!b || threads < 1) return errno = EINVAL; result = pthread_mutex_init(&b->mutex, NULL); if (result) return errno = result; result = pthread_cond_init(&b->cond, NULL); if (result) return errno = result; b->state = RUNNING; b->threads = threads; b->waiting = 0; return errno = 0; } #endif /* BARRIER_H */ 

La logique est assez simple. Tous les threads en attente dans la barrière attendent la variable de condition conditionnelle. Si la barrière fonctionne normalement ( state==RUNNING ), le dernier thread arrivant à la barrière sera diffusé sur la variable de condition au lieu de l’attendre, réveillant ainsi tous les autres threads.

Si la barrière est en pause ( state==PAUSED ), même le dernier fil qui arrive à la barrière attendra la variable de condition.

Lorsque la fonction barrier_pause() est appelée, l’état de la barrière est changé en pause. Il peut y avoir zéro ou plusieurs threads en attente dans la variable de condition, ce qui est normal: seul le dernier thread arrivant à la barrière a un rôle spécial et ce thread ne peut pas encore être arrivé. (Si c’était le cas, la barrière aurait déjà été vidée.)

Lorsque la fonction barrier_continue() est appelée, l’état de la barrière devient normal ( state==RUNNING ). Si tous les threads attendent la variable de condition, ils sont libérés par diffusion sur la variable de condition. Sinon, le dernier thread arrivant à la barrière sera diffusé sur la variable de condition et libérera les threads en attente normalement.

Notez que barrier_pause() et barrier_continue() n’attendent pas que la barrière soit pleine ou se soit écasting. Il ne bloque que le mutex et les fonctions ne le conservent que pendant de très courtes périodes. (En d’autres termes, ils peuvent bloquer pendant une courte période, mais n’attendront pas que la barrière atteigne une situation spécifique.)

Si la barrière est en train de s’égoutter ( state==DRAINING ), les fils qui arrivent à la barrière reviennent immédiatement avec errno==ETIMEDOUT . Pour des raisons de simplicité, toutes les fonctions de barrière définissent maintenant inconditionnellement errno (0 si succès, 0 code errno si erreur, ETIMEDOUT si drainant).

Le mutex protège les champs de la barrière de sorte qu’un seul thread puisse y accéder à la fois. En particulier, un seul fil peut arriver à la barrière en même temps, en raison du mutex.

Une situation compliquée existe: le corps de boucle dans lequel la barrière est utilisée peut être si court, ou il peut y avoir tellement de threads, que les threads commencent à arriver à la prochaine itération de la barrière avant même que tous les threads de l’itération précédente l’aient quittée.

Selon POSIX.1-2004, pthread_cond_broadcast() “débloquera tous les threads bloquant actuellement la variable de condition spécifiée” . Même si leurs réveils seront séquentiels (chacun acquérant le mutex à son tour), seuls les threads qui ont été bloqués dessus lors de l’appel de pthread_cond_broadcast() seront réveillés.

Donc, si l’implémentation suit la sémantique POSIX en ce qui concerne les variables de condition, les threads réveillés peuvent (même immédiatement!) Attendre de nouveau sur la variable de condition, en attendant la prochaine diffusion ou le prochain signal: les “anciens” et les “nouveaux” serveurs sont des ensembles séparés. . Ce cas d’utilisation est en fait assez typique, et toutes les implémentations POSIX dont j’ai entendu parler le permettent – elles ne réveillent pas les threads qui ont commencé à attendre la variable de condition après la dernière opération pthread_cond_broadcast() .

Si nous pouvons nous appuyer sur la sémantique de réveil de la variable de condition POSIX, cela signifie que l’implémentation de la barrière ci-dessus doit fonctionner de manière fiable, y compris dans le cas où les threads arrivent à la barrière (pour la prochaine itération), avant même que tous les threads (de l’itération précédente) aient quitté. La barrière.

(Notez que le problème connu de “réveil parasite” ne concerne que pthread_cond_signal() ; c’est-à-dire que lorsque vous appelez pthread_cond_signal() plusieurs threads peuvent être réveillés. Ici, nous réveillons tous les threads avec pthread_cond_broadcast() . les serveurs, et pas les futurs serveurs.)


Voici une implémentation POSIX.1-2001 pour suspendre et reprendre les threads de manière asynchrone, sans aucune coopération du (des) thread (s) cible (s).

Cela utilise deux signaux, un pour suspendre un thread et un autre pour le reprendre. Pour une compatibilité maximale, je n’ai pas utilisé d’extensions GNU C ni de signaux en temps réel POSIX.1b. Les deux signaux enregistrent et restaurent errno , de sorte que l’impact sur les threads suspendus serait minime.

Notez cependant que les fonctions listées dans man 7 signalent , après le paragraphe “Les interfaces suivantes ne sont jamais redémarrées après avoir été interrompues par un gestionnaire de signaux , les fonctions ” Interruption des appels système et des fonctions de bibliothèque par les gestionnaires de signaux “ , renvoyera errno==EINTR lorsqu’il est suspendu / repris. Cela signifie que vous devrez utiliser le do { result = FUNCTION(...); } while (result == -1 && errno == EINTR); traditionnel do { result = FUNCTION(...); } while (result == -1 && errno == EINTR); do { result = FUNCTION(...); } while (result == -1 && errno == EINTR); boucle, au lieu de result = FUNCTION(...); .

Les suspend_threads() et resume_threads() ne sont pas synchrones. Les threads seront suspendus / repris soit avant, soit après le retour des appels de fonction. En outre, suspendre et reprendre les signaux envoyés de l’extérieur du processus lui même peut affecter les threads; cela dépend si le kernel utilise l’un des threads cibles pour délivrer de tels signaux. (Cette approche ne peut pas ignorer les signaux envoyés par d’autres processus.)

Les tests indiquent que, dans la pratique, cette fonctionnalité de suspension / reprise est assez fiable, en supposant qu’aucune interférence extérieure ne soit provoquée (en envoyant des signaux capturés par les threads cibles à partir d’un autre processus). Cependant, il n’est pas très robuste et il y a très peu de garanties sur son fonctionnement, mais cela pourrait suffire pour certaines implémentations.

suspend-resume.h :

 #ifndef SUSPEND_RESUME_H #define SUSPEND_RESUME_H #if !defined(_POSIX_C_SOURCE) && !defined(POSIX_SOURCE) #error This requires POSIX support (define _POSIX_C_SOURCE). #endif #include  #include  #include  #define SUSPEND_SIGNAL SIGUSR1 #define RESUME_SIGNAL SIGUSR2 /* Resume signal handler. */ static void resume_handler(int signum, siginfo_t *info, void *context) { /* The delivery of the resume signal is the key point. * The actual signal handler does nothing. */ return; } /* Suspend signal handler. */ static void suspend_handler(int signum, siginfo_t *info, void *context) { sigset_t resumeset; int saved_errno; if (!info || info->si_signo != SUSPEND_SIGNAL) return; /* Save errno to keep it unchanged in the interrupted thread. */ saved_errno = errno; /* Block until suspend or resume signal received. */ sigfillset(&resumeset); sigdelset(&resumeset, SUSPEND_SIGNAL); sigdelset(&resumeset, RESUME_SIGNAL); sigsuspend(&resumeset); /* Restore errno. */ errno = saved_errno; } /* Install signal handlers. */ static int init_suspend_resume(void) { struct sigaction act; sigemptyset(&act.sa_mask); sigaddset(&act.sa_mask, SUSPEND_SIGNAL); sigaddset(&act.sa_mask, RESUME_SIGNAL); act.sa_flags = SA_RESTART | SA_SIGINFO; act.sa_sigaction = resume_handler; if (sigaction(RESUME_SIGNAL, &act, NULL)) return errno; act.sa_sigaction = suspend_handler; if (sigaction(SUSPEND_SIGNAL, &act, NULL)) return errno; return 0; } /* Suspend one or more threads. */ static int suspend_threads(const pthread_t *const identifier, const int count) { int i, result, retval = 0; if (!identifier || count < 1) return errno = EINVAL; for (i = 0; i < count; i++) { result = pthread_kill(identifier[i], SUSPEND_SIGNAL); if (result && !retval) retval = result; } return errno = retval; } /* Resume one or more threads. */ static int resume_threads(const pthread_t *const identifier, const int count) { int i, result, retval = 0; if (!identifier || count < 1) return errno = EINVAL; for (i = 0; i < count; i++) { result = pthread_kill(identifier[i], RESUME_SIGNAL); if (result && !retval) retval = result; } return errno = retval; } #endif /* SUSPEND_RESUME_H */ 

Des questions?

Pour que les threads restnt synchronisés, vous devez placer le test stop avant la barrière. Si l’indicateur est défini pendant qu’un ou plusieurs des threads de travail ont atteint la barrière, ils sont maintenus jusqu’à ce que le ou les autres soient libérés de la condition.


Ajouté après l’échange de commentaires ci-dessous …

Avec le contrôle du drapeau d’arrêt après la barrière, il y a une course. Immédiatement après la barrière, les fils qui travaillent vérifient le drapeau à tour de rôle. Si l’indicateur est défini juste après qu’un ou plusieurs threads l’aient cochée, mais avant le (s) suivant (s), certains threads passeront à côté de l’arrêt et continueront jusqu’à la barrière. Ainsi, les threads actifs ne sont plus synchronisés .

Avec le contrôle du drapeau d’arrêt devant la barrière, il y a toujours une course, mais les threads qui travaillent ne sont pas désynchronisés. Si le drapeau est défini juste après qu’un ou plusieurs threads l’aient coché, ceux qui l’ont manqué continuent et s’arrêtent à la barrière. Tous les threads qui voient l’indicateur d’arrêt s’arrêteront à la condition, et lorsque la condition sera signalée, ils passeront à la barrière et tous les threads actifs continueront de se synchroniser.

En d’autres termes … avec le contrôle après la barrière, tous les fils de travail doivent voir le même état du drapeau d’arrêt après avoir été synchronisés avec la barrière, s’ils doivent restr synchronisés – ce qui est le cas. impossible. Avec la vérification avant la barrière, un seul thread actif doit voir l’indicateur d’arrêt afin de les arrêter en synchronisation – ce qui est simple.

D’après l’esquisse du code, il est difficile de savoir pourquoi il y aurait une impasse. Déplacer la vérification ne change pas cela, mais peut-être que le blocage signalé est dû au fait que les threads en cours de travail ne sont plus synchronisés.


Séparément et FWIW, généralement on écrirait:

 while (...reason to wait...) pthread_cond_wait(...) ; 

Plutôt que:

 if (...reason to wait...) pthread_cond_wait(...) ; 

Cela est principalement dû au fait que pthread_cond_signal() peut (le standard le permet) réactiver plusieurs threads et que, dans ce cas, pthread_cond_broadcast est utilisé … mais if sonne toujours l’alarme.

Vous pouvez utiliser le gestionnaire de signaux pour suspendre et reprendre un thread en fonction du signal qui est transmis au thread. Ecrivez deux gestionnaires de signaux personnalisés: un pour suspendre (SIGUSR1) et reprendre (SIGUSR2). Ainsi, lorsque vous souhaitez suspendre un thread, envoyez simplement le signal SIGUSR1 à ce thread. De même, pour reprendre un thread suspendu, envoyez SIGUSR2 à ce thread à l’aide de pthread_kill.