[C#] Thread Pooling

Cette section est destinée aux scripts partagés par la communauté. Chaque post est destiné à un script. Suivez bien les recommandations.
Avatar de l’utilisateur
Alesk
Messages : 2303
Inscription : 13 Mars 2012 09:09
Localisation : Bordeaux - France
Contact :

[C#] Thread Pooling

Message par Alesk » 25 Oct 2014 18:55

Hello,

Voici mon petit bout de code qui permet d'exploiter la classe native de Thread Pooling :

Code : Tout sélectionner

using System;
using System.Threading;
using UnityEngine;

public static class ThreadPool
{
	private static ManualResetEvent[] doneEvents = new ManualResetEvent[1];

	private struct ThreadPoolData
	{
		public Action action;
		public ManualResetEvent doneEvent;
	}

	// Pool actions in background while main thread is running
	public static void Pool(params Action[] actions){
		for (int i = 0; i < actions.Length; i++){
			ThreadPoolData data = new ThreadPoolData();
			data.action = actions[i];
			System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback (PoolNewThread), data);
		}
	}

	private static void PoolNewThread(System.Object stateInfo){
		try{
			((ThreadPoolData)stateInfo).action.Invoke ();
		}catch(Exception e) {
			Debug.Log("Error in thread "+e);
		}
	}


	// Pool actions and wait for all of them to finish
	public static void PoolAndWait(params Action[] actions){
		
		if(doneEvents.Length < actions.Length){
			doneEvents = new ManualResetEvent[actions.Length];
		}
		
		for (int i = 0; i < doneEvents.Length; i++){
			if(doneEvents[i] == null){
				doneEvents[i] = new ManualResetEvent (true);
			}else{
				doneEvents[i].Set();
			}
		}
		
		for (int i = 0; i < actions.Length; i++){
			doneEvents[i].Reset();
			
			ThreadPoolData data = new ThreadPoolData();
			data.action = actions[i];
			data.doneEvent = doneEvents[i];
			
			System.Threading.ThreadPool.QueueUserWorkItem (new System.Threading.WaitCallback (WaitForThread), data);
		}

		WaitHandle.WaitAll (doneEvents);
	}
	
	private static void WaitForThread(System.Object stateInfo){
		ThreadPoolData data = (ThreadPoolData)stateInfo;
		try{
			data.action.Invoke ();
		}catch(Exception e) {
			Debug.Log("Error in thread "+e);
		}
		data.doneEvent.Set();
	}
}
Pour le moment, c'est toujours en chantier, le code ne demande qu'à être amélioré et complété.
Il est important de noter que comme la plupart des classes unity ne sont pas "thread safe", il ne sera pas possible d'y toucher depuis une fonction exécutée dans un thread, sous risque de plantage, voir de freeze d'unity.
J'ai ajouté un try/catch pour essayer d'éviter ça... à tester pour voir si ça fonctionne bien dans tous les cas.

Si vous avez par exemple un gros tableau de données à traiter, il faut répartir le traitement sur plusieurs threads, en évitant de faire se croiser les accès au données.
Il faut donc assigner à chaque thread une plage de données à traiter. Dans l'exemple ci-dessous, je divise le total selon le nombre de coeurs du cpu disponibles, pour que chacun ne s'occupe que d'une partie du tableau, sans taper dans celle des autres.

Assignez ce script à un gameobject, puis lancez l'execution. Il y a trois tests, un pour chaque bouton de la souris :

- bouton de gauche : exécution asynchrone (donc en arrière plan) de la fonction de test sur chaque coeur
- bouton de droite : exécution synchrone de la fonction de test sur chaque coeur, dans ce cas, le thread principal sera bloqué le temps que tous les threads secondaires se terminent.
Ici on divise donc le temps de traitement par le nombre de coeurs disponibles.

- bouton du milieu : même test que le bouton de droite, mais en doublant le nombre de coeurs à utiliser.
Dans ce cas, mon tableau va être divisé en 8 parts au lieu de 4, et chaque coeur va traiter 2 parts.
On se rend compte que ça va légèrement moins vite que si on utilise que 4 coeurs, du moins dans mon cas.

Ceci n'est valable que lorsqu'on veut exécuter du code synchrone, en répartissant la tâche sur plusieurs coeurs.
Dans le cas d'une exécution asynchrone, les fonctions vont être exécutées dans leur ordre d'ajout, à chaque fois qu'un coeur sera libre pour la traiter.
On ne peut donc pas prédire à l'avance quand les fonctions seront traitées ni quand elles se termineront.

La grosse boucle à la fin, c'est pour allonger la durée du traitement, afin d'avoir des chronométrages compréhensibles ;)

Code : Tout sélectionner

using UnityEngine;
using System;

public class ThreadPool_tests : MonoBehaviour {

	Vector3[] buffer = new Vector3[19];
	int coresCount = 0;

	void Start () {

		coresCount = SystemInfo.processorCount;
	}

	void Update () {
		if(Input.GetMouseButtonDown(0)){
			StartTest(coresCount);
			Invoke("DebugBuffer",1);
		}

		if(Input.GetMouseButtonDown(1)){
			StartTest(coresCount,true);
			DebugBuffer();
		}

		if(Input.GetMouseButtonDown(2)){
			StartTest(coresCount * 2,true);
			DebugBuffer();
		}

	}

	void DebugBuffer(){
		for(int i=0;i<buffer.Length;i++){
			Debug.Log ("buffer "+i+" : "+buffer[i]+" updated by thread #"+buffer[i].y);
		}
	}

	void StartTest(int count,bool wait = false){
		int i;
		Vector3 vector = Vector3.zero;

		Debug.Log ("Test loop start");

		double now = Time.realtimeSinceStartup;


		int total = buffer.Length;
		int step = Mathf.CeilToInt((float)buffer.Length / count);
		int start = 0;
		int end = 0;


		// prepare actions
		Action[] actions = new Action[count];
		for(i = 0;i < count; i++){

			vector.y = i;

			Vector3 _vector = vector;
			int _i = i;
			string _string = "label "+i;
			int _start = start;

			start += step;
			int _end = Mathf.Min (start,total);

			actions[i] = () => this.Test(_i,_vector,_string,_start,_end);
		}

		// send all the actions to the ThreadPool
		if(wait){
			ThreadPool.PoolAndWait(actions);
		}else{
			ThreadPool.Pool(actions);
		}

		now = (Time.realtimeSinceStartup - now)*1000;

		Debug.Log ("Test loop end "+now+"ms");
	}

	public void Test(int index,Vector3 vector,string str,int start,int end){

		Debug.Log ("START "+index+" "+vector+" "+str);

		int now = Environment.TickCount;

		for(int i = start;i<end;i++){
			vector.x = i;
			buffer[i] = vector;
		}
		int total = (end-start) * 1000000;

		for(int i =0;i<total;i++){
			float dist = Mathf.Sqrt(Vector3.Distance(Vector3.one,Vector3.one*5.3f  ));
		}

		Debug.Log ("END "+index+" "+vector+" "+str+" : "+(Environment.TickCount - now)+"ms");

	}
}

Etant loin d'être un spécialiste de ce domaine, j'y vais à tâtons. Toute suggestion éclairée pour améliorer cette classe sera la bienvenue ;)
Dernière édition par Alesk le 25 Oct 2014 19:02, édité 1 fois.

Avatar de l’utilisateur
GTSAReeper
Messages : 230
Inscription : 21 Juil 2013 02:12
Localisation : Creuse - Limousin

Re: [C#] Thread Pooling

Message par GTSAReeper » 25 Oct 2014 19:02

Peut ton utiliser ce système pour générer une carte en arrière plan ?
[center]Image[/center]

Avatar de l’utilisateur
Alesk
Messages : 2303
Inscription : 13 Mars 2012 09:09
Localisation : Bordeaux - France
Contact :

Re: [C#] Thread Pooling

Message par Alesk » 25 Oct 2014 19:06

ça dépend de ce que tu veux générer.
Par exemple tu ne pourras pas créer ou instancier des gameobjects avec ça, car cette classe ne peut pas être utilisée dans un thread :/

Par contre tu peux t'en servir pour précalculer des infos de positionnement qui pourraient demander des calculs lourds, puis utiliser des coroutines pour placer des gameobjects à ces positions, petit à petit.

Avatar de l’utilisateur
GTSAReeper
Messages : 230
Inscription : 21 Juil 2013 02:12
Localisation : Creuse - Limousin

Re: [C#] Thread Pooling

Message par GTSAReeper » 25 Oct 2014 19:11

Et comment envoyer les données calculer d'un thread à l'autre ?
[center]Image[/center]

Avatar de l’utilisateur
Alesk
Messages : 2303
Inscription : 13 Mars 2012 09:09
Localisation : Bordeaux - France
Contact :

Re: [C#] Thread Pooling

Message par Alesk » 25 Oct 2014 19:33

Comme on n'a aucun moyen fiable de synchroniser les threads, il n'est pas question qu'ils communiquent entre eux.
Ils sont là pour mettre à jour des données issues du thread principal.

Par exemple, je prend le cas particulier de mon package de particules de fumée.
Pour détecter quelles sont les particules dans le champ de vision, et quelles sont celles à exclure, je procède ainsi :

- dans le thread principal, je créé des listes qui stockeront les particules visibles, une liste pour chaque coeur présent dans le cpu. Ces liste sont accessibles au niveau global de la classe, pour que n'importe quelle fonction puisse y toucher.
- Grâce au Threadpool en synchrone (PoolAndWait dans ma classe), j'exécute alors plusieurs fois la fonction de test de visibilité qui va parcourir mon tableau de particules, en répartissant sur chaque coeur.
Si par exemple j'ai 4 coeurs et qu'il y a 1000 particules, le premier coeur va tester les particules de 0 à 249, le second de 250 à 499, le suivant de 500 à 749 et le dernier de 750 à 999. Ainsi chaque coeur ne touche qu'à sa propre partie du tableau de particules, sans interférer avec les autres.
De plus, comme chaque thread a sa propre liste de particules visibles à remplir, il n'y aura pas de conflit d'ajout dans une même liste à partir de deux threads différents.
- une fois que tout ceci est fait, je me retrouve de nouveau dans le thread principal, avec mes 4 listes contenant les particules visibles.
la première liste contiendra uniquement les particules visibles se trouvant dans le groupe original dont les index vont de 0 à 249, etc...

Il ne me reste plus qu'à afficher ces particules en parcourant les 4 listes, qui contiendront au pire 250 particules chacune, ou bien moins si certaines sont hors champ.

Dans cet exemple, on divise par 4 le temps de traitement sur un calcul spécifique.
C'est comme ça qu'il faut appréhender cette méthode : elle permet de booster certaines portions d'un processus, mais pas forcément l'intégralité de celui-ci.
De plus, il faut l'employer uniquement lorsque le nombre de données à traiter est assez élevé, sinon ça peut être moins rapide que de tout traiter en une seule fois. Là il faut voir quelle est la limite basse en dessous de laquelle il est plus intéressant de tout traiter en une fois qu'en plusieurs...

Donc ça c'est pour un traitement synchrone.

Pour une exécution asynchrone, ça pourrait être par exemple le calcul d'un chemin pour du pathfinding, ça permet de calculer plusieurs chemins en arrière plan sans bloquer le thread principal.

Voilà voilà :mrgreen:

Répondre

Revenir vers « Scripts »