diff --git a/includes/sched.h b/includes/sched.h index b0832bb..d7e695d 100644 --- a/includes/sched.h +++ b/includes/sched.h @@ -1,13 +1,30 @@ #pragma once -#include +#include #include +#define MAX_TASKS 1024 + struct scheduler; typedef void (*taskfunc)(void *, struct scheduler *); -static inline int sched_default_threads() { +struct scheduler { + /* Mutex qui protège la pile */ + pthread_mutex_t mutex; + + /* Indicateur de changement d'état de la pile */ + pthread_cond_t cond; + + /* Position actuelle dans la pile */ + int top; + + /* Tâches */ + taskfunc tasks[MAX_TASKS]; + void *closures[MAX_TASKS]; +}; + +static inline int sched_default_threads(void) { return sysconf(_SC_NPROCESSORS_ONLN); } diff --git a/src/sched.c b/src/sched.c index 801a52f..6cf4703 100644 --- a/src/sched.c +++ b/src/sched.c @@ -1,29 +1,102 @@ #include "../includes/sched.h" -#include +#include #include +static struct scheduler sched; + +/* Lance une tâche de la pile */ +void *worker_routine(void *arg) { + struct scheduler *s = (struct scheduler *)arg; + + while (1) { + // Attente d'un changement d'état + pthread_cond_wait(&s->cond, &s->mutex); + + pthread_mutex_lock(&s->mutex); + if (s->top == -1) { + // Il n'y a plus de tâches à exécuter + pthread_mutex_unlock(&s->mutex); + break; + } + + // Extrait la tâche de la pile + taskfunc f = s->tasks[s->top]; + void *closure = s->closures[s->top]; + s->top--; + + pthread_mutex_unlock(&s->mutex); + + // Exécute la tâche + f(closure, s); + + // Signale que la tâche est terminée + pthread_cond_signal(&s->cond); + } + + return NULL; +} + int sched_init(int nthreads, int qlen, taskfunc f, void *closure) { - sched_spawn(f, closure, NULL); - return 0; + if (nthreads == 0) { + nthreads = sched_default_threads(); + } + + // Actuellement on n'utilises pas qlen + // => On utilise une pile de taille fixe + (void)qlen; + + sched.top = -1; + + if (pthread_mutex_init(&sched.mutex, NULL) != 0) { + fprintf(stderr, "Can't init mutex\n"); + return -1; + } + + if (pthread_cond_init(&sched.cond, NULL) != 0) { + fprintf(stderr, "Can't init condition variable\n"); + return -1; + } + + pthread_t threads[nthreads]; + for (int i = 0; i < nthreads; ++i) { + if (pthread_create(&threads[i], NULL, worker_routine, &sched) != 0) { + fprintf(stderr, "Can't create threads\n"); + return -1; + } + } + + if (sched_spawn(f, closure, &sched) != 0) { + fprintf(stderr, "Can't create a new task\n"); + return -1; + } + + for (int i = 0; i < nthreads; ++i) { + if ((pthread_join(threads[i], NULL) != 0)) { + fprintf(stderr, "Can't wait the thread %d\n", i); + return -1; + } + } + + return 1; } int sched_spawn(taskfunc f, void *closure, struct scheduler *s) { - pthread_t thread; - int err; + pthread_mutex_lock(&s->mutex); - // Création d'un thread pour la tâche - if ((err = pthread_create(&thread, NULL, (void *(*)(void *))f, closure)) != - 0) { - fprintf(stderr, "pthread_create error %d\n", errno); + if (s->top + 1 >= MAX_TASKS) { + pthread_mutex_unlock(&s->mutex); + errno = EAGAIN; + fprintf(stderr, "Stack full\n"); return -1; } - // Attend la fin du thread - if ((err = pthread_join(thread, NULL)) != 0) { - fprintf(stderr, "pthread_join error %d\n", errno); - return -1; - } + s->top++; + s->tasks[s->top] = f; + s->closures[s->top] = closure; + + pthread_mutex_unlock(&s->mutex); + pthread_cond_signal(&s->cond); return 0; }