156 lines
3.2 KiB
C
156 lines
3.2 KiB
C
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
|
|
#include "thread_pool.h"
|
|
#include <pthread.h>
|
|
#include "../defs.h"
|
|
|
|
|
|
/////////////////////////////////
|
|
// Used by all data structures //
|
|
/////////////////////////////////
|
|
|
|
/* Number of nodes currently in the queue; bumped by qAdd, dropped by qRemove. */
static unsigned int length = 0;

/* Lock taken by qAdd and qRemove around every change to the queue. */
static pthread_mutex_t mutex;
/* Condition signalled by qAdd and waited on by qRemove while the queue is empty. */
static pthread_cond_t isReady;
|
|
|
|
///////////////////////
|
|
// Linked List Queue //
|
|
///////////////////////
|
|
|
|
/*
 * One node of the singly linked socket queue.
 */
typedef struct QUEUE {
    int socket;          /* socket value carried by this node */
    struct QUEUE* next;  /* following node, or NULL for the last node */
} qMember;
|
|
|
|
/* Oldest node in the queue (next to be removed); NULL when the queue is empty. */
qMember* qHead = NULL;
/* Newest node in the queue; only meaningful while qHead is non-NULL. */
qMember* qTail = NULL;
|
|
|
|
pthread_mutex_t* get_mutex()
|
|
{
|
|
return &mutex;
|
|
}
|
|
|
|
pthread_cond_t* get_is_ready()
|
|
{
|
|
return &isReady;
|
|
}
|
|
|
|
void qAdd(int socket) {
|
|
qMember* newNode = malloc(sizeof(qMember));
|
|
|
|
VPRINTF(("Adding socket: %d\n", socket));
|
|
|
|
if (!newNode) { printf("Malloc failed, qAdd\n"); exit(1); }
|
|
|
|
newNode->next = NULL;
|
|
newNode->socket = socket;
|
|
|
|
pthread_mutex_lock(get_mutex());
|
|
|
|
if (!qHead) {
|
|
qTail = qHead = newNode;
|
|
length++;
|
|
} else {
|
|
qTail->next = newNode;
|
|
qTail = qTail->next;
|
|
length++;
|
|
}
|
|
|
|
pthread_mutex_unlock(get_mutex());
|
|
pthread_cond_signal(get_is_ready());
|
|
|
|
}
|
|
|
|
int qRemove() {
|
|
int toRet; //Socket to return
|
|
qMember* temp;
|
|
|
|
pthread_mutex_lock(get_mutex()); //Lock mutex
|
|
|
|
while (!qHead) {
|
|
pthread_cond_wait( get_is_ready(), get_mutex() ); //No requests waiting
|
|
}
|
|
|
|
toRet = qHead->socket; //retrieve socket
|
|
VPRINTF(("Removing socket: %d\n", toRet));
|
|
|
|
//Fix the list
|
|
temp = qHead;
|
|
qHead = qHead->next;
|
|
length--;
|
|
|
|
//Fine grain lock, release lock before freeing old node
|
|
pthread_mutex_unlock(get_mutex());
|
|
|
|
free(temp);
|
|
|
|
return toRet;
|
|
}
|
|
|
|
/*
|
|
* Wrappers
|
|
*/
|
|
/*
 * Queues a socket.
 *
 * Every build configuration (with or without LLQ defined) forwards to the
 * linked-list queue, so no preprocessor switch is needed here.
 */
void addSocket(int socket) {
    qAdd(socket);
}
|
|
|
|
/*
 * Dequeues a socket and returns it.
 *
 * Every build configuration (with or without LLQ defined) forwards to the
 * linked-list queue, so no preprocessor switch is needed here.
 */
int removeSocket() {
    int sock = qRemove();

    return sock;
}
|
|
|
|
unsigned int size() {
|
|
return length;
|
|
}
|
|
|
|
/*
 * Initializes the queue mutex and the isReady condition variable with
 * default attributes. Return codes of the pthread init calls are not checked.
 */
void init_synch() {
    pthread_mutex_t* lock = get_mutex();
    pthread_cond_t* ready = get_is_ready();

    pthread_mutex_init(lock, NULL);
    pthread_cond_init(ready, NULL);
}
|
|
|
|
void init_thread_pool(unsigned short *port_num, int maxThreads,
|
|
void *(*producer)(void*),
|
|
void *(*consumer)(void*),
|
|
void *args)
|
|
{
|
|
int i;
|
|
|
|
if (!port_num) { printf("NULL Port Number\n"); exit(1); }
|
|
|
|
init_synch();
|
|
|
|
printf("** Initializing Thread Pool\n");
|
|
VPRINTF(("*** Making %d threads on port %d\n",maxThreads,port_num));
|
|
fflush(stdout);
|
|
|
|
//initialize producer
|
|
producerThread = malloc(sizeof(pthread_t));
|
|
if (!producerThread) { exit(1); }
|
|
|
|
printf("*** Listening on Port: %d\n", *port_num);
|
|
|
|
pthread_create(producerThread, NULL, producer, port_num);
|
|
|
|
//Initialize consumers
|
|
consumerList = malloc(sizeof(pthread_t*)*maxThreads);
|
|
|
|
for (i=0; i < maxThreads; i++) {
|
|
consumerList[i] = malloc(sizeof(pthread_t));
|
|
|
|
if (pthread_create(consumerList[i], NULL, consumer, args)) {
|
|
printf("Error creating thread %d\n",i);
|
|
}
|
|
|
|
}
|
|
|
|
}
|