C++

C++ : Producer / Consumer Using Threads, Mutexes and Condition Variables

#include<iostream>
#include<mutex>
#include<deque>
#include<thread>
#include<chrono>

std::deque<int> dq;
std::mutex mu;

void Producer () {
    int count = 10;
    while (count > 0) {
        std::unique_lock<std::mutex> locker(mu);
        dq.push_back(count);
        locker.unlock();
        count--;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

// Not a good implementation of Consumer, because the Consumer is busy waiting
// ( looping continuiously ) for the data from the Producer.
void Consumer () {
    int data = 0;
    while (data != 1) {
        std::unique_lock<std::mutex> locker(mu);
        if (!dq.empty()) {
            data = dq.front();
            dq.pop_front();
            locker.unlock(); // Unlock after consumption.
            std::cout << "Consumed data from deque : " << data << std::endl;
        } else {
            locker.unlock(); // Unlock if no data was found.
            // One possible way to avoid busy wait it to take a nap for 10 ms.
            // But this is not really helpful as we don't know the best possible
            // duration for the nap.
            // std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }
}

int main() {

    std::thread t1(Producer);
    std::thread t2(Consumer);

    t1.join();
    t2.join();
    return 0;
}

Output

$./a.out
Got value from deque : 10
Got value from deque : 9
Got value from deque : 8
Got value from deque : 7
Got value from deque : 6
Got value from deque : 5
Got value from deque : 4
Got value from deque : 3
Got value from deque : 2
Got value from deque : 1
#include<iostream>
#include<mutex>
#include<deque>
#include<thread>
#include<chrono>
#include<fstream>
#include<condition_variable>

std::deque<int> dq;
std::mutex mu;
std::condition_variable cond;

class Logger {
    public:
    std::mutex mutex_log;
    std::ofstream m_of;
    std::once_flag m_flag;

    void Log_Message (std::string str) {

        std::call_once(m_flag, [&](){ m_of.open("log_file.txt"); });

        std::lock_guard<std::mutex> lock(mutex_log);
        m_of << str << std::endl;
    }
};

void Producer (Logger& logger) {
    int count = 10;
    while (count > 0) {
        std::unique_lock<std::mutex> locker(mu);
        dq.push_back(count);
        locker.unlock();
        cond.notify_one(); // Notifies one waiting thread for the data
        logger.Log_Message("Producer : Notified the consumer.");
        count--;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

void Consumer (Logger& logger) {
    int data = 0;
    while (data != 1) {
        std::unique_lock<std::mutex> locker(mu);
        // Consumer sleeps till notified by the producer.
        // Note : During sleep the mutex is unlocked and released.
        // After waking up, the muxtex is again locked. 
        logger.Log_Message("Consumer : Sleeping now");
        cond.wait(locker);
        logger.Log_Message("Consumer : Got notified. Now waking up.");
        
        data = dq.front();
        dq.pop_front();
        locker.unlock(); // Unlock after consumption.

        std::string str = std::to_string(data);
        std::string msg("Consumer : Data fetched from deque : ");
        msg.append(str);
        logger.Log_Message(msg);
    }
}

int main() {

    Logger logger;

    /* Note : If the consumer sleeps after the producer has notified (notify_one()),
    then the consumer has missed the first notification because it wasn't waiting for it.
    Thus it will not get to read the last number from the deque. 
    One way to address this is to have consumer waiting before the production has started. */
    
    std::thread consumer_thread(Consumer, std::ref(logger));
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::thread producer_thread(Producer, std::ref(logger));

    producer_thread.join();
    consumer_thread.join();
    return 0;
}

Output

$ tail -f log_file.txt
Consumer : Sleeping now
Producer : Notified the consumer.
Consumer : Got notified. Now waking up.
Consumer : Data fetched from deque : 10
Consumer : Sleeping now
Producer : Notified the consumer.
Consumer : Got notified. Now waking up.
Consumer : Data fetched from deque : 9
Consumer : Sleeping now
Producer : Notified the consumer.
Consumer : Got notified. Now waking up.
Consumer : Data fetched from deque : 8
Consumer : Sleeping now
Producer : Notified the consumer.
Consumer : Got notified. Now waking up.
Consumer : Data fetched from deque : 7
Consumer : Sleeping now
Producer : Notified the consumer.
Consumer : Got notified. Now waking up.
Consumer : Data fetched from deque : 6
Consumer : Sleeping now
Producer : Notified the consumer.
Consumer : Got notified. Now waking up.
Consumer : Data fetched from deque : 5
Consumer : Sleeping now
Producer : Notified the consumer.
Consumer : Got notified. Now waking up.
Consumer : Data fetched from deque : 4
Consumer : Sleeping now
Producer : Notified the consumer.
Consumer : Got notified. Now waking up.
Consumer : Data fetched from deque : 3
Consumer : Sleeping now
Producer : Notified the consumer.
Consumer : Got notified. Now waking up.
Consumer : Data fetched from deque : 2
Consumer : Sleeping now
Producer : Notified the consumer.
Consumer : Got notified. Now waking up.
Consumer : Data fetched from deque : 1


Copyright (c) 2019-2024, Algotree.org.
All rights reserved.