Comment puis-je exécuter une fonction spécifique de thread de manière asynchrone dans c / c ++?

Réglage des performances: écriture de données sur plusieurs canaux

Maintenant je le fais en un seul fil:

for(unsigned int i = 0; i < myvector.size();) { tmp_pipe = myvector[i]; fSuccess = WriteFile( tmp_pipe, &Time, sizeof(double), &dwWritten, NULL ); if(!fSuccess) { myvector.erase(myvector.begin()+i); printf("Client pipe closed\r\n"); continue; } fSuccess = WriteFile( tmp_pipe, &BufferLen, sizeof(long), &dwWritten, NULL ); if(!fSuccess) { myvector.erase(myvector.begin()+i); printf("Client pipe closed\r\n"); continue; } fSuccess = WriteFile( tmp_pipe, pBuffer, BufferLen, &dwWritten, NULL ); if(!fSuccess) { myvector.erase(myvector.begin()+i); printf("Client pipe closed\r\n"); continue; } i++; } 

Et le résultat est que le premier pipe reçoit les données plus rapidement et le dernier pipe plus lentement.

Je pense le faire dans des threads séparés afin que chaque pipe soit traité de manière égale.

Mais comment puis-je exécuter une fonction spécifique de thread de manière asynchrone (le thread principal doit être immédiatement renvoyé) dans c / c ++?

Vous pouvez utiliser la fonction CreateThread pour créer un nouveau thread et transmettre le handle de canal en tant que paramètre à la fonction de thread:

 DWORD PipeThread(LPVOID param) { HANDLE hPipe = (HANDLE)param; // Do the WriteFile operations here return 0; } for(unsigned int i = 0; i < myvector.size(); i++) CreateThread(NULL, 0, PipeThread, myvector[i], 0, NULL); 

Notez que la classe de vecteurs n'est pas thread-safe, vous devrez donc faire face à des problèmes avec myvector.erase si vous ne synchronisez pas l'access à ces myvector.erase , par exemple. en utilisant une section critique.


Mise à jour: puisque vous avez mentionné la haute fréquence, vous pouvez utiliser des ports d’achèvement des E / S au lieu d’un thread distinct pour chaque canal. Vous pouvez ensuite utiliser des entrées / sorties WriteFile avec WriteFile pour effectuer l'écriture de manière asynchrone. Vous pouvez ne disposer que d'un seul thread supplémentaire qui écoute la fin des écritures:

 // Initial setup: add pipe handles to a completion port HANDLE hPort = CreateCompletionPort(myvector[0], NULL, 0, 1); for (unsigned int i = 1; i < myvector.size(); i++) CreateCompletionPort(myvector[i], hPort, 0, 0); // Start thread CreateThread(NULL, 0, PipeThread, hPort, 0, NULL); // Do this as many times as you want for(unsigned int i = 0; i < myvector.size(); i++) { OVERLAPPED *ov = new OVERLAPPED; ZeroMemory(ov, sizeof ov); WriteFile(myvector[i], buffer, size, NULL, ov); // If pipe handle was closed, WriteFile will fail immediately // Otherwise I/O is performed asynchronously } // Close the completion port at the end // This should automatically free the thread CloseHandle(hPort); --- DWRD PipeThread(LPVOID param) { HANDLE hPort = (HANDLE)param; DWORD nBytes; ULONG_PTR key; LPOVERLAPPED ov; // Continuously loop for I/O completion while (GetQueuedCompletionStatus(hPort, &nBytes, &key, &ov, INFINITE)) { if (ov != NULL) { delete ov; // Do anything else you may want to do here } } return 0; } 

Avez-vous writev() disponible? Si tel est le cas, vous pouvez réduire les trois opérations d’écriture à une par canal, ce qui serait plus efficace. Cela simplifie également un peu la gestion des erreurs, mais vous pouvez réduire ce que vous devez:

 for (unsigned int i = 0; i < myvector.size(); i++) { tmp_pipe = myvector[i]; if (!WriteFile(tmp_pipe, &Time, sizeof(double), &dwWritten, NULL) || !WriteFile(tmp_pipe, &BufferLen, sizeof(long), &dwWritten, NULL) || !WriteFile(tmp_pipe, pBuffer, BufferLen, &dwWritten, NULL)) { myvector.erase(myvector.begin()+i); printf("Client pipe closed\r\n"); } } 

Cela est plus simple à lire à bien des égards, car il y a 1/3 du traitement des erreurs - le code opérationnel est donc moins caché.

Bien sûr, vous voudrez toujours envelopper ce code dans du code threadé pour que les opérations d'écriture soient gérées par des threads séparés. Vous organiseriez chaque fil pour obtenir son propre tuyau; ils partageraient un access en lecture seule à l'heure et à la mémoire tampon. Chaque thread écrivait et renvoyait un statut au fur et à mesure de son achèvement. Le thread parent attendrait chacun des threads enfants et si un thread enfant signalait l'échec, le canal client correspondant serait supprimé du vecteur. Etant donné que seul le thread parent manipulerait le vecteur, il n'y a pas de problèmes de thread à craindre.

Dans les grandes lignes:

  for (i = 0; i < myvector.size(); i++) tid[i] = thread create (myvector[i], write_to_pipe); for (i = 0; i < myvector.size(); i++) { status = wait for thread(tid[i]); if (status != success) myvector.erase(...); } 

Le tableau (ou vecteur) tid contient les identités de fil. La fonction write_to_pipe() est la fonction principale du thread; il écrit sur le tuyau qui lui est transmis et sort avec le statut approprié.

Est-ce que ces pipes sont nommés? Si tel est le cas, vous pouvez utiliser FILE_FLAG_OVERLAPPED lorsque vous les créez, ce qui vous permet d’effectuer des écritures asynchrones sans traiter les threads. Voici un exemple de MSDN .

S’il s’agit de canaux anonymes, les E / S superposées ne sont pas sockets en charge . Ce peut être une bonne raison de passer aux canaux nommés.

Une autre option pourrait consister à mettre un élément de travail en file d’ attente pour chaque écriture, mais cela ne garantit pas que les trois écritures seront exécutées simultanément.

J’envisagerais de préparer les données de manière à réduire le nombre d’appels d’E / S. Après avoir su que les données étaient en cours d’écriture de la manière la plus efficace possible avec le moins d’appels d’E / S possibles, j’envisagerais d’utiliser des E / S asynchrones. Si les performances ne sont toujours pas suffisantes, envisagez d’append des threads à la conception.

Une façon de réduire le nombre d’écritures consiste à utiliser une structure combinant toutes les données afin qu’une écriture puisse être utilisée au lieu de trois. Le pragma serait nécessaire pour se débarrasser de tout remplissage / alignement supplémentaire que le compilateur pourrait append.

 #pragma pack(push,1) struct PipeData { double _time; long _buffer_len; char* _buffer; }; #pragma pack(pop) PipeData data; int data_len = sizeof(double) + sizeof(long) + ; for(unsigned int i = 0; i < myvector.size();) { tmp_pipe = myvector[i]; fSuccess = WriteFile( tmp_pipe, &data, data_len, &dwWritten, NULL ); if(!fSuccess) { myvector.erase(myvector.begin()+i); printf("Client pipe closed\r\n"); continue; } i++; } 

Pour répondre spécifiquement à votre question “comment puis-je exécuter une fonction spécifique de thread de manière asynchrone (le thread principal devrait obtenir un retour immédiat) dans c / c ++?”

Vous pouvez le faire facilement en dissociant les deux actes. Créez un groupe de travail de threads, initialisé par le canal avec lequel il doit communiquer et écrivez une fonction qui planifiera un nouveau travail pour ces threads. Cela sera instantané par rapport à vos écritures de pipe et tous les threads auront son travail et commenceront à écrire sur les pipes qu’il contrôle alors que votre thread principal est libre de continuer son travail.

Ce sera une solution facile si le nombre de clients que vous avez est petit. Si ce n’est pas le cas, la création d’un thread pour chaque client ne sera pas très évolutive après un point et votre serveur sera agrandi avec beaucoup de commutateurs de contexte et de conflits de thread.

Dans ce scénario, pour une conception très serveur, vous devriez sérieusement réfléchir à la solution de Casablanca. Il ne crée qu’un seul thread pour écouter les notifications d’achèvement et constitue la conception de serveur la plus efficace à partir de Win 2003 pour créer des serveurs dans Windows.