diff --git a/src/sched-ws.c b/src/sched-ws.c index bd300ca..759e41c 100644 --- a/src/sched-ws.c +++ b/src/sched-ws.c @@ -4,21 +4,37 @@ #include #include #include -#include +/* Tâche */ struct task_info { void *closure; taskfunc f; }; -struct scheduler { +/* Structure de chaque thread */ +struct worker { /* Premier élément du deque (dernier ajouter) */ - int *bottom; + int bottom; - /* Variable de conditions pour reveillé les threads au besoin */ + /* Mutex qui protège cette structure */ + pthread_mutex_t mutex; + + /* Deque de tâches */ + struct task_info *tasks; + + /* Thread */ + pthread_t thread; + + /* Dernier élément du deque (premier ajouter) */ + int top; +}; + +/* Scheduler partagé */ +struct scheduler { + /* Condition threads dormant */ pthread_cond_t cond; - /* Mutex qui protège les piles */ + /* Mutex qui protège cette structure */ pthread_mutex_t mutex; /* Nombre de threads instanciés */ @@ -27,17 +43,11 @@ struct scheduler { /* Compteur des threads dormants */ int nthsleep; - /* Taille des piles */ + /* Taille deque */ int qlen; - /* Piles de tâches */ - struct task_info **tasks; - - /* Liste des threads */ - pthread_t *threads; - - /* Dernier élément du deque (premier ajouter) */ - int *top; + /* Liste de workers par threads */ + struct worker *workers; }; /* Lance une tâche de la pile */ @@ -46,18 +56,16 @@ void *sched_worker(void *); /* Nettoie les opérations effectuées par l'initialisation de l'ordonnanceur */ int sched_init_cleanup(struct scheduler, int); -/* Récupère l'index du thread courant */ +/* Récupère l'index du thread courant + * + * Assume que le mutex de l'ordonnanceur est verrouillé */ int current_thread(struct scheduler *); int sched_init(int nthreads, int qlen, taskfunc f, void *closure) { static struct scheduler sched; - - sched.bottom = NULL; - sched.tasks = NULL; - sched.threads = NULL; - sched.top = NULL; + sched.workers = NULL; if(qlen <= 0) { fprintf(stderr, "qlen must be greater than 0\n"); @@ -73,7 +81,11 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure) } sched.nthreads = 0; - sched.nthsleep = 0; + // Initialisation variable de condition + if(pthread_cond_init(&sched.cond, NULL) != 0) { + fprintf(stderr, "Can't init condition variable\n"); + return sched_init_cleanup(sched, -1); + } // Initialisation du mutex if(pthread_mutex_init(&sched.mutex, NULL) != 0) { @@ -81,34 +93,28 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure) return sched_init_cleanup(sched, -1); } - // Initialisation variable de condition - if(pthread_cond_init(&sched.cond, NULL) != 0) { - fprintf(stderr, "Can't init varcond\n"); - return sched_init_cleanup(sched, -1); - } + sched.nthsleep = 0; - // Initialisation du curseur suivant l'état de la pile de chaque processus - if(!(sched.bottom = malloc(nthreads * sizeof(int)))) { - perror("Cursor bottom stack"); - return sched_init_cleanup(sched, -1); - } - if(!(sched.top = malloc(nthreads * sizeof(int)))) { - perror("Cursor top stack"); - return sched_init_cleanup(sched, -1); + // Initialize workers + if(!(sched.workers = malloc(nthreads * sizeof(struct worker)))) { + perror("Workers"); + return -1; } for(int i = 0; i < nthreads; ++i) { - sched.bottom[i] = 0; - sched.top[i] = 0; - } + sched.workers[i].bottom = 0; + sched.workers[i].top = 0; - // Allocation mémoire pour la pile de chaque processus - if(!(sched.tasks = malloc(nthreads * sizeof(struct task_info *)))) { - perror("Deque list"); - return sched_init_cleanup(sched, -1); - } - for(int i = 0; i < nthreads; ++i) { - if(!(sched.tasks[i] = malloc(qlen * sizeof(struct task_info)))) { - fprintf(stderr, "Deque for thread %d: %s\n", i, strerror(errno)); + // Initialisation mutex + if(pthread_mutex_init(&sched.workers[i].mutex, NULL) != 0) { + fprintf(stderr, "Can't init mutex %d\n", i); + return sched_init_cleanup(sched, -1); + } + + // Allocation mémoire deque + if(!(sched.workers[i].tasks = + malloc(sched.qlen * sizeof(struct task_info)))) { + fprintf(stderr, "Thread %d: ", i); + perror("Deque list"); return sched_init_cleanup(sched, -1); } } @@ -116,45 +122,44 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure) // Initialise l'aléatoire srand(time(NULL)); - // Créer les threads - if(!(sched.threads = malloc(nthreads * sizeof(pthread_t)))) { - perror("Threads"); - return sched_init_cleanup(sched, -1); - } - - // Ajoute la tâche initiale - if(sched_spawn(f, closure, &sched) < 0) { - fprintf(stderr, "Can't create the initial task\n"); - return sched_init_cleanup(sched, -1); - } - - // Démarre les threads + // Création des threads for(int i = 0; i < nthreads; ++i) { - if(pthread_create(&sched.threads[i], NULL, sched_worker, &sched) != 0) { - fprintf(stderr, "Can't create the thread %d\n", i); + pthread_mutex_lock(&sched.mutex); + if(pthread_create(&sched.workers[i].thread, NULL, sched_worker, + (void *)&sched) != 0) { + fprintf(stderr, "Can't create thread %d\n", i); - if(i > 0) { - fprintf(stderr, ", cancelling already created threads...\n"); - for(int j = 0; j < i; ++j) { - if(pthread_cancel(sched.threads[j]) != 0) { - fprintf(stderr, "Can't cancel the thread %d\n", j); - } - } - } else { - fprintf(stderr, "\n"); + // Annule les threads déjà créer + for(int j = 0; j < i; ++j) { + pthread_cancel(sched.workers[j].thread); } return sched_init_cleanup(sched, -1); } - pthread_mutex_lock(&sched.mutex); sched.nthreads++; + pthread_mutex_unlock(&sched.mutex); } + // Ajoute la tâche initiale + if(sched_spawn(f, closure, &sched) < 0) { + fprintf(stderr, "Can't queue the initial task\n"); + return sched_init_cleanup(sched, -1); + } + + // Attend la fin des threads for(int i = 0; i < nthreads; ++i) { - if((pthread_join(sched.threads[i], NULL) != 0)) { + if((pthread_join(sched.workers[i].thread, NULL) != 0)) { fprintf(stderr, "Can't wait the thread %d\n", i); + + // Quelque chose s'est mal passé, on annule les threads en cours + for(int j = 0; j < nthreads; ++j) { + if(j != i) { + pthread_cancel(sched.workers[j].thread); + } + } + return sched_init_cleanup(sched, -1); } } @@ -165,35 +170,20 @@ sched_init(int nthreads, int qlen, taskfunc f, void *closure) int sched_init_cleanup(struct scheduler s, int ret_code) { - pthread_mutex_destroy(&s.mutex); - pthread_cond_destroy(&s.cond); - if(s.tasks) { + pthread_mutex_destroy(&s.mutex); + + if(s.workers) { for(int i = 0; i < s.nthreads; ++i) { - if(s.tasks[i]) { - free(s.tasks[i]); - s.tasks[i] = NULL; - } + pthread_mutex_destroy(&s.workers[i].mutex); + + free(s.workers[i].tasks); + s.workers[i].tasks = NULL; } - free(s.tasks); - s.tasks = NULL; - } - - if(s.threads) { - free(s.threads); - s.threads = NULL; - } - - if(s.bottom) { - free(s.bottom); - s.bottom = NULL; - } - - if(s.top) { - free(s.top); - s.top = NULL; + free(s.workers); + s.workers = NULL; } return ret_code; @@ -204,14 +194,11 @@ current_thread(struct scheduler *s) { pthread_t current = pthread_self(); - pthread_mutex_lock(&s->mutex); for(int i = 0; i < s->nthreads; ++i) { - if(pthread_equal(s->threads[i], current)) { - pthread_mutex_unlock(&s->mutex); + if(pthread_equal(s->workers[i].thread, current)) { return i; } } - pthread_mutex_unlock(&s->mutex); return -1; } @@ -220,24 +207,28 @@ int sched_spawn(taskfunc f, void *closure, struct scheduler *s) { int th; + + pthread_mutex_lock(&s->mutex); if((th = current_thread(s)) < 0) { th = 0; } + pthread_mutex_unlock(&s->mutex); - pthread_mutex_lock(&s->mutex); + pthread_mutex_lock(&s->workers[th].mutex); - int next = (s->bottom[th] + 1) % s->qlen; - if(next == s->top[th]) { - pthread_mutex_unlock(&s->mutex); + int next = (s->workers[th].bottom + 1) % s->qlen; + if(next == s->workers[th].top) { + pthread_mutex_unlock(&s->workers[th].mutex); fprintf(stderr, "Stack is full\n"); errno = EAGAIN; return -1; } - s->tasks[th][s->bottom[th]] = (struct task_info){closure, f}; - s->bottom[th] = next; + s->workers[th].tasks[s->workers[th].bottom] = + (struct task_info){closure, f}; + s->workers[th].bottom = next; - pthread_mutex_unlock(&s->mutex); + pthread_mutex_unlock(&s->workers[th].mutex); return 0; } @@ -249,55 +240,69 @@ sched_worker(void *arg) // Récupère le processus courant (index tableau) int curr_th; + + pthread_mutex_lock(&s->mutex); while((curr_th = current_thread(s)) < 0); + pthread_mutex_unlock(&s->mutex); struct task_info task; int found; while(1) { found = 0; - pthread_mutex_lock(&s->mutex); + pthread_mutex_lock(&s->workers[curr_th].mutex); - if(s->top[curr_th] != s->bottom[curr_th]) { + if(s->workers[curr_th].top != s->workers[curr_th].bottom) { found = 1; - s->bottom[curr_th] = (s->bottom[curr_th] - 1 + s->qlen) % s->qlen; - task = s->tasks[curr_th][s->bottom[curr_th]]; + s->workers[curr_th].bottom = + (s->workers[curr_th].bottom - 1 + s->qlen) % s->qlen; + task = s->workers[curr_th].tasks[s->workers[curr_th].bottom]; } + pthread_mutex_unlock(&s->workers[curr_th].mutex); if(!found) { // Vol car aucune tâche trouvée - for(int i = 0, k = rand() % (s->nthreads + 1), target; - i < s->nthreads; ++i) { - target = (i + k) % s->nthreads; + pthread_mutex_lock(&s->mutex); + int nthreads = s->nthreads; + pthread_mutex_unlock(&s->mutex); - if(s->top[target] != s->bottom[target]) { + for(int i = 0, k = rand() % (nthreads + 1), target; i < nthreads; + ++i) { + target = (i + k) % nthreads; + + pthread_mutex_lock(&s->workers[target].mutex); + if(s->workers[target].top != s->workers[target].bottom) { // Tâche trouvée found = 1; - s->bottom[target] = - (s->bottom[target] - 1 + s->qlen) % s->qlen; - task = s->tasks[target][s->bottom[target]]; + s->workers[target].bottom = + (s->workers[target].bottom - 1 + s->qlen) % s->qlen; + task = s->workers[target].tasks[s->workers[target].bottom]; + + pthread_mutex_unlock(&s->workers[target].mutex); break; } + pthread_mutex_unlock(&s->workers[target].mutex); } // Aucune tâche à faire if(!found) { + pthread_mutex_lock(&s->mutex); s->nthsleep++; + // Ne partir que si tout le monde dort if(s->nthsleep >= s->nthreads) { pthread_cond_broadcast(&s->cond); pthread_mutex_unlock(&s->mutex); - break; } + pthread_cond_wait(&s->cond, &s->mutex); s->nthsleep--; + pthread_mutex_unlock(&s->mutex); continue; } } - - pthread_cond_broadcast(&s->cond); - pthread_mutex_unlock(&s->mutex); + pthread_cond_signal(&s->cond); // Exécute la tâche task.f(task.closure, s);