Files
2025-06-07 01:59:34 -04:00

156 lines
3.2 KiB
C

#include <stdlib.h>
#include <stdio.h>
#include "thread_pool.h"
#include <pthread.h>
#include "../defs.h"
/////////////////////////////////
// Used by all data structures //
/////////////////////////////////
/* Number of sockets currently queued: incremented by qAdd(), decremented by
 * qRemove(), and reported by size(). */
static unsigned int length = 0;
/* Lock taken by qAdd()/qRemove() around every change to the queue and to
 * `length`. Initialized in init_synch(). */
static pthread_mutex_t mutex;
/* Condition variable signalled by qAdd() after an enqueue; qRemove() waits on
 * it while the queue is empty. Initialized in init_synch(). */
static pthread_cond_t isReady;
///////////////////////
// Linked List Queue //
///////////////////////
/* One node of the singly linked FIFO of pending sockets. */
typedef struct QUEUE {
int socket; /* socket descriptor carried by this node */
struct QUEUE* next; /* following node; NULL on the last node */
} qMember;
/* Front of the queue; NULL when the queue is empty. qRemove() pops from here. */
qMember* qHead = NULL;
/* Back of the queue; qAdd() appends here. qRemove() does not reset it, so it
 * is only meaningful while qHead is non-NULL. */
qMember* qTail = NULL;
/*
 * Accessor for the file-local mutex that guards the queue.
 * Returns the address of the single shared mutex object; never NULL.
 */
pthread_mutex_t *get_mutex()
{
    pthread_mutex_t *queueLock = &mutex;

    return queueLock;
}
/*
 * Accessor for the file-local condition variable that qAdd() signals and
 * qRemove() waits on. Returns the address of the single shared object.
 */
pthread_cond_t *get_is_ready()
{
    pthread_cond_t *readyCond = &isReady;

    return readyCond;
}
/*
 * Enqueue a socket descriptor at the tail of the linked-list queue.
 *
 * socket: descriptor to store in the new node.
 *
 * The node is allocated and filled in before the lock is taken, so the
 * critical section only links it in and bumps `length`. The condition
 * variable is signalled after the mutex has been released.
 * Terminates the process with exit(1) if the node cannot be allocated.
 */
void qAdd(int socket) {
    qMember *node = malloc(sizeof(qMember));

    VPRINTF(("Adding socket: %d\n", socket));
    if (node == NULL) {
        printf("Malloc failed, qAdd\n");
        exit(1);
    }
    node->socket = socket;
    node->next = NULL;

    pthread_mutex_lock(get_mutex());
    if (qHead == NULL) {
        /* Empty queue: the new node is also the head. */
        qHead = node;
    } else {
        qTail->next = node;
    }
    qTail = node;
    length++;
    pthread_mutex_unlock(get_mutex());

    pthread_cond_signal(get_is_ready());
}
int qRemove() {
int toRet; //Socket to return
qMember* temp;
pthread_mutex_lock(get_mutex()); //Lock mutex
while (!qHead) {
pthread_cond_wait( get_is_ready(), get_mutex() ); //No requests waiting
}
toRet = qHead->socket; //retrieve socket
VPRINTF(("Removing socket: %d\n", toRet));
//Fix the list
temp = qHead;
qHead = qHead->next;
length--;
//Fine grain lock, release lock before freeing old node
pthread_mutex_unlock(get_mutex());
free(temp);
return toRet;
}
/*
* Wrappers
*/
/*
 * Data-structure-independent entry point for queueing a socket.
 *
 * socket: descriptor to enqueue.
 *
 * The backing store is chosen at compile time. At present both the LLQ
 * build and the default build use the linked-list queue.
 */
void addSocket(int socket) {
#if defined(LLQ)
    qAdd(socket);   /* linked-list queue selected explicitly */
#else
    qAdd(socket);   /* default backing store: linked-list queue */
#endif
}
/*
 * Data-structure-independent entry point for taking the next socket.
 *
 * Returns the descriptor produced by the backing queue's remove routine.
 * The backing store is chosen at compile time. At present both the LLQ
 * build and the default build use the linked-list queue.
 */
int removeSocket() {
#if defined(LLQ)
    return qRemove();   /* linked-list queue selected explicitly */
#else
    return qRemove();   /* default backing store: linked-list queue */
#endif
}
/*
 * Report how many sockets are currently queued.
 *
 * NOTE(review): the counter is read without taking the queue mutex, so the
 * value is only a snapshot while other threads are adding or removing.
 */
unsigned int size() {
    const unsigned int queued = length;

    return queued;
}
/*
 * Initialize the mutex and condition variable shared by the queue routines,
 * both with default attributes.
 *
 * Either initializer can fail (it returns a non-zero error number). Using an
 * uninitialized object afterwards would be undefined, so a failure is
 * reported and the process exits with status 1, matching how the other fatal
 * errors in this file are handled.
 */
void init_synch() {
    if (pthread_mutex_init(get_mutex(), NULL) != 0) {
        printf("Mutex init failed, init_synch\n");
        exit(1);
    }
    if (pthread_cond_init(get_is_ready(), NULL) != 0) {
        printf("Condition variable init failed, init_synch\n");
        exit(1);
    }
}
/*
 * Set up the synchronization objects, then start one producer thread and
 * `maxThreads` consumer threads.
 *
 * port_num:   must be non-NULL. The pointer itself is passed to the producer
 *             thread as its argument, so the storage it points at has to stay
 *             valid for as long as the producer reads it.
 * maxThreads: number of consumer threads to create; values <= 0 create none.
 * producer:   start routine for the producer thread (receives port_num).
 * consumer:   start routine for every consumer thread (receives args).
 * args:       opaque argument forwarded unchanged to each consumer.
 *
 * Exits with status 1 on a NULL port_num, on any allocation failure, or if
 * the producer thread cannot be created. A consumer that fails to start is
 * reported and skipped so the remaining consumers are still created.
 */
void init_thread_pool(unsigned short *port_num, int maxThreads,
                      void *(*producer)(void*),
                      void *(*consumer)(void*),
                      void *args)
{
    int i;
    size_t slots;

    if (!port_num) { printf("NULL Port Number\n"); exit(1); }

    init_synch();
    printf("** Initializing Thread Pool\n");
    /* Print the port value, not the pointer: %d requires an int argument. */
    VPRINTF(("*** Making %d threads on port %d\n", maxThreads, *port_num));
    fflush(stdout);

    /* Producer */
    producerThread = malloc(sizeof(pthread_t));
    if (!producerThread) { exit(1); }
    printf("*** Listening on Port: %d\n", *port_num);
    if (pthread_create(producerThread, NULL, producer, port_num)) {
        /* Without a producer nothing is ever queued, so give up. */
        printf("Error creating producer thread\n");
        exit(1);
    }

    /* Consumers */
    /* Clamp so a negative count cannot turn into a huge allocation size. */
    slots = (maxThreads > 0) ? (size_t)maxThreads : 0;
    consumerList = malloc(sizeof(pthread_t*) * slots);
    /* malloc(0) may legitimately return NULL; only a real request can fail. */
    if (!consumerList && slots > 0) {
        printf("Malloc failed, init_thread_pool\n");
        exit(1);
    }
    for (i = 0; i < maxThreads; i++) {
        consumerList[i] = malloc(sizeof(pthread_t));
        if (!consumerList[i]) {
            printf("Malloc failed, init_thread_pool\n");
            exit(1);
        }
        if (pthread_create(consumerList[i], NULL, consumer, args)) {
            printf("Error creating thread %d\n", i);
        }
    }
}