diff --git a/src/sched.c b/src/sched.c index bdd1244..1138ae3 100644 --- a/src/sched.c +++ b/src/sched.c @@ -11,17 +11,15 @@ struct task_info { taskfunc f; }; -struct robber { - pthread_cond_t cond; - pthread_mutex_t mutex; -}; - struct scheduler { /* Taille des piles */ int qlen; + /* Variable de conditions pour reveillé les threads au besoin */ + pthread_cond_t cond; + /* Mutex qui protège les piles */ - pthread_mutex_t *mutex; + pthread_mutex_t mutex; /* Nombre de threads instanciés */ int nthreads; @@ -32,11 +30,13 @@ struct scheduler { /* Liste des threads */ pthread_t *threads; - /* Positions actuelle dans la pile */ - int *top; + /* Compteur des threads dormants */ + int nthsleep; - /* Infos pour le vol */ - struct robber rob; + /* Stack sous forme de dequeu pour gérer la récupération + * du premier élément ajouté */ + int *top; + int *bottom; }; /* Ordonnanceur partagé */ @@ -48,25 +48,22 @@ void *sched_worker(void *); /* Nettoie les opérations effectuées par l'initialisation de l'ordonnanceur */ int sched_init_cleanup(int); -/* sched_spawn sur un thread spécifique */ -int sched_spawn_core(taskfunc, void *, struct scheduler *, int); - /* Récupère l'index du thread courant */ int current_thread(struct scheduler *); int sched_init(int nthreads, int qlen, taskfunc f, void *closure) { - sched.mutex = NULL; sched.tasks = NULL; sched.threads = NULL; sched.top = NULL; + sched.bottom = NULL; if(qlen <= 0) { fprintf(stderr, "qlen must be greater than 0\n"); return -1; } - sched.qlen = qlen; + sched.qlen = qlen + 1; // circular buffer if(nthreads < 0) { fprintf(stderr, "nthreads must be greater than 0\n"); @@ -76,29 +73,32 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure) } sched.nthreads = nthreads; - // Initialisation des infos de vol - sched.rob = - (struct robber){PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER}; + sched.nthsleep = 0; - // Initialisation des mutex de chaque processus - if(!(sched.mutex = malloc(sched.nthreads * sizeof(pthread_mutex_t)))) { - perror("Mutexes"); + // Initialisation du mutex + if(pthread_mutex_init(&sched.mutex, NULL) != 0) { + fprintf(stderr, "Can't init mutex\n"); return sched_init_cleanup(-1); } - for(int i = 0; i < sched.nthreads; ++i) { - if(pthread_mutex_init(&sched.mutex[i], NULL) != 0) { - fprintf(stderr, "Can't init mutex for thread %d\n", i); - return sched_init_cleanup(-1); - } + + // Initialisation variable de condition + if(pthread_cond_init(&sched.cond, NULL) != 0) { + fprintf(stderr, "Can't init varcond\n"); + return sched_init_cleanup(-1); } // Initialisation du curseur suivant l'état de la pile de chaque processus if(!(sched.top = malloc(sched.nthreads * sizeof(int)))) { - perror("Cursor top stack\n"); + perror("Cursor top stack"); + return sched_init_cleanup(-1); + } + if(!(sched.bottom = malloc(sched.nthreads * sizeof(int)))) { + perror("Cursor bottom stack"); return sched_init_cleanup(-1); } for(int i = 0; i < sched.nthreads; ++i) { - sched.top[i] = -1; + sched.top[i] = 0; + sched.bottom[i] = 0; } // Allocation mémoire pour la pile de chaque processus @@ -113,17 +113,22 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure) } } - // Ajoute la tâche initiale - if(sched_spawn_core(f, closure, &sched, 0) < 0) { - fprintf(stderr, "Can't create the initial task\n"); - return sched_init_cleanup(-1); - } - // Créer les threads if(!(sched.threads = malloc(sched.nthreads * sizeof(pthread_t *)))) { perror("Threads"); return sched_init_cleanup(-1); } + + // Ajoute la tâche initiale + if(sched_spawn(f, closure, &sched) < 0) { + fprintf(stderr, "Can't create the initial task\n"); + return sched_init_cleanup(-1); + } + + // Initialise l'aléatoire + srand(time(NULL)); + + // Démarre les threads for(int i = 0; i < nthreads; ++i) { if(pthread_create(&sched.threads[i], NULL, sched_worker, &sched) != 0) { fprintf(stderr, "Can't create the thread %d\n", i); @@ -156,14 +161,9 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure) int sched_init_cleanup(int ret_code) { - if(sched.mutex) { - for(int i = 0; i < sched.nthreads; ++i) { - pthread_mutex_destroy(&sched.mutex[i]); - } + pthread_mutex_destroy(&sched.mutex); - free(sched.mutex); - sched.mutex = NULL; - } + pthread_cond_destroy(&sched.cond); if(sched.tasks) { for(int i = 0; i < sched.nthreads; ++i) { @@ -187,23 +187,12 @@ sched_init_cleanup(int ret_code) sched.top = NULL; } - pthread_cond_destroy(&sched.rob.cond); - pthread_mutex_destroy(&sched.rob.mutex); - - return ret_code; -} - -int -sched_spawn(taskfunc f, void *closure, struct scheduler *s) -{ - int core; - if((core = current_thread(s)) < 0) { - fprintf(stderr, "Thread not in list, who am I?\n"); - return -1; + if(sched.bottom) { + free(sched.bottom); + sched.bottom = NULL; } - // On ajoute la tâche sur la pile du thread courant - return sched_spawn_core(f, closure, s, core); + return ret_code; } int @@ -220,21 +209,27 @@ current_thread(struct scheduler *s) } int -sched_spawn_core(taskfunc f, void *closure, struct scheduler *s, int core) +sched_spawn(taskfunc f, void *closure, struct scheduler *s) { - pthread_mutex_lock(&s->mutex[core]); + int th; + if((th = current_thread(s)) < 0) { + th = 0; + } - if(s->top[core] + 1 >= s->qlen) { - pthread_mutex_unlock(&s->mutex[core]); - errno = EAGAIN; + pthread_mutex_lock(&s->mutex); + + int next = (s->top[th] + 1) % s->qlen; + if(next == s->bottom[th]) { + pthread_mutex_unlock(&s->mutex); fprintf(stderr, "Stack is full\n"); + errno = EAGAIN; return -1; } - s->top[core]++; - s->tasks[core][s->top[core]] = (struct task_info){closure, f}; + s->tasks[th][s->top[th]] = (struct task_info){closure, f}; + s->top[th] = next; - pthread_mutex_unlock(&s->mutex[core]); + pthread_mutex_unlock(&s->mutex); return 0; } @@ -247,78 +242,59 @@ sched_worker(void *arg) // Récupère le processus courant (index tableau) int curr_th; if((curr_th = current_thread(s)) < 0) { - fprintf(stderr, "Worker thread not tracked, exiting...\n"); + fprintf(stderr, "Thread unknown, exiting...\n"); return NULL; } + struct task_info task; + int found; while(1) { - pthread_mutex_lock(&s->rob.mutex); - pthread_mutex_lock(&s->mutex[curr_th]); + found = 0; + pthread_mutex_lock(&s->mutex); - // Si rien à faire - if(s->top[curr_th] == -1) { - // Cherche un thread (avec le + de tâches en attente) à voler - int stolen = -1; - - pthread_cond_wait(&s->rob.cond, &s->rob.mutex); - for(int i = 0, max_tasks = -1; i < s->nthreads; ++i) { - if(i == curr_th) { - continue; // On ne se vole pas soi-même - } - - // Verrouille le mutex du thread candidat - /* pthread_mutex_lock(&s->mutex[i]); */ - - if(s->top[i] > max_tasks) { - max_tasks = s->top[i]; - stolen = i; - } - - // Déverrouille le mutex du thread candidat - /* pthread_mutex_unlock(&s->mutex[i]); */ - } - - // Vole une tâche à un autre thread - if(stolen >= 0) { - struct task_info theft; - pthread_mutex_lock(&s->mutex[stolen]); - - // Actuellement on prend la tâche la plus ancienne en - // inversant la première et la dernière - // TODO: Récupérer la premiere tâche tout en respectant l'ordre - theft = s->tasks[stolen][0]; - s->tasks[stolen][0] = s->tasks[stolen][s->top[stolen]]; - s->top[stolen]--; - - pthread_mutex_unlock(&s->mutex[stolen]); - - pthread_mutex_unlock(&s->mutex[curr_th]); - pthread_mutex_unlock(&s->rob.mutex); - - // Rajoute la tâche sur notre pile - sched_spawn_core(theft.f, theft.closure, s, curr_th); - - continue; - } - - pthread_mutex_unlock(&s->mutex[curr_th]); - pthread_cond_broadcast(&s->rob.cond); - pthread_mutex_unlock(&s->rob.mutex); - printf("%d se tire\n", curr_th); - break; + if(s->bottom[curr_th] != s->top[curr_th]) { + found = 1; + s->top[curr_th] = (s->top[curr_th] - 1 + s->qlen) % s->qlen; + task = s->tasks[curr_th][s->top[curr_th]]; } - // Extrait la tâche de la pile - taskfunc f = s->tasks[curr_th][s->top[curr_th]].f; - void *closure = s->tasks[curr_th][s->top[curr_th]].closure; - s->top[curr_th]--; - pthread_mutex_unlock(&s->mutex[curr_th]); + if(!found) { + // Vol car aucune tâche trouvée + for(int i = 0, k = rand() % (s->nthreads + 1), target; + i < s->nthreads; ++i) { + target = (i + curr_th) % s->nthreads; + + if(s->bottom[target] != s->top[target]) { + // Tâche trouvée + found = 1; + s->top[target] = (s->top[target] - 1 + s->qlen) % s->qlen; + task = s->tasks[target][s->top[target]]; + break; + } + } + + // Aucune tâche à faire + if(!found) { + s->nthsleep++; + // Ne partir que si tout le monde dort + if(s->nthsleep >= s->nthreads) { + pthread_cond_broadcast(&s->cond); + pthread_mutex_unlock(&s->mutex); + + break; + } + pthread_cond_wait(&s->cond, &s->mutex); + s->nthsleep--; + pthread_mutex_unlock(&s->mutex); + continue; + } + } + + pthread_cond_broadcast(&s->cond); + pthread_mutex_unlock(&s->mutex); // Exécute la tâche - f(closure, s); - - pthread_cond_broadcast(&s->rob.cond); - pthread_mutex_unlock(&s->rob.mutex); + task.f(task.closure, s); } return NULL;