Files
LanguageBasics/ProducerConsumer/Cpp/producerconsumer.cpp
2025-06-07 11:38:03 -04:00

204 lines
5.0 KiB
C++

#include <iostream>
#include <limits>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <thread>
#include <atomic>
#include <random>
#include <memory>
#ifdef __cpp_lib_semaphore
#include <semaphore>
using PCSemaphore = std::counting_semaphore<std::numeric_limits<std::ptrdiff_t>::max()>;
#else
// Simple semaphore since it's not a part of the C++ std
class semaphore final
{
private:
std::mutex m_mutex;
std::condition_variable m_cv;
int m_count;
public:
semaphore(const int& initValue = 0) : m_count(initValue) {};
void acquire() // Wait for a signal to unblock the thread
{
std::unique_lock<std::mutex> lock(m_mutex);
while (m_count == 0)
{
m_cv.wait(lock);
}
m_count--;
}
void release() // Notify waiters that data is available for access
{
std::unique_lock<std::mutex> lock(m_mutex);
m_count++;
m_cv.notify_one();
}
};
using PCSemaphore = semaphore;
#endif
/*
Simple producer consumer example.
N producers write ints into a shared queue (mConsumerData).
M consumers read these ints, one at a time, then "sleep" that time to pretend its processing
Once all the data is done, the threads join and exit. They know based on the global exit flag
Producers P P P P
write to --> mConsumerData -> Post Semaphore --> repeat
Consumers C C C C C C
wait on semaphore --> pop oldest mConsumerData --> repeat
The "data" is just an int for how long the consumer will "process" data.
Essentially, the producer sleeps for some time [min,max] then pushes an int [min,max]
into the queue to simulate producing data.
The consumers wait on that int then sleep for that time to simulate processing of data.
*/
class ProducerConsumer final
{
public:
struct InitParams
{
InitParams() {}
size_t ProduceCount = 100; // How much "data" to produce
size_t MinProduceDelayMs = 0; // Min time to produce "data"
size_t MaxProduceDelayMs = 100; // Max time to produce "data"
size_t MinConsumeDelayMs = 25; // Min time to consume "data"
size_t MaxConsumeDelayMs = 1000; // Max time to consume "data"
size_t ProducerThreads = 4; // Number of producer threads to start up
size_t ConsumerThreads = 10; // Number of consumer threads to start up
};
ProducerConsumer(const InitParams &params = InitParams());
void Start();
void WaitDone();
private:
void Producer();
void Consumer();
static const int sExitFlag;
InitParams mInitParams;
std::mutex mListLock;
PCSemaphore mSemaphore;
std::queue<int> mConsumerData;
std::vector<std::thread> mProducers;
std::vector<std::thread> mConsumers;
std::atomic_uint mProduced;
std::atomic_bool mShouldExit;
};
const int ProducerConsumer::sExitFlag = -1;
ProducerConsumer::ProducerConsumer(const InitParams &params)
: mInitParams(params)
, mSemaphore(0)
, mProduced(0)
, mShouldExit(false)
{
}
void ProducerConsumer::Start()
{
mShouldExit = false;
// Kick off all the consumer threads
for (size_t i = 0; i < mInitParams.ConsumerThreads; i++)
{
mConsumers.push_back(std::thread(&ProducerConsumer::Consumer, this));
}
// Kick off all the producer threads
for (size_t i = 0; i < mInitParams.ProducerThreads; i++)
{
mProducers.push_back(std::thread(&ProducerConsumer::Producer, this));
}
}
void ProducerConsumer::WaitDone()
{
// Wait for all the producers to finish producing and exit
for (auto &t : mProducers)
{ t.join(); }
// Push the exit data flag to all the consumer threads
mListLock.lock();
for (size_t i = 0; i < mInitParams.ConsumerThreads; i++)
{
mConsumerData.push(sExitFlag);
mSemaphore.release();
}
mListLock.unlock();
std::cout << "Waiting on consumers..." << std::endl;
for (auto &t : mConsumers) // Wait for all the consumers to finish and exit
{ t.join(); }
std::cout << "All consumers done. Exiting now." << std::endl;
}
void ProducerConsumer::Producer()
{
unsigned int oldVal;
std::random_device randDevice;
while (oldVal = mProduced.fetch_add(1), oldVal < mInitParams.ProduceCount)
{
// "Produce" data
std::cout << "Producing #" << oldVal << std::endl;
int delay = int((randDevice() % mInitParams.MaxProduceDelayMs) + mInitParams.MinProduceDelayMs);
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
// push the "data" onto the queue
mListLock.lock();
mConsumerData.push(int((randDevice() % mInitParams.MaxConsumeDelayMs) + mInitParams.MinConsumeDelayMs));
mListLock.unlock();
mSemaphore.release();
}
}
void ProducerConsumer::Consumer()
{
int nextData;
while (true)
{
mSemaphore.acquire();
// Pull the next bit of data from the queue
mListLock.lock();
nextData = mConsumerData.front();
mConsumerData.pop();
mListLock.unlock();
// When we pull the exit "data", we just need to break
if (nextData == sExitFlag) break;
// otherwise, "consume" the data
std::cout << "Consuming for " << nextData << "ms" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(nextData));
}
}
int main()
{
ProducerConsumer prod;
prod.Start();
prod.WaitDone();
return 0;
}