OVH Cloud OVH Cloud

thread pool avec boost

5 réponses
Avatar
Jean-Marc Desperrier
Bonjour,

J'aimerais bien implémenter un "thread pool" en me basant sur boost, je
me demande s'il y a des solutions à recommander.

En cherchant un peu, j'ai trouvé ceci :
http://threadpool.sourceforge.net/tutorial/intro.html
Ca semble pas bien correspondre au cahier des charges au premier abord,
mais souvent les limitation n'apparaîssent que quand on a déjà pas mal
joué avec quelquechose.

5 réponses

Avatar
kanze
Jean-Marc Desperrier wrote:

J'aimerais bien implémenter un "thread pool" en me basant sur
boost, je me demande s'il y a des solutions à recommander.


Pourquoi ? C'est à résoudre quel problème ?

Il n'y a rien de particulier en ce qui concerne les « thread
pools ». Tout dépend de ce qu'on veut faire avec.

En cherchant un peu, j'ai trouvé ceci :
http://threadpool.sourceforge.net/tutorial/intro.html Ca
semble pas bien correspondre au cahier des charges au premier
abord, mais souvent les limitation n'apparaîssent que quand on
a déjà pas mal joué avec quelquechose.


Je n'arrive pas à comprendre même ce qu'il fait. Il manque des
informations dans l'exemple pour savoir ce qui ce fait
réelement.

Si le but c'est simplement de limiter le nombre de threads
« worker », où de pouvoir activer un traitement dans un autre
thread plus rapidement, la solution la plus simple, c'est de
créer une queue de messages, et de démarrer tous les threads au
départ. Chaque thread boucle sur l'attente d'une requête dans
la queue, et à la place de démarrer un thread « worker », tu
postes un message dans la queue.

Par thread « worker », j'entends un thread qui est lancé pour
effectuer un traitement particulier, et qui se termine à la fin
de ce traitement. C'est une solution simple et répandue, mais
qui peut mener à un nombre de threads énormes si pour une raison
quelconque, les requêtes arrivent plus vite que les threads
puissent les traiter. Aussi, créer un thread est souvent une
opération assez coûteuse -- utiliser un thread déjà créé peut
être plus performant. (Note bien qu'avec Boost, la création d'un
thread suppose aussi la création d'un mutex et d'une condition,
afin de synchronizer le démarrage.)

Si le but principal est le gain en performance, on pourrait
aussi ne lancer un thread que s'il n'y a aucun thread en attente
d'une requête quand on le créer.

Dans la contexte de Boost : tu crées une queue de messages à
partir de boost::mutex et de boost::condition. Tu en passes
l'adresse à chaque thread que tu crées ; le thread boucle sur
une attente à la queue. Tu passes l'objet fonctionnel par la
queue de messages. Ça doit faire moins de cinquante lignes de
code tout cassé.

--
James Kanze GABI Software
Conseils en informatique orientée objet/
Beratung in objektorientierter Datenverarbeitung
9 place Sémard, 78210 St.-Cyr-l'École, France, +33 (0)1 30 23 00 34

Avatar
Jean-Marc Desperrier
kanze wrote:
Jean-Marc Desperrier wrote:
J'aimerais bien implémenter un "thread pool" en me basant sur
boost, je me demande s'il y a des solutions à recommander.


Pourquoi ? C'est à résoudre quel problème ?

Il n'y a rien de particulier en ce qui concerne les « thread
pools ». Tout dépend de ce qu'on veut faire avec.


Je trouve que c'est un peu dommage et que le besoin est réellement
répétitif. J'ai en tête une idée d'implémentation en reprenant
pratiquement preque à l'identique l'interface de thread_group.

Par thread « worker », j'entends un thread qui est lancé pour
effectuer un traitement particulier, et qui se termine à la fin
de ce traitement.


Il s'agit exactement d'optimiser ce cas là. Plutôt que de détruire à
chaque fois le thread, le replacer dans une liste de thread non-actif,
et de manière transparente reprendre l'un de ces thread pour gérer la
prochaine requête. Mais on se posera toujours un peu aussi la question
de savoir comment limiter le nombre de thread.

Dans la contexte de Boost : tu crées une queue de messages à
partir de boost::mutex et de boost::condition. Tu en passes
l'adresse à chaque thread que tu crées ; le thread boucle sur
une attente à la queue. Tu passes l'objet fonctionnel par la
queue de messages. Ça doit faire moins de cinquante lignes de
code tout cassé.


Certe, mais 50 lignes un peu délicate.


Avatar
kanze
Jean-Marc Desperrier wrote:
kanze wrote:
Jean-Marc Desperrier wrote:
J'aimerais bien implémenter un "thread pool" en me basant sur
boost, je me demande s'il y a des solutions à recommander.


Pourquoi ? C'est à résoudre quel problème ?

Il n'y a rien de particulier en ce qui concerne les « thread
pools ». Tout dépend de ce qu'on veut faire avec.


Je trouve que c'est un peu dommage et que le besoin est
réellement répétitif. J'ai en tête une idée d'implémentation
en reprenant pratiquement preque à l'identique l'interface de
thread_group.


Donne-leur un peu de temps. Ce qu'offre boost::thread
actuellement est assez primitif ; les outils pour des
constructions plus évoluées. Ils reconnaissent bien qu'il manque
encore des outils de couche au-dessus, comme un queue de
messages.

Par thread « worker », j'entends un thread qui est lancé
pour effectuer un traitement particulier, et qui se termine
à la fin de ce traitement.


Il s'agit exactement d'optimiser ce cas là.


Or que l'utilisation dans la page que tu as citée était
principalement pour limiter le nombre de threads. La technique
se rapproche (et porte le même nom), mais n'est pas tout à fait
identique.

Plutôt que de détruire à chaque fois le thread, le replacer
dans une liste de thread non-actif, et de manière transparente
reprendre l'un de ces thread pour gérer la prochaine requête.
Mais on se posera toujours un peu aussi la question de savoir
comment limiter le nombre de thread.


Si le but est l'optimisation, est-ce qu'il faut les limiter ?

La solution la plus simple, c'est de démarrer un certain nombre
de threads dès le début, et de travailler toujours avec ce
nombre. Sinon, on peut essayer d'optimiser le nombre de threads
selon divers critères.

Dans la contexte de Boost : tu crées une queue de messages à
partir de boost::mutex et de boost::condition. Tu en passes
l'adresse à chaque thread que tu crées ; le thread boucle
sur une attente à la queue. Tu passes l'objet fonctionnel
par la queue de messages. Ça doit faire moins de cinquante
lignes de code tout cassé.


Certe, mais 50 lignes un peu délicate.


Pas si compliqué que ça : il s'agit en fin de compte
d'implémenter quelque chose de connu. Donc, la fonction qu'on
passe au constructeur de boost::thread pourrait bien ressemblait
à quelque chose comme :

void
pooledThread(
MessageQueue* queue )
{
while ( ! shutdownRequested() ) {
boost::function0< void >
f = queue->receive() ;
f() ;
}
}

La MessageQueue, il te le faut probablement de toute façon :

class MessageQueue
{
public:
typedef boost::function0< void >
Fnc ;

void send( Fnc const& f)
{
boost::mutex::scoped_lock
lock( myMutex ) ;
myQueue.push_back( f ) ;
myCondition.notify_one() ;
}
Fnc receive()
{
boost::mutex::scoped_lock
lock( myMutex ) ;
while ( myQueue.empty() ) {
myCondition.wait( lock ) ;
}
Fnc result = myQueue.front() ;
myQueue.pop_front() ;
return result ;
}

private:
boost::mutex myMutex ;
boost::condition myCondition ;
std::deque< Fnc > myQueue ;
} ;

Maintenant, il te reste à démarrer le nombre de pooledThread
voulu, puis d'envoyer la « fonction » dans la queue, plutôt
que de l'utiliser comme paramètre au constructeur d'un
boost::thread.

--
James Kanze GABI Software
Conseils en informatique orientée objet/
Beratung in objektorientierter Datenverarbeitung
9 place Sémard, 78210 St.-Cyr-l'École, France, +33 (0)1 30 23 00 34



Avatar
Michel Decima
In news:,
kanze typed:

Pas si compliqué que ça : il s'agit en fin de compte
d'implémenter quelque chose de connu. Donc, la fonction qu'on
passe au constructeur de boost::thread pourrait bien ressemblait
à quelque chose comme :

void
pooledThread(
MessageQueue* queue )
{
while ( ! shutdownRequested() ) {
boost::function0< void >
f = queue->receive() ;
f() ;
}
}


Si je comprends bien l'implementation de la MessageQueue, l'appel
a receive est bloquant. Donc le seul moyen de debloquer un thread,
c'est d'envoyer une fonction dans la queue qui passera la valeur
de shutdownRequested() a true. Mais ca ne debloque qu'un seul
thread, les (eventuels) autres restent a attendre sur le receive. Il faut
donc envoyer autant de fonctions de debloquage que de threads.

Il y a un moyen élegant (a mon gout) de resoudre ce probleme
avec le principe de la ActiveQueue de ACE. Grosso modo,
la queue a un etat (activé ou non), et le receive n'est bloquant
que si la queue est active. Quand on la desactive, les receive
se debloquent automatiquement, en retournant false:

La fonction de traitement devient:

void pooledThread(MessageQueue* queue )
{
boost::function0< void > f;
while (queue->receive(f))
{
f();
}
}

pour debloquer tous les threads immediatement, il suffit d'appeler

queue->deactivate();

qui peut tres bien etre traite sous forme de requete (on laisse le temps
de finir tous les traitements dans la queue):

queue->send(boost::bind(&MessageQueue::deactivate, queue));

La fonction receive doit ressembler a quelque chose comme ceci:

bool MessageQueue::receive(Fnc& f)
{
boost::mutex::scoped_lock lock( myMutex ) ;

if (m_state != ACTIVATED)
return false;

while (myQueue.empty() ) {
myCondition.wait( lock ) ;
if (myState != ACTIVATED)
return false;
}

f = myQueue.front() ;
myQueue.pop_front() ;
return true;
}

void MessageQueue::deactivate()
{
myState = ACTIVATED;
myCondition.notify_all();
}

Avatar
James Kanze
Michel Decima wrote:
In news:,
kanze typed:
Pas si compliqué que ça : il s'agit en fin de compte
d'implémenter quelque chose de connu. Donc, la fonction qu'on
passe au constructeur de boost::thread pourrait bien ressemblait
à quelque chose comme :



void
pooledThread(
MessageQueue* queue )
{
while ( ! shutdownRequested() ) {
boost::function0< void >
f = queue->receive() ;
f() ;
}
}



Si je comprends bien l'implementation de la MessageQueue, l'appel a
receive est bloquant. Donc le seul moyen de debloquer un thread, c'est
d'envoyer une fonction dans la queue qui passera la valeur de
shutdownRequested() a true. Mais ca ne debloque qu'un seul thread, les
(eventuels) autres restent a attendre sur le receive. Il faut donc
envoyer autant de fonctions de debloquage que de threads.


C'était une esquisse ; un début, et non une solution complète. Ce que
je fais dans mes propres applications, en générale, c'est de positionner
une variable globale qui fait que shutdownRequested() renverra true,
puis d'envoyer une fonction no-op à tous les threads, pour les
débloquer. Mais beaucoup d'autres solutions sont envisagables ; dans la
contexte précise d'un pool de threads, par exemple, on pourrait très
bien faire en sort que pooledThread reçoit aussi un pointeur à le
ThreadPool objet auquel il appartient ; c'est cet objet qui gèrerait
alors les shutdownRequested. Avec un
std::vector< std::shared_ptr< boost::thread > > pour pouvoir
rétrouver les threads, les envoyer la fonction no-op, et en faire le
join.

Une autre solution, avec laquelle j'ai un peu expérimenté, c'est de
mettre la boucle dans un bloc de try, et envoyer une fonction qui lève
une exception prédéfinie. (En passant, il faut que je régarde ce que
Boost dit si la fonction retourne une excéption, plutôt que par la voie
normale. Que ce soit un comportement indéfini ne m'étonnerait pas ;
dans tel cas, il faudrait en tout cas le bloc de try.)

Sans parler des cas où l'application tourne sans arrêt, et sans
véritable provision pour un shutdown propre. Ce n'est pas très élégant,
mais dans beaucoup de cas, ce n'est pas vraiment genant.

Il y a un moyen élegant (a mon gout) de resoudre ce probleme
avec le principe de la ActiveQueue de ACE. Grosso modo,
la queue a un etat (activé ou non), et le receive n'est bloquant
que si la queue est active. Quand on la desactive, les receive
se debloquent automatiquement, en retournant false:


Pourquoi pas ? C'est aussi une solution. L'avantage de la solution
ci-dessus, c'est que si ce traitement durait, je pourrais aussi appeler
shutdownRequested au cour du traitement et l'interrompre.

J'avoue que quand je me suis penché sur le problème, il s'agissait des
pthreads, et non de boost::thread. Et pthread sous Solaris, avec Sun CC,
ouvrait une solution assez élégante : pthread_cancel. Et que je m'y
suis basé. Ensuite, il a fallu porter à g++. Où pthread_cancel ne marche
pas. Du coup, j'ai dû trouver rapidement une solution. Et certains
threads faisaient des traitements très longs, avec les attentes
bloquantes sur d'autres machines. Du coup, j'ai ajouté la fonction
shutdownRequested(), et j'ai changé toutes les attentes en boucles
d'attente avec hors temps et vérification de shutdownRequested à chaque
passage dans la boucle.

(En ce qui concerne ACE, je n'y toucherais pas -- je l'ai régardé où
j'étais avant, et il y avait tellement de problèmes de qualité qu'il m'a
rébuté.)

La fonction de traitement devient:


void pooledThread(MessageQueue* queue )
{
boost::function0< void > f;
while (queue->receive(f))
{
f();
}
}


pour debloquer tous les threads immediatement, il suffit d'appeler


queue->deactivate();


Par exemple. Selon l'algorithme d'optimisation des ressources qu'on veut
implémenter, on pourrait aussi ajouter un time-out sur le receive. Au
bout de disons cinq minutes sans requête, on termine le thread, en
concluant qu'il n'y en avait plus besoin (et en libérant ainsi les
ressources qu'il utilise pour d'autres utilisations).

Une bonne implémentation générique d'un pool de thread offrira bien de
telles possibilités, mais j'imagine que la plupart du temps, c'est du
supérflu, dont on se passera dans une implémentation « sur mesure ».

qui peut tres bien etre traite sous forme de requete (on laisse le
temps de finir tous les traitements dans la queue):


Là, c'est une question de spécification. Dans mon cas précis (qui
n'était pas celui d'un pool de thread), il fallait s'arrêter
rélativement vite, sans commencer de nouvelles requêtes, et même dans
certains cas, en interrompant les requêtes en cours. Mais ici aussi,
plusieurs solutions sont possibles, selon ce qu'on veut.

queue->send(boost::bind(&MessageQueue::deactivate, queue));


La fonction receive doit ressembler a quelque chose comme ceci:


bool MessageQueue::receive(Fnc& f)
{
boost::mutex::scoped_lock lock( myMutex ) ;


if (m_state != ACTIVATED)
return false;


while (myQueue.empty() ) {
myCondition.wait( lock ) ;
if (myState != ACTIVATED)
return false;
}


f = myQueue.front() ;
myQueue.pop_front() ;
return true;
}


void MessageQueue::deactivate()
{
myState = ACTIVATED;
myCondition.notify_all();
}


J'avoue que l'idée est intéressant. C'est encore une alternative à
considérer.

--
James Kanze
Conseils en informatique orientée objet/
Beratung in objektorientierter Datenverarbeitung
9 place Sémard, 78210 St.-Cyr-l'École, France +33 (0)1 30 23 00 34