Since every thread steals work from the others, there is no longer any need to wait for anyone, so the condition variables are removed.

Mylloon 2024-04-21 18:52:22 +02:00
parent 37ff2cfb93
commit 1b6db69cd2
Signed by: Anri
GPG key ID: A82D63DFF8D1317F
3 changed files with 27 additions and 49 deletions
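The reasoning in the commit message is that an idle worker never has to block anymore: it either finds a task on its own stack, steals one from a peer, or exits. Below is a rough sketch (not the repository's file) of the shape sched_worker takes after this change. The struct members mirror the diff that follows; the taskfunc signature, the pick_victim() helper and the worker_arg wrapper are assumptions made for the example, and unlike the commented-out loop further down, the sketch releases its own lock before probing the peers so that two idle workers never hold each other's mutex.

/*
 * Sketch only, assuming the layout shown in the diff below.
 */
#include <pthread.h>
#include <stddef.h>

struct scheduler;
typedef void (*taskfunc)(void *, struct scheduler *); /* assumed signature */

struct task_info {
    void *closure;
    taskfunc f;
};

struct scheduler {
    int qlen;                 /* capacity of each stack */
    int nthreads;             /* number of worker threads */
    int *top;                 /* per-thread cursor, -1 when the stack is empty */
    struct task_info **tasks; /* one task stack per thread */
    pthread_mutex_t *mutex;   /* one lock per stack, no pthread_cond_t left */
};

struct worker_arg {
    struct scheduler *s;
    int self; /* index of this worker */
};

/* Pick the peer currently holding the most pending tasks (hypothetical helper,
 * equivalent to the loop the diff comments out). Returns -1 if every stack is
 * empty. */
static int
pick_victim(struct scheduler *s, int self)
{
    int victim = -1;
    for(int i = 0, size = -1; i < s->nthreads; ++i) {
        if(i == self)
            continue;
        pthread_mutex_lock(&s->mutex[i]);
        if(s->top[i] > size) {
            victim = i;
            size = s->top[i];
        }
        pthread_mutex_unlock(&s->mutex[i]);
    }
    return victim;
}

/* Run our own tasks; when our stack is empty, steal one; when there is nothing
 * to steal anywhere, exit instead of sleeping on a condition variable. */
static void *
worker_sketch(void *arg)
{
    struct worker_arg *wa = arg;
    struct scheduler *s = wa->s;
    int self = wa->self;

    for(;;) {
        struct task_info job;

        pthread_mutex_lock(&s->mutex[self]);
        if(s->top[self] >= 0) {
            /* Pop the task on top of our own stack. */
            job = s->tasks[self][s->top[self]--];
            pthread_mutex_unlock(&s->mutex[self]);
        } else {
            /* Release our own lock before probing the peers, so that two
             * idle workers never hold each other's mutex. */
            pthread_mutex_unlock(&s->mutex[self]);

            int victim = pick_victim(s, self);
            if(victim < 0)
                break; /* nothing left anywhere: leave, nobody waits */

            int got = 0;
            pthread_mutex_lock(&s->mutex[victim]);
            if(s->top[victim] >= 0) { /* re-check: it may have drained */
                /* Take the oldest task by swapping it with the newest,
                 * as the diff does. */
                job = s->tasks[victim][0];
                s->tasks[victim][0] = s->tasks[victim][s->top[victim]];
                s->top[victim]--;
                got = 1;
            }
            pthread_mutex_unlock(&s->mutex[victim]);
            if(!got)
                continue; /* lost the race, look again */
        }

        job.f(job.closure, s); /* run the task with no lock held */
    }
    return NULL;
}

Compared with the previous version, nothing ever parks on pthread_cond_wait(), which is why sched.cond, nthsleep, the broadcast and the signal all disappear in the diff below.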

View file

@@ -1,6 +1,5 @@
 #pragma once

-#include <pthread.h>
 #include <unistd.h>

 struct scheduler;

View file

@@ -1,5 +1,7 @@
 #include "../includes/quicksort.h"
+#include <time.h>

 int
 partition(int *a, int lo, int hi)
 {

View file

@@ -1,6 +1,7 @@
 #include "../includes/sched.h"

 #include <errno.h>
+#include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -11,9 +12,6 @@ struct task_info {
 };

 struct scheduler {
-    /* State-change indicators */
-    pthread_cond_t *cond;
-
     /* Size of the stacks */
     int qlen;
@@ -23,9 +21,6 @@ struct scheduler {
     /* Number of threads instantiated */
     int nthreads;

-    /* Number of waiting threads */
-    int nthsleep;
-
     /* Task stacks */
     struct task_info **tasks;
@@ -54,7 +49,6 @@ int current_thread(struct scheduler *);
 int
 sched_init(int nthreads, int qlen, taskfunc f, void *closure)
 {
-    sched.cond = NULL;
     sched.mutex = NULL;
     sched.tasks = NULL;
     sched.threads = NULL;
@@ -74,8 +68,6 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure)
     }

     sched.nthreads = nthreads;
-    sched.nthsleep = 0;
-
     // Initialize each thread's mutex
     if(!(sched.mutex = malloc(sched.nthreads * sizeof(pthread_mutex_t)))) {
         perror("Mutexes");
@@ -88,18 +80,6 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure)
         }
     }

-    // Initialize each thread's condition variable
-    if(!(sched.cond = malloc(sched.nthreads * sizeof(pthread_cond_t)))) {
-        perror("Variable conditions");
-        return sched_init_cleanup(-1);
-    }
-    for(int i = 0; i < sched.nthreads; ++i) {
-        if(pthread_cond_init(&sched.cond[i], NULL) != 0) {
-            fprintf(stderr, "Can't init condition variable for thread %d\n", i);
-            return sched_init_cleanup(-1);
-        }
-    }
-
     // Initialize the cursor tracking the state of each thread's stack
     if(!(sched.top = malloc(sched.nthreads * sizeof(int)))) {
         perror("Cursor top stack\n");
@@ -121,6 +101,12 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure)
         }
     }

+    // Add the initial task
+    if(sched_spawn_core(f, closure, &sched, 0) < 0) {
+        fprintf(stderr, "Can't create the initial task\n");
+        return sched_init_cleanup(-1);
+    }
+
     if(!(sched.threads = malloc(sched.nthreads * sizeof(pthread_t *)))) {
         perror("Threads");
         return sched_init_cleanup(-1);
@@ -144,12 +130,6 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure)
         }
     }

-    // Add the initial task
-    if(sched_spawn_core(f, closure, &sched, 0) < 0) {
-        fprintf(stderr, "Can't create the initial task\n");
-        return sched_init_cleanup(-1);
-    }
-
     for(int i = 0; i < nthreads; ++i) {
         if((pthread_join(sched.threads[i], NULL) != 0)) {
             fprintf(stderr, "Can't wait the thread %d\n", i);
@@ -172,11 +152,6 @@ sched_init_cleanup(int ret_code)
         sched.mutex = NULL;
     }

-    if(sched.cond) {
-        free(sched.cond);
-        sched.cond = NULL;
-    }
-
     if(sched.tasks) {
         for(int i = 0; i < sched.nthreads; ++i) {
             if(sched.tasks[i]) {
@@ -231,9 +206,12 @@ current_thread(struct scheduler *s)
 int
 sched_spawn_core(taskfunc f, void *closure, struct scheduler *s, int core)
 {
+    // printf("%d locking (a)\n", core);
     pthread_mutex_lock(&s->mutex[core]);
+    // printf("%d locked (a)\n", core);

     if(s->top[core] + 1 >= s->qlen) {
+        // printf("%d unlock (a)\n", core);
         pthread_mutex_unlock(&s->mutex[core]);
         errno = EAGAIN;
         fprintf(stderr, "Stack is full\n");
@@ -243,7 +221,7 @@ sched_spawn_core(taskfunc f, void *closure, struct scheduler *s, int core)
     s->top[core]++;
     s->tasks[core][s->top[core]] = (struct task_info){closure, f};

-    pthread_cond_signal(&s->cond[core]);
+    // printf("%d unlock (a)\n", core);
     pthread_mutex_unlock(&s->mutex[core]);

     return 0;
@@ -262,40 +240,39 @@ sched_worker(void *arg)
     }

     while(1) {
+        // printf("%d locking (b)\n", curr_th);
         pthread_mutex_lock(&s->mutex[curr_th]);
+        // printf("%d locked (b)\n", curr_th);

         // If there is nothing to do
         if(s->top[curr_th] == -1) {
-            if(s->nthsleep + 1 == s->nthreads) {
-                // Signal every thread that there is nothing left to do
-                pthread_cond_broadcast(&s->cond[curr_th]);
-                pthread_mutex_unlock(&s->mutex[curr_th]);
-                break;
-            }
-
             // Look for a thread (the one with the most pending tasks) to steal from
             int stolen = -1;
-            for(int i = 0, size = -1; i < s->nthreads; ++i) {
+            /* for(int i = 0, size = -1; i < s->nthreads; ++i) {
                 if(i == curr_th) {
                     // Never steal from ourselves
                     continue;
                 }

+                printf("%d locking (c)\n", i);
                 pthread_mutex_lock(&s->mutex[i]);
+                printf("%d locked (c)\n", i);
                 if(s->top[i] > size) {
                     stolen = i;
                     size = s->top[i];
                 }
+                printf("%d unlock (c)\n", i);
                 pthread_mutex_unlock(&s->mutex[i]);
-            }
+            } */

             // Steal a task from another thread
             if(stolen >= 0) {
                 struct task_info theft;
+                // printf("%d locking (d)\n", stolen);
                 pthread_mutex_lock(&s->mutex[stolen]);
+                // printf("%d locked (d)\n", stolen);

                 // Currently we take the oldest task by
                 // swapping the first and the last
@@ -304,8 +281,10 @@ sched_worker(void *arg)
                 s->tasks[stolen][0] = s->tasks[stolen][s->top[stolen]];
                 s->top[stolen]--;

+                // printf("%d unlock (d)\n", stolen);
                 pthread_mutex_unlock(&s->mutex[stolen]);
+                // printf("%d unlock (b)\n", curr_th);
                 pthread_mutex_unlock(&s->mutex[curr_th]);

                 // Add the task onto our own stack
@@ -314,18 +293,16 @@ sched_worker(void *arg)
                 continue;
             }

-            s->nthsleep++;
-            pthread_cond_wait(&s->cond[curr_th], &s->mutex[curr_th]);
-            s->nthsleep--;
-
             pthread_mutex_unlock(&s->mutex[curr_th]);
-            continue;
+            // printf("%d leaves, nothing left to do\n", curr_th);
+            break;
         }

         // Pop the task from the stack
         taskfunc f = s->tasks[curr_th][s->top[curr_th]].f;
         void *closure = s->tasks[curr_th][s->top[curr_th]].closure;
         s->top[curr_th]--;
+        // printf("%d unlock (b)\n", curr_th);
         pthread_mutex_unlock(&s->mutex[curr_th]);

         // Execute the task
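For completeness, a hypothetical usage sketch of the entry point touched above, assuming the program is compiled against the project's sched.h, that taskfunc has the signature used in the sketch near the top, and that sched_init() joins its workers before returning (as the pthread_join loop above suggests) and reports failure with a negative value. hello_task and the chosen sizes are made up for the example.

#include <stdio.h>

#include "../includes/sched.h" /* assumed to declare taskfunc and sched_init */

/* Made-up initial task: real work would push follow-up tasks through the
 * scheduler's spawning call instead of doing everything here. */
static void
hello_task(void *closure, struct scheduler *s)
{
    (void)s;
    printf("initial task says: %s\n", (const char *)closure);
}

int
main(void)
{
    /* 4 workers, per-thread stacks of 32 entries, one initial task. */
    if(sched_init(4, 32, hello_task, "hello") < 0) {
        fprintf(stderr, "sched_init failed\n");
        return 1;
    }

    return 0;
}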