diff --git a/includes/sched.h b/includes/sched.h index 1d9839d..359c837 100644 --- a/includes/sched.h +++ b/includes/sched.h @@ -3,34 +3,10 @@ #include #include -#define MAX_TASKS 81920 - struct scheduler; typedef void (*taskfunc)(void *, struct scheduler *); -typedef struct task_info { - taskfunc f; - void *closure; -} taskinfo; - -struct scheduler { - /* Mutex qui protège la structure */ - pthread_mutex_t mutex; - - /* Indicateur de changement d'état */ - pthread_cond_t cond; - - /* Position actuelle dans la pile */ - int top; - - /* Tâches */ - taskinfo tasks[MAX_TASKS]; - - /* Si tout est terminé */ - int exit; -}; - static inline int sched_default_threads(void) { diff --git a/src/sched.c b/src/sched.c index 363a44e..0a03ce2 100644 --- a/src/sched.c +++ b/src/sched.c @@ -2,71 +2,55 @@ #include #include +#include +struct task_info { + void *closure; + taskfunc f; +}; + +struct scheduler { + /* Indicateur de changement d'état */ + pthread_cond_t cond; + + /* Taille de la pile */ + int qlen; + + /* Mutex qui protège la structure */ + pthread_mutex_t mutex; + + /* Nombre de threads en attente */ + int nthsleep; + + /* Pile de tâches */ + struct task_info *tasks; + + /* Position actuelle dans la pile */ + int top; +}; + +/* Ordonnanceur partagé */ static struct scheduler sched; /* Lance une tâche de la pile */ -void * -worker_routine(void *arg) -{ - struct scheduler *s = (struct scheduler *)arg; - - while(1) { - pthread_mutex_lock(&s->mutex); - - if(s->exit == 1) { - pthread_mutex_unlock(&s->mutex); - printf("finalement..rien a faire, ciao\n"); - - break; - } - - // S'il on a rien à faire - if(s->top == -1) { - // printf("attend..\n"); - pthread_cond_wait(&s->cond, &s->mutex); - pthread_mutex_unlock(&s->mutex); - // printf("reveillé!\n"); - continue; - } - // printf("lancement tâche #%d\n", s->top); - - // Extrait la tâche de la pile - taskfunc f = s->tasks[s->top].f; - void *closure = s->tasks[s->top].closure; - s->top--; - pthread_mutex_unlock(&s->mutex); - - // Exécute la tâche - f(closure, s); - - // Signale s'il n'y a plus rien à faire - if(s->top == -1) { - printf("va falloir partir\n"); - pthread_mutex_lock(&s->mutex); - s->exit = 1; - pthread_cond_broadcast(&s->cond); - pthread_mutex_unlock(&s->mutex); - } - } - - return NULL; -} +void *worker_routine(void *); int sched_init(int nthreads, int qlen, taskfunc f, void *closure) { - if(nthreads == 0) { + if(qlen <= 0) { + fprintf(stderr, "qlen must be greater than 0\n"); + return -1; + } + sched.qlen = qlen; + + if(nthreads < 0) { + fprintf(stderr, "nthreads must be greater than 0\n"); + return -1; + } else if(nthreads == 0) { nthreads = sched_default_threads(); } - // TODO : Actuellement on n'utilises pas qlen - // => On utilise une pile de taille fixe - (void)qlen; - - sched.top = -1; - sched.exit = 0; - if(pthread_mutex_init(&sched.mutex, NULL) != 0) { fprintf(stderr, "Can't init mutex\n"); return -1; @@ -77,15 +61,34 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure) return -1; } + sched.top = -1; + if((sched.tasks = malloc(qlen * sizeof(struct task_info))) == NULL) { + fprintf(stderr, "Can't allocate memory for stack\n"); + return -1; + } + pthread_t threads[nthreads]; for(int i = 0; i < nthreads; ++i) { if(pthread_create(&threads[i], NULL, worker_routine, &sched) != 0) { - fprintf(stderr, "Can't create threads\n"); + fprintf(stderr, "Can't create the thread %d", i); + + if(i > 0) { + fprintf(stderr, ", cancelling already created threads...\n"); + for(int j = 0; j < i; ++j) { + if(pthread_cancel(threads[j]) != 0) { + fprintf(stderr, "Can't cancel the thread %d\n", j); + } + } + } else { + fprintf(stderr, "\n"); + } + + free(sched.tasks); return -1; } } - if(sched_spawn(f, closure, &sched) != 0) { + if(sched_spawn(f, closure, &sched) < 0) { fprintf(stderr, "Can't create the initial task\n"); return -1; } @@ -105,17 +108,59 @@ sched_spawn(taskfunc f, void *closure, struct scheduler *s) { pthread_mutex_lock(&s->mutex); - if(s->top + 1 >= MAX_TASKS) { + if(s->top + 1 >= s->qlen) { pthread_mutex_unlock(&s->mutex); errno = EAGAIN; fprintf(stderr, "Stack is full\n"); return -1; } - s->tasks[++s->top] = (taskinfo){f, closure}; + s->tasks[++s->top] = (struct task_info){closure, f}; pthread_cond_signal(&s->cond); pthread_mutex_unlock(&s->mutex); return 0; } + +void * +worker_routine(void *arg) +{ + struct scheduler *s = (struct scheduler *)arg; + + while(1) { + pthread_mutex_lock(&s->mutex); + + // S'il on a rien à faire + if(s->top == -1) { + // tentative avec nthsleep???? pourquoi 10 aaaaa j'ai 12 coeurs?? + s->nthsleep++; + if(s->nthsleep >= 10) { + pthread_mutex_unlock(&s->mutex); + break; + } + + pthread_cond_wait(&s->cond, &s->mutex); + pthread_mutex_unlock(&s->mutex); + continue; + } + + // Extrait la tâche de la pile + taskfunc f = s->tasks[s->top].f; + void *closure = s->tasks[s->top].closure; + s->top--; + pthread_mutex_unlock(&s->mutex); + + // Exécute la tâche + f(closure, s); + + // Signale s'il n'y a plus rien à faire + if(s->top == -1) { + pthread_mutex_lock(&s->mutex); + pthread_cond_broadcast(&s->cond); + pthread_mutex_unlock(&s->mutex); + } + } + + return NULL; +}