C Pthreads – Problèmes d’implémentation de files d’attente sécurisées pour les threads

Je suis novice en multithreading et j’essaie d’implémenter une queue de tâches sécurisée pour les threads, où chaque thread peut tirer son travail jusqu’à ce qu’il ne rest plus de tâches. Aucune queue de tâches ne sera créée par aucun des threads.

À des fins de test, chaque tâche ne contient qu’un nombre.

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER; typedef struct Task{ int number; }Task; typedef struct Cell{ Task t; struct Cell* next; }Cell; typedef struct TQueue{ struct Cell* head; struct Cell* tail; }TQueue; int empty(TQueue *Queue) return queue->head == queue->tail; void startQueue(TQueue *queue){ queue->head = malloc(sizeof(Cell)); queue->tail = queue->head; } void enqueue(TQueue *queue, Task C){ queue->tail->next = malloc(sizeof(Cell)); queue->tail = queue->tail->next; queue->tail->t = C; queue->tail->next = NULL; } Task * dequeue(TQueue* queue){ pthread_mutex_lock( &task_mutex); Task * t; if(empty(queue)) t = NULL; else{ struct Cell* p = queue->head; queue->head = queue->head->next; t = &queue->head->t; free(p); } pthread_mutex_unlock( &task_mutex); return t; } void * work( void* arg){ TQueue* queue = (TQueue *)arg; Task* t = malloc(sizeof(Task)); for(t = dequeue(queue); t != NULL; t = dequeue(queue)) printf("%d ", t->number); free(t); pthread_exit(NULL); return 0; } 

Pour un simple test, j’ai lu ceci sur main:

 int main(){ TQueue* queue = malloc(sizeof(TQueue)); startQueue(queue); pthread_t threads[3]; Task t[3]; for(int i = 0; i < 3; i++){ t[i].number = i + 1; enqueue(queue, t[i]); } for(int i = 0; i < 3; i++) pthread_create(&threads[i], NULL, work, (void*)queue); for(int i = 0; i < 3; i++) pthread_join(threads[i], NULL); return 0; } 

La sortie attendue était 1 2 3 dans n’importe quel ordre, mais parfois, une séquence contenant un nombre étrange, comme 1823219 2 3 . Je n’ai pas été en mesure de détecter des conditions de course ou des problèmes connexes, j’apprécie donc toute aide.

J’ai trouvé quelques bugs de plus.

J’ai annoté votre code. J’ai pris un peu de votre première publication et de votre deuxième. J’ai corrigé le code, montrant avant et après [veuillez excuser le nettoyage gratuit du style]:

 #include  #include  #include  static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER; typedef struct Task { int number; } Task; typedef struct Cell { // NOTE/BUG: this should be a pointer to the task. otherwise, dequeue gets // messy #if 0 Task t; #else Task *t; #endif struct Cell *next; } Cell; typedef struct TQueue { struct Cell *head; struct Cell *tail; } TQueue; void startQueue(TQueue *queue) { #if 0 queue->head = malloc(sizeof(Cell)); #else queue->head = NULL; #endif queue->tail = NULL; } int empty(TQueue *queue) { // NOTE/BUG: dequeue never touches tail, so this test is incorrect #if 0 return (queue->head == queue->tail); #else return (queue->head == NULL); #endif } void enqueue(TQueue *queue, Task *t) { Cell *p; pthread_mutex_lock(&task_mutex); p = malloc(sizeof(Cell)); p->next = NULL; p->t = t; if (queue->tail == NULL) { queue->tail = p; queue->head = p; } else { queue->tail->next = p; queue->tail = p; } pthread_mutex_unlock(&task_mutex); } Task * dequeue(TQueue *queue) { Task *t; pthread_mutex_lock(&task_mutex); if (empty(queue)) t = NULL; else { Cell *p = queue->head; if (p == queue->tail) queue->tail = NULL; queue->head = p->next; // NOTE/BUG: this is setting t to the second element in the list, // not the first // NOTE/BUG: this is also undefined behavior, in original code (with // original struct definition), because what t points to _does_ get // freed before return #if 0 t = &queue->head->t; #else t = p->t; #endif free(p); } pthread_mutex_unlock(&task_mutex); return t; } void * work(void *arg) { TQueue *queue = (TQueue *) arg; // NOTE/BUG: this gets orphaned on the first call to dequeue #if 0 Task *t = malloc(sizeof(Task)); #else Task *t; #endif for (t = dequeue(queue); t != NULL; t = dequeue(queue)) printf("%d ", t->number); // NOTE/BUG: this frees some cell allocated in main -- not what we want #if 0 free(t); #endif pthread_exit(NULL); return 0; } // For a simple test i runned this on main: int main() { TQueue *queue = malloc(sizeof(TQueue)); startQueue(queue); pthread_t threads[3]; Task t[3]; for (int i = 0; i < 3; i++) { t[i].number = i + 1; #if 0 enqueue(queue, t); #else enqueue(queue, &t[i]); #endif } for (int i = 0; i < 3; i++) pthread_create(&threads[i], NULL, work, (void *) queue); for (int i = 0; i < 3; i++) pthread_join(threads[i], NULL); return 0; } 

METTRE À JOUR:

Les threads exécutent-ils les tâches simultanément? J'ai testé l'utilisation de l'unité centrale avec htop et je ne peux utiliser qu'un seul kernel sur quatre au maximum.

Quelques points à garder à l'esprit. htop ne montrera probablement pas grand chose sur les programmes dont la durée d'exécution est aussi courte. Même avec 10 000 entrées de queue, ce programme s'exécute en 20 ms.

Il est préférable que le programme lui-même imprime les informations [voir ci-dessous]. Notez que printf verrouille les threads sur stdin ce qui peut consortingbuer à la nature "en série" du programme. Cela consortingbue également de manière significative au temps d'exécution du programme (c'est-à-dire que printf est beaucoup plus lent que la dequeue )

En outre, un seul thread (le premier) pourrait monopoliser la queue et drainer toutes les entrées avant que les autres puissent s'exécuter.

Le système d'exploitation peut [est libre de] planifier tous les threads sur un seul cœur. Il peut ensuite les "migrer" plus tard (par exemple en une seconde environ).

J'ai amélioré le programme pour inclure des informations de minutage dans l'impression de sortie qui pourraient aider à montrer davantage de ce que vous aimeriez voir. De plus, j'ai ajouté des options de ligne de commande pour contrôler le nombre de threads et le nombre d'éléments en queue. Ceci est similaire à ce que je fais pour certains de mes propres programmes. Transférez la sortie du programme dans un fichier journal et examinez-le. Jouez avec les options sur plusieurs pistes

 #include  #include  #include  #include  #include  int opt_n; // suppress thread output int opt_T; // number of threads int opt_Q; // number of queue items static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER; double tvzero; typedef struct Task { int number; } Task; typedef struct Cell { Task *t; struct Cell *next; } Cell; typedef struct TQueue { struct Cell *head; struct Cell *tail; } TQueue; typedef struct Thread { pthread_t tid; int xid; TQueue *queue; } Thread; double tvgetf(void) { struct timespec ts; double sec; clock_gettime(CLOCK_REALTIME,&ts); sec = ts.tv_nsec; sec /= 1e9; sec += ts.tv_sec; sec -= tvzero; return sec; } void startQueue(TQueue *queue) { queue->head = NULL; queue->tail = NULL; } int empty(TQueue *queue) { return (queue->head == NULL); } void enqueue(TQueue *queue, Task *t) { Cell *p; pthread_mutex_lock(&task_mutex); p = malloc(sizeof(Cell)); p->next = NULL; p->t = t; if (queue->tail == NULL) { queue->tail = p; queue->head = p; } else { queue->tail->next = p; queue->tail = p; } pthread_mutex_unlock(&task_mutex); } Task * dequeue(TQueue *queue) { Task *t; pthread_mutex_lock(&task_mutex); if (empty(queue)) t = NULL; else { Cell *p = queue->head; if (p == queue->tail) queue->tail = NULL; queue->head = p->next; t = p->t; free(p); } pthread_mutex_unlock(&task_mutex); return t; } void * work(void *arg) { Thread *tskcur = arg; TQueue *queue = tskcur->queue; Task *t; double tvbef; double tvaft; while (1) { tvbef = tvgetf(); t = dequeue(queue); tvaft = tvgetf(); if (t == NULL) break; if (! opt_n) printf("[%.9f/%.9f %5.5d] %d\n", tvbef,tvaft - tvbef,tskcur->xid,t->number); } return (void *) 0; } // For a simple test i runned this on main: int main(int argc,char **argv) { char *cp; TQueue *queue; Task *t; Thread *tsk; --argc; ++argv; for (; argc > 0; --argc, ++argv) { cp = *argv; if (*cp != '-') break; switch (cp[1]) { case 'n': // suppress thread output opt_n = 1; break; case 'Q': // number of queue items opt_Q = atoi(cp + 2); break; case 'T': // number of threads opt_T = atoi(cp + 2); break; default: break; } } tvzero = tvgetf(); queue = malloc(sizeof(TQueue)); startQueue(queue); if (opt_T == 0) opt_T = 16; Thread threads[opt_T]; if (opt_Q == 0) opt_Q = 10000; t = malloc(sizeof(Task) * opt_Q); for (int i = 0; i < opt_Q; i++) { t[i].number = i + 1; enqueue(queue, &t[i]); } for (int i = 0; i < opt_T; i++) { tsk = &threads[i]; tsk->xid = i + 1; tsk->queue = queue; pthread_create(&tsk->tid, NULL, work, tsk); } for (int i = 0; i < opt_T; i++) { tsk = &threads[i]; pthread_join(tsk->tid, NULL); } printf("TOTAL: %.9f\n",tvgetf()); free(t); return 0; } 

MISE À JOUR # 2:

En outre, un thread (le premier) pourrait monopoliser la queue et drainer toutes les entrées avant que les autres aient une chance de s'exécuter. "Que peut-on faire dans ce cas?

Quelques choses.

pthread_create prend un peu de temps, permettant au thread 1 de continuer pendant que les autres sont en cours de création. Une façon d'améliorer ceci est de créer tous les threads, chaque thread positionne un drapeau "Je suis en cours d'exécution" (dans son bloc de contrôle de thread). Le thread principal attend que tous les threads définissent cet indicateur. Ensuite, le thread principal définit un indicateur global volatile "you_may_now_all_run" sur lequel chaque thread tourne avant d'entrer dans sa boucle de thread principale. D'après mon expérience, ils commencent tous à courir à des microsecondes les uns des autres [ou mieux].

Je n'ai pas implémenté ceci dans le code mis à jour ci-dessous, vous pouvez donc l'expérimenter vous-même [avec le nanosleep ].

les mutex sont globalement assez corrects [sous Linux, au moins] car un thread bloqué sera mis en queue, en attente du mutex. Comme je l'ai mentionné dans les commentaires, un nanosleep peut également être utilisé, mais cela [ nanosleep ] quelque peu le but recherché car les threads vont ralentir.

L'antidote à la famine est "l'équité". Comme je l'ai mentionné, il existe un algorithme élaboré pour l'équité sans attendre. Il s’agit de l’algorithme de Kogan / Petrank: http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf C’est vraiment un peu compliqué / avancé, c’est donc un caveat emptor ...

Cependant, un compromis peut être un verrou de ticket: https://en.wikipedia.org/wiki/Ticket_lock

J'ai retravaillé le programme à nouveau. Il propose des options pour l'allocation groupée, le locking ticket / mutex et l'impression différée des entrées du journal. Il vérifie également les résultats entre les threads pour s'assurer qu'aucun d'entre eux n'a des entrées en double.

Bien sûr, la clé de tout cela est une journalisation précise et de haute précision (c’est-à-dire que si vous ne pouvez pas le mesurer, vous ne pouvez pas l’accorder).

Par exemple, on pourrait penser que la mise en queue libre en queue serait plus lente que de simplement libérer la cellule dans un pool récupérable (similaire à un allocateur de dalle), mais l'amélioration des performances n'a pas été aussi grande que prévu. Cela pourrait être que malloc/free de glibc est juste flambante vite [qui est ce qu'ils prétendent ].

Ces différentes versions devraient vous donner quelques idées sur la manière de construire votre propre suite de mesure de performance.

Quoi qu'il en soit, voici le code:

 #include  #include  #include  #include  #include  #include  #include  #include  int opt_p; // print thread output immediately int opt_T; // number of threads int opt_Q; // number of queue items int opt_L; // use ticket lock int opt_M; // use fast cell alloc/free typedef unsigned char byte; typedef unsigned int u32; #define sysfault(_fmt...) \ do { \ fprintf(stderr,_fmt); \ exit(1); \ } while (0) // lock control typedef struct AnyLock { pthread_mutex_t mutex; // standard mutex volatile u32 seqreq; // ticket lock request volatile u32 seqacq; // ticket lock grant } AnyLock; // work value typedef struct Task { union { struct Task *next; int number; }; } Task; // queue item typedef struct Cell { struct Cell *next; Task *t; } Cell; // queue control typedef struct TQueue { struct Cell *head; struct Cell *tail; } TQueue; // thread log entry typedef struct Log { double tvbef; double tvaft; int number; } Log; #define BTVOFF(_off) \ ((_off) >> 3) #define BTVMSK(_off) \ (1u << ((_off) & 0x07)) #define BTVLEN(_len) \ ((_len) + 7) >> 3 // thread control typedef struct Thread { pthread_t tid; int xid; TQueue *queue; Log *log; byte *bitv; } Thread; static inline byte btvset(byte *bitv,long off) { u32 msk; byte oval; bitv += BTVOFF(off); msk = BTVMSK(off); oval = *bitv & msk; *bitv |= msk; return oval; } AnyLock task_mutex; AnyLock print_mutex; double tvzero; Cell *cellpool; // free pool of cells long bitvlen; #define BARRIER \ __asm__ __volatile__("" ::: "memory") // virtual function pointers Cell *(*cellnew)(void); void (*cellfree)(Cell *); void (*lock_acquire)(AnyLock *lock); void (*lock_release)(AnyLock *lock); double tvgetf(void) { struct timespec ts; double sec; clock_gettime(CLOCK_REALTIME,&ts); sec = ts.tv_nsec; sec /= 1e9; sec += ts.tv_sec; sec -= tvzero; return sec; } void * xalloc(size_t cnt,size_t siz) { void *ptr; ptr = calloc(cnt,siz); if (ptr == NULL) sysfault("xalloc: calloc failure -- %s\n",strerror(errno)); return ptr; } void lock_wait_ticket(AnyLock *lock,u32 newval) { u32 oldval; // wait for our ticket to come up // NOTE: atomic_load is [probably] overkill here while (1) { #if 0 oldval = atomic_load(&lock->seqacq); #else oldval = lock->seqacq; #endif if (oldval == newval) break; } } void lock_acquire_ticket(AnyLock *lock) { u32 oldval; u32 newval; int ok; // acquire our ticket value // NOTE: just use a garbage value for oldval -- the exchange will // update it with the correct/latest value -- this saves a separate // refetch within the loop oldval = 0; while (1) { #if 0 BARRIER; oldval = lock->seqreq; #endif newval = oldval + 1; ok = atomic_compare_exchange_strong(&lock->seqreq,&oldval,newval); if (ok) break; } lock_wait_ticket(lock,newval); } void lock_release_ticket(AnyLock *lock) { // NOTE: atomic_fetch_add is [probably] overkill, but leave it for now #if 1 atomic_fetch_add(&lock->seqacq,1); #else lock->seqacq += 1; #endif } void lock_acquire_mutex(AnyLock *lock) { pthread_mutex_lock(&lock->mutex); } void lock_release_mutex(AnyLock *lock) { pthread_mutex_unlock(&lock->mutex); } void lock_init(AnyLock *lock) { switch (opt_L) { case 1: lock->seqreq = 0; lock->seqacq = 1; lock_acquire = lock_acquire_ticket; lock_release = lock_release_ticket; break; default: pthread_mutex_init(&lock->mutex,NULL); lock_acquire = lock_acquire_mutex; lock_release = lock_release_mutex; break; } } void startQueue(TQueue *queue) { queue->head = NULL; queue->tail = NULL; } int empty(TQueue *queue) { return (queue->head == NULL); } // cellnew_pool -- allocate a queue entry Cell * cellnew_pool(void) { int cnt; Cell *p; Cell *pool; while (1) { // try for quick allocation p = cellpool; // bug out if we got it if (p != NULL) { cellpool = p->next; break; } // go to the heap to replenish the pool cnt = 1000; p = xalloc(cnt,sizeof(Cell)); // link up the ensortinges pool = NULL; for (; cnt > 0; --cnt, ++p) { p->next = pool; pool = p; } // put this "online" cellpool = pool; } return p; } // cellfree_pool -- release a queue entry void cellfree_pool(Cell *p) { p->next = cellpool; cellpool = p; } // cellnew_std -- allocate a queue entry Cell * cellnew_std(void) { Cell *p; p = xalloc(1,sizeof(Cell)); return p; } // cellfree_std -- release a queue entry void cellfree_std(Cell *p) { free(p); } void enqueue(TQueue *queue, Task *t) { Cell *p; lock_acquire(&task_mutex); p = cellnew(); p->next = NULL; p->t = t; if (queue->tail == NULL) { queue->tail = p; queue->head = p; } else { queue->tail->next = p; queue->tail = p; } lock_release(&task_mutex); } Task * dequeue(TQueue *queue) { Task *t; lock_acquire(&task_mutex); if (empty(queue)) t = NULL; else { Cell *p = queue->head; if (p == queue->tail) queue->tail = NULL; queue->head = p->next; t = p->t; cellfree(p); } lock_release(&task_mutex); return t; } void * work(void *arg) { Thread *tskcur = arg; TQueue *queue = tskcur->queue; Task *t; Log *log; long cnt; int tprev; byte *bitv; double tvbeg; double tvbef; double tvaft; log = tskcur->log; bitv = tskcur->bitv; tvbeg = tvgetf(); tprev = 0; while (1) { tvbef = tvgetf(); t = dequeue(queue); tvaft = tvgetf(); if (t == NULL) break; // abort if we get a double entry if (btvset(bitv,t->number)) sysfault("work: duplicate\n"); if (opt_p) { printf("[%.9f/%.9f %5.5d] %d [%d]\n", tvbef,tvaft - tvbef,tskcur->xid,t->number,t->number - tprev); tprev = t->number; continue; } log->tvbef = tvbef; log->tvaft = tvaft; log->number = t->number; ++log; } if (! opt_p) { tvaft = tvgetf(); cnt = log - tskcur->log; log = tskcur->log; lock_acquire(&print_mutex); printf("\n"); printf("THREAD=%5.5d START=%.9f STOP=%.9f ELAP=%.9f TOTAL=%ld\n", tskcur->xid,tvbeg,tvaft,tvaft - tvbeg,cnt); tprev = 0; for (; cnt > 0; --cnt, ++log) { printf("[%.9f/%.9f %5.5d] %d [%d]\n", log->tvbef,log->tvaft - log->tvbef,tskcur->xid, log->number,log->number - tprev); tprev = log->number; } lock_release(&print_mutex); } return (void *) 0; } void btvchk(Thread *tska,Thread *tskb) { byte *btva; byte *btvb; byte aval; byte bval; int idx; printf("btvchk: %d ??? %d\n",tska->xid,tskb->xid); btva = tska->bitv; btvb = tskb->bitv; // abort if we get overlapping ensortinges between two threads for (idx = 0; idx < bitvlen; ++idx) { aval = btva[idx]; bval = btvb[idx]; if (aval & bval) sysfault("btvchk: duplicate\n"); } } // For a simple test i runned this on main: int main(int argc,char **argv) { char *cp; TQueue *queue; Task *t; Thread *tsk; --argc; ++argv; for (; argc > 0; --argc, ++argv) { cp = *argv; if (*cp != '-') break; switch (cp[1]) { case 'p': // print immediately opt_p = 1; break; case 'Q': // number of queue items opt_Q = atoi(cp + 2); break; case 'T': // number of threads opt_T = atoi(cp + 2); break; case 'L': opt_L = 1; break; case 'M': opt_M = 1; break; default: break; } } printf("p=%d -- thread log is %s\n",opt_p,opt_p ? "immediate" : "deferred"); if (opt_T == 0) opt_T = 16; printf("T=%d (number of threads)\n",opt_T); if (opt_Q == 0) opt_Q = 1000000; printf("Q=%d (number of items to enqueue)\n",opt_Q); printf("L=%d -- lock is %s\n",opt_L,opt_L ? "ticket" : "mutex"); printf("M=%d -- queue item allocation is %s\n", opt_M,opt_M ? "pooled" : "malloc/free"); tvzero = tvgetf(); lock_init(&task_mutex); lock_init(&print_mutex); // select queue item allocation strategy switch (opt_M) { case 1: cellnew = cellnew_pool; cellfree = cellfree_pool; break; default: cellnew = cellnew_std; cellfree = cellfree_std; break; } queue = xalloc(1,sizeof(TQueue)); startQueue(queue); Thread threads[opt_T]; // get byte length of bit vectors bitvlen = BTVLEN(opt_Q + 1); // allocate per-thread log buffers for (int i = 0; i < opt_T; i++) { tsk = &threads[i]; if (! opt_p) tsk->log = xalloc(opt_Q,sizeof(Log)); tsk->bitv = xalloc(bitvlen,sizeof(byte)); } // allocate "work to do" t = xalloc(opt_Q,sizeof(Task)); // add to master queue for (int i = 0; i < opt_Q; i++) { t[i].number = i + 1; enqueue(queue, &t[i]); } // fire up the threads for (int i = 0; i < opt_T; i++) { tsk = &threads[i]; tsk->xid = i + 1; tsk->queue = queue; pthread_create(&tsk->tid, NULL, work, tsk); } // wait for threads to complete for (int i = 0; i < opt_T; i++) { tsk = &threads[i]; pthread_join(tsk->tid, NULL); } // wait for threads to complete for (int i = 0; i < opt_T; i++) { for (int j = i + 1; j < opt_T; j++) btvchk(&threads[i],&threads[j]); } printf("TOTAL: %.9f\n",tvgetf()); free(t); return 0; }