#include #include #include "thread_pool.h" #include #include "../defs.h" ///////////////////////////////// // Used by all data structures // ///////////////////////////////// static unsigned int length = 0; static pthread_mutex_t mutex; static pthread_cond_t isReady; /////////////////////// // Linked List Queue // /////////////////////// typedef struct QUEUE { int socket; struct QUEUE* next; } qMember; qMember* qHead = NULL; qMember* qTail = NULL; pthread_mutex_t* get_mutex() { return &mutex; } pthread_cond_t* get_is_ready() { return &isReady; } void qAdd(int socket) { qMember* newNode = malloc(sizeof(qMember)); VPRINTF(("Adding socket: %d\n", socket)); if (!newNode) { printf("Malloc failed, qAdd\n"); exit(1); } newNode->next = NULL; newNode->socket = socket; pthread_mutex_lock(get_mutex()); if (!qHead) { qTail = qHead = newNode; length++; } else { qTail->next = newNode; qTail = qTail->next; length++; } pthread_mutex_unlock(get_mutex()); pthread_cond_signal(get_is_ready()); } int qRemove() { int toRet; //Socket to return qMember* temp; pthread_mutex_lock(get_mutex()); //Lock mutex while (!qHead) { pthread_cond_wait( get_is_ready(), get_mutex() ); //No requests waiting } toRet = qHead->socket; //retrieve socket VPRINTF(("Removing socket: %d\n", toRet)); //Fix the list temp = qHead; qHead = qHead->next; length--; //Fine grain lock, release lock before freeing old node pthread_mutex_unlock(get_mutex()); free(temp); return toRet; } /* * Wrappers */ void addSocket(int socket) { #ifdef LLQ qAdd(socket); #else qAdd(socket); #endif } int removeSocket() { #ifdef LLQ return qRemove(); #else return qRemove(); #endif } unsigned int size() { return length; } void init_synch() { pthread_mutex_init(get_mutex(), NULL); pthread_cond_init(get_is_ready(),NULL); } void init_thread_pool(unsigned short *port_num, int maxThreads, void *(*producer)(void*), void *(*consumer)(void*), void *args) { int i; if (!port_num) { printf("NULL Port Number\n"); exit(1); } init_synch(); printf("** Initializing Thread Pool\n"); VPRINTF(("*** Making %d threads on port %d\n",maxThreads,port_num)); fflush(stdout); //initialize producer producerThread = malloc(sizeof(pthread_t)); if (!producerThread) { exit(1); } printf("*** Listening on Port: %d\n", *port_num); pthread_create(producerThread, NULL, producer, port_num); //Initialize consumers consumerList = malloc(sizeof(pthread_t*)*maxThreads); for (i=0; i < maxThreads; i++) { consumerList[i] = malloc(sizeof(pthread_t)); if (pthread_create(consumerList[i], NULL, consumer, args)) { printf("Error creating thread %d\n",i); } } }