#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <limits>
#include <mutex>
#include <queue>
#include <random>
#include <thread>
#include <vector>

// Library feature-test macros are only guaranteed to be defined by <version>
// (or by the feature's own header), so pull <version> in before testing
// __cpp_lib_semaphore.
#if defined(__has_include)
#if __has_include(<version>)
#include <version>
#endif
#endif

#ifdef __cpp_lib_semaphore
#include <semaphore>
using PCSemaphore = std::counting_semaphore<std::numeric_limits<int>::max()>;
#else
// Minimal counting semaphore used when the standard library does not
// provide std::counting_semaphore.
class semaphore final
{
public:
    // initValue: number of acquire() calls that may succeed before any release().
    explicit semaphore(int initValue = 0) : m_count(initValue) {}

    // Blocks the calling thread until the count is positive, then decrements it.
    void acquire()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

    // Increments the count and wakes one waiting thread, if any.
    void release()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_count;
        }
        m_cv.notify_one();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    int m_count;
};
using PCSemaphore = semaphore;
#endif

namespace
{
// Returns a uniformly distributed value in the closed range spanned by a and b.
// The bounds are ordered first so a misconfigured min > max cannot hand an
// invalid range to the distribution.
template <typename Engine>
std::size_t RandomInRange(Engine &engine, std::size_t a, std::size_t b)
{
    const std::size_t lo = std::min(a, b);
    const std::size_t hi = std::max(a, b);
    std::uniform_int_distribution<std::size_t> dist(lo, hi);
    return dist(engine);
}
} // namespace

/*
    Simple producer/consumer example.

    N producers write ints into a shared queue (mConsumerData). M consumers read
    these ints, one at a time, then sleep for that many milliseconds to pretend
    they are processing the data.

    Once all the data has been produced, WaitDone() pushes one exit sentinel
    (sExitFlag) per consumer onto the queue; a consumer exits when it pops the
    sentinel.

        Producers  P P P P      write to mConsumerData --> release semaphore --> repeat
        Consumers  C C C C C C  acquire semaphore --> pop oldest mConsumerData --> repeat

    The "data" is just an int saying how long the consumer will "process" it.
    A producer sleeps for a time in [MinProduceDelayMs, MaxProduceDelayMs] to
    simulate producing data, then pushes an int in
    [MinConsumeDelayMs, MaxConsumeDelayMs] onto the queue. A consumer pops that
    int and sleeps for that many milliseconds to simulate processing.
*/
class ProducerConsumer final
{
public:
    struct InitParams
    {
        InitParams() {}

        std::size_t ProduceCount = 100;       // How much "data" to produce
        std::size_t MinProduceDelayMs = 0;    // Min time to produce "data"
        std::size_t MaxProduceDelayMs = 100;  // Max time to produce "data"
        std::size_t MinConsumeDelayMs = 25;   // Min time to consume "data"
        std::size_t MaxConsumeDelayMs = 1000; // Max time to consume "data"
        std::size_t ProducerThreads = 4;      // Number of producer threads to start up
        std::size_t ConsumerThreads = 10;     // Number of consumer threads to start up
    };

    ProducerConsumer(const InitParams &params = InitParams());

    // Launches the consumer threads, then the producer threads.
    void Start();

    // Joins the producers, tells every consumer to exit, then joins the consumers.
    void WaitDone();

private:
    void Producer(); // Thread body for one producer
    void Consumer(); // Thread body for one consumer

    static const int sExitFlag; // Sentinel queue value telling a consumer to exit

    InitParams mInitParams;
    std::mutex mListLock;             // Guards mConsumerData
    PCSemaphore mSemaphore;           // Counts items available in mConsumerData
    std::queue<int> mConsumerData;    // Pending consume delays (ms) and exit sentinels
    std::vector<std::thread> mProducers;
    std::vector<std::thread> mConsumers;
    std::atomic_uint mProduced;       // Number of items claimed by producers so far
    std::atomic_bool mShouldExit;     // Only written (ctor/Start), never read; shutdown uses sExitFlag
};

const int ProducerConsumer::sExitFlag = -1;

ProducerConsumer::ProducerConsumer(const InitParams &params)
    : mInitParams(params)
    , mSemaphore(0)
    , mProduced(0)
    , mShouldExit(false)
{
}

void ProducerConsumer::Start()
{
    mShouldExit = false;

    // Kick off all the consumer threads first so they are waiting for data
    mConsumers.reserve(mConsumers.size() + mInitParams.ConsumerThreads);
    for (std::size_t i = 0; i < mInitParams.ConsumerThreads; ++i)
    {
        mConsumers.emplace_back(&ProducerConsumer::Consumer, this);
    }

    // Kick off all the producer threads
    mProducers.reserve(mProducers.size() + mInitParams.ProducerThreads);
    for (std::size_t i = 0; i < mInitParams.ProducerThreads; ++i)
    {
        mProducers.emplace_back(&ProducerConsumer::Producer, this);
    }
}

void ProducerConsumer::WaitDone()
{
    // Wait for all the producers to finish producing and exit
    for (auto &producer : mProducers)
    {
        if (producer.joinable())
        {
            producer.join();
        }
    }
    mProducers.clear();

    // Push one exit sentinel per consumer that was actually started, so every
    // consumer wakes up exactly once more and terminates.
    {
        std::lock_guard<std::mutex> guard(mListLock);
        for (std::size_t i = 0; i < mConsumers.size(); ++i)
        {
            mConsumerData.push(sExitFlag);
            mSemaphore.release();
        }
    }

    std::cout << "Waiting on consumers..." << std::endl;

    // Wait for all the consumers to finish and exit
    for (auto &consumer : mConsumers)
    {
        if (consumer.joinable())
        {
            consumer.join();
        }
    }
    mConsumers.clear();

    std::cout << "All consumers done. Exiting now." << std::endl;
}

void ProducerConsumer::Producer()
{
    std::random_device randDevice;
    std::mt19937 engine(randDevice());

    while (true)
    {
        // Atomically claim the next item index; stop once the quota is reached
        const unsigned int index = mProduced.fetch_add(1);
        if (index >= mInitParams.ProduceCount)
        {
            break;
        }

        // "Produce" data
        std::cout << "Producing #" << index << std::endl;
        const std::size_t produceDelay =
            RandomInRange(engine, mInitParams.MinProduceDelayMs, mInitParams.MaxProduceDelayMs);
        std::this_thread::sleep_for(
            std::chrono::milliseconds(static_cast<std::chrono::milliseconds::rep>(produceDelay)));

        // Push the "data" onto the queue
        const int consumeDelay = static_cast<int>(
            RandomInRange(engine, mInitParams.MinConsumeDelayMs, mInitParams.MaxConsumeDelayMs));
        {
            std::lock_guard<std::mutex> guard(mListLock);
            mConsumerData.push(consumeDelay);
        }

        // Signal that one more item is available
        mSemaphore.release();
    }
}

void ProducerConsumer::Consumer()
{
    while (true)
    {
        // Block until at least one item has been queued
        mSemaphore.acquire();

        // Pull the next bit of data from the queue
        int nextData;
        {
            std::lock_guard<std::mutex> guard(mListLock);
            nextData = mConsumerData.front();
            mConsumerData.pop();
        }

        // When we pull the exit "data", we just need to break
        if (nextData == sExitFlag)
        {
            break;
        }

        // Otherwise, "consume" the data
        std::cout << "Consuming for " << nextData << "ms" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(nextData));
    }
}

int main()
{
    ProducerConsumer prod;
    prod.Start();
    prod.WaitDone();
    return 0;
}