This problem comes from a job interview, where it could be solved in C++ or C#. As far as I can remember, the task was to implement a queue that multiple threads can add to and multiple threads can read from. I haven't tried it in C# yet, although I think it may be easier, as there is a synchronized queue already implemented?

Anyway the code is lengthy but I would appreciate comments from those more experienced in Multi-Threaded programming.

#include <cstdlib>
#include <iostream>
#include <boost/thread.hpp>
#include <deque>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>

using namespace std;

//Multi-threaded queue
class multiQ
{
	boost::mutex m_mutex;

	deque<int> m_queue;
	boost::condition_variable m_cond;
public:
	void add(int i)
	{
			boost::lock_guard<boost::mutex> lock(m_mutex);
			m_queue.push_back(i);

			m_cond.notify_one();
	}

	int readRemove()
	{
		boost::unique_lock<boost::mutex> lock(m_mutex);
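		// Loop rather than a single check: a wait can wake spuriously,
		// or another reader may have emptied the queue first.
		while (m_queue.empty())
		{
			m_cond.wait(lock);
		}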

		int returnVal = m_queue.front();

		m_queue.pop_front();
		return returnVal;
	}
};

class Qwriter
{
	multiQ* mq;
	void write( int ni){mq->add(ni);};
public:
	Qwriter(multiQ* nq):mq(nq){};

	void do_write()
	{
		boost::posix_time::milliseconds tTime(300);

		for (int i = 0; i < 100; i++)
		{
			boost::this_thread::sleep(tTime);
			write(i);
		}
	}
};
class Qreader
{
	multiQ* mq;
	void readRem()
	{
		int i = 0;
		i = mq->readRemove();
		cout << i << " " ; 
	};
public:
	Qreader(multiQ* nq):mq(nq){};

	void do_read()
	{
		for(int i=0 ; i < 100; i++)
			readRem();
	}
};

int main()
{
	multiQ mq1;

	Qreader* qRead1 = new Qreader(&mq1);
	Qwriter* qWrite1 = new Qwriter(&mq1);
	Qwriter* qWrite2 = new Qwriter(&mq1);

	boost::thread thread1(&Qwriter::do_write, qWrite1);
	boost::thread thread3(&Qwriter::do_write, qWrite2);
	boost::thread thread2(&Qreader::do_read, qRead1);

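	thread1.join();
	thread2.join();
	thread3.join(); // the second writer must be joined as well

	delete qRead1;
	delete qWrite1;
	delete qWrite2;

	system("PAUSE");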

	return 0;
}

> Anyway the code is lengthy but I would appreciate comments from those more experienced in Multi-Threaded programming.
You should probably only call notify_one() if the queue was empty before the new item was added. That's the point of using the condition variable, right? To avoid popping from an empty deque? ;)

void add(int i)
{
    boost::mutex::scoped_lock lock(m_mutex);

    // Remember whether the queue was empty before this push
    bool first_item = m_queue.empty();

    m_queue.push_back(i);

    if (first_item)
        m_cond.notify_one();
}

Edward would like to hear your reasons behind the design of readRemove(), because there's some subtle stuff going on that Ed usually doesn't see from people claiming to be inexperienced in multithreaded programming. ;)

> I haven't tried it in C# yet although I think it may be easier as there is a synchronized queue already implemented?
The old containers library in C# has a Queue.Synchronized() method, but it's really no better than basic locks, which C# supports natively. Ed recommends keeping it simple with the lock keyword.


You are correct about the condition! Suppose the queue is not empty before I add the data and I call notify_one(); there are no threads waiting in this case. What is the consequence? Is it a potential error? Inefficient? Or just unclear to another programmer?

I wrote it a few months ago but the design seems to be...

readRemove() first creates a unique_lock object, which should mean the whole method can only be accessed by one thread at a time (once lock() is called). I originally wanted to use scoped_lock, but this seems to be old Boost that is not favoured in the newer version. I think lock_guard may have done just as well, and I can't remember if I tried it or not.

Then if there is nothing to remove from the queue the thread has to wait (for the notify_one statement) until something is added to the queue (which happens in add() function).

Then simply copy the first value in the queue and then remove it and then return the copy as the value read from the queue. The unique_lock should unlock as it goes out of scope and gets destroyed.
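
The walk-through above can be sketched roughly as follows. This is only a sketch under assumptions, not the original code: it swaps Boost for the C++11 standard library equivalents (std::mutex, std::condition_variable), and the names IntQueue and run_demo are made up for illustration. Two details are worth pointing out: wait() takes a unique_lock (a lock_guard would not compile there, because wait() has to unlock and relock the mutex), and the wait sits in a while loop because a condition variable may wake up spuriously.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical name; mirrors multiQ using the standard library instead of Boost.
class IntQueue
{
    std::mutex m_mutex;
    std::deque<int> m_queue;
    std::condition_variable m_cond;
public:
    void add(int i)
    {
        // lock_guard is enough here: lock on entry, unlock on scope exit.
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push_back(i);
        m_cond.notify_one();
    }

    int readRemove()
    {
        // wait() needs a unique_lock because it unlocks the mutex while
        // sleeping and relocks it before returning.
        std::unique_lock<std::mutex> lock(m_mutex);

        // Re-check after every wake-up: wake-ups can be spurious.
        while (m_queue.empty())
            m_cond.wait(lock);

        int returnVal = m_queue.front();
        m_queue.pop_front();
        return returnVal; // lock is released when it goes out of scope
    }
};

// Illustration only: one writer thread pushes 0..count-1 while the
// calling thread reads them back and returns their sum.
int run_demo(int count)
{
    IntQueue q;
    std::thread writer([&q, count] {
        for (int i = 0; i < count; ++i)
            q.add(i);
    });

    int sum = 0;
    for (int i = 0; i < count; ++i)
        sum += q.readRemove();

    writer.join();
    return sum;
}
```

Calling run_demo(100) should hand back the sum of 0 through 99, whichever thread happens to run first.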

Have I missed/forgotten subtle points in my own program?

> What is the consequence?
Just lack of clarity. :) Edward needed a minute to verify that the condition was only used for a wait on empty, and when working with threaded code it is critically important to understand where locks and waits are because synchronization points are where problems pop up. It also couldn't hurt to minimize the number of notifications when they are not necessary. Less work and all that. ;)

> Have I missed/forgotten subtle points in my own program?
Actually, you nailed a subtle point by having readRemove() pop the item and return it. Good exception-safe design usually wants those two operations to be split, like std::queue<>'s front() and pop(), but in the presence of threads that can cause deadlock between the front() and pop() call. It *is* possible to avoid the risk of deadlock with that kind of one-two punch design, but just lumping it together like you did is simpler. Props. :)

I think I understand what you are saying. A queue which can be read independently from popping is more complicated to code for multiple threads (but more versatile). I might try to code that one. I have yet to experience deadlock.

Why is it more exception safe to read and pop separately? I can't think what the exceptions would be? I would be interested to know where you would put try and catch blocks in this code (or some part of it).

> I have yet to experience deadlock.
Apologies for the confusion. Ed wrote "deadlock" while thinking "race condition". Too much coffee, or not enough? ;)
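
To make the race concrete, here is a single-threaded sketch that replays one unlucky interleaving by hand. It assumes a hypothetical SplitQueue whose front() and pop() each lock internally but are separate calls; nothing like it appears in the original code, and the standard library mutex stands in for Boost.

```cpp
#include <deque>
#include <mutex>

// Hypothetical queue with separate, individually locked front() and pop().
class SplitQueue
{
    std::mutex m_mutex;
    std::deque<int> m_queue;
public:
    void add(int i)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push_back(i);
    }
    int front()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.front();
    }
    void pop()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.pop_front();
    }
};

// What each of the two imaginary readers observed.
struct Interleaving
{
    int seenByA;
    int seenByB;
};

// Replays, in one thread, an order in which two readers A and B could
// be scheduled: both call front() before either calls pop().
Interleaving replay_race()
{
    SplitQueue q;
    q.add(1);
    q.add(2);

    Interleaving r;
    r.seenByA = q.front(); // reader A looks at the head
    r.seenByB = q.front(); // reader B looks at the same head
    q.pop();               // A removes 1
    q.pop();               // B removes 2 without ever having read it
    return r;
}
```

Both readers end up with the same value and the second element is dropped unread, even though every individual call was locked. Combining the read and the removal under one lock, as readRemove() does, rules this ordering out.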

> Why is it more exception safe to read and pop separately? I can't think what the exceptions would be?
Here is readRemove() with the next logical step of generalizing the contained type.

template <typename T>
T multiQ<T>::readRemove()
{
    boost::unique_lock<boost::mutex> lock(m_mutex);

    // Loop: a wait can wake spuriously
    while (m_queue.empty())
        m_cond.wait(lock);

    // 1) T's copy constructor might throw
    T returnVal = m_queue.front();

    // 2) Never throws
    m_queue.pop_front();

    // 3) T's copy constructor might throw
    return returnVal;
}

The problem is the line marked 3. If the copy constructor throws an exception, the stack unwinds without passing returnVal back to the caller. But the queue's state has already changed, so the value is lost.
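
A small single-threaded sketch can show the loss happening. Everything here is an assumption made for illustration: Fragile is a hypothetical type whose copy constructor can be told to fail, locking is left out because only one thread is involved, and the return statement makes an explicit copy so that the failure does not depend on whether the compiler elides the copy at line 3.

```cpp
#include <cstddef>
#include <deque>
#include <stdexcept>

// Hypothetical element type whose copy constructor can be told to fail.
struct Fragile
{
    static int copiesAllowed; // copies permitted before the next one throws
    int value;

    explicit Fragile(int v) : value(v) {}
    Fragile(const Fragile& other) : value(other.value)
    {
        if (copiesAllowed == 0)
            throw std::runtime_error("copy failed");
        --copiesAllowed;
    }
};
int Fragile::copiesAllowed = 0;

// Same shape as readRemove(): copy the front, pop, return the copy.
Fragile copy_pop_return(std::deque<Fragile>& q)
{
    Fragile returnVal = q.front(); // 1) this copy succeeds in the demo
    q.pop_front();                 // 2) the queue's state changes
    return Fragile(returnVal);     // 3) explicit copy, throws in the demo
}

// Returns how many elements remain after the failed call.
std::size_t remaining_after_failed_return()
{
    std::deque<Fragile> q;
    q.emplace_back(42);         // constructed in place, no copy involved
    Fragile::copiesAllowed = 1; // allow the copy at 1), fail the one at 3)

    try {
        Fragile f = copy_pop_return(q);
        (void)f;
    } catch (const std::runtime_error&) {
        // The caller never received the value...
    }
    return q.size(); // ...and it is no longer in the queue either.
}
```

After the exception the queue is empty and the caller holds nothing, which is the lost value described above.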

> I would be interested to know where you would put try and catch blocks in this code (or some part of it).
There is no need for try/catch unless the function can safely recover. Pushing a value that was popped is not guaranteed to be a nothrow operation, so it is unsafe. The right way to go about fixing the problem is to avoid it in the first place instead of trying to clean up the mess after it happens.

template <typename T>
void multiQ<T>::readRemove(T& returnVal)
{
    boost::unique_lock<boost::mutex> lock(m_mutex);

    // Loop: a wait can wake spuriously
    while (m_queue.empty())
        m_cond.wait(lock);

    // 1) T's copy assignment might throw
    returnVal = m_queue.front();

    // 2) Never throws
    m_queue.pop_front();
}

Everything after copying the value is a nothrow operation, so the side effect is guaranteed to be one of two things.

  • The copy throws, no side effect.
  • The copy succeeds, side effect occurs.

There is no execution path where the value is lost, so the method is now exception safe. Unfortunately, it is also not as convenient to the caller as returning a value because now the caller has to define an object and pass it in by reference.
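
The two outcomes listed above can be checked with a short single-threaded sketch. The names are hypothetical: Brittle is a made-up type whose copy assignment can be switched to throw, and copy_then_pop has the same shape as readRemove(T&) with the locking left out because only one thread is involved.

```cpp
#include <cstddef>
#include <deque>
#include <stdexcept>

// Hypothetical element type whose copy assignment can be told to fail.
struct Brittle
{
    static bool failAssign;
    int value;

    explicit Brittle(int v) : value(v) {}
    Brittle(const Brittle&) = default;
    Brittle& operator=(const Brittle& other)
    {
        if (failAssign)
            throw std::runtime_error("assignment failed");
        value = other.value;
        return *this;
    }
};
bool Brittle::failAssign = false;

// Same shape as readRemove(T&): assign first, pop only afterwards.
void copy_then_pop(std::deque<Brittle>& q, Brittle& returnVal)
{
    returnVal = q.front(); // may throw; the queue is still untouched
    q.pop_front();         // reached only if the copy succeeded
}

// Queue sizes and the value read after one failed and one good call.
struct Outcome
{
    std::size_t sizeAfterFailure;
    std::size_t sizeAfterSuccess;
    int valueRead;
};

Outcome run_out_param_demo()
{
    std::deque<Brittle> q;
    q.emplace_back(42);
    Brittle out(0);
    Outcome result;

    Brittle::failAssign = true;
    try {
        copy_then_pop(q, out);
    } catch (const std::runtime_error&) {
        // Nothing to clean up: the element is still queued.
    }
    result.sizeAfterFailure = q.size();

    Brittle::failAssign = false;
    copy_then_pop(q, out);
    result.sizeAfterSuccess = q.size();
    result.valueRead = out.value;
    return result;
}
```

When the assignment throws, the element stays in the queue; when it succeeds, the element is removed and the caller has the value.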

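Splitting the pop() and front() operations offers the best of both worlds: exception safety and convenient calling syntax. In threaded code that only holds if the race between the two calls is dealt with, as mentioned earlier. :)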
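
One possible way to keep front() and pop() separate without the race is to make the caller hold a single lock across both calls. The sketch below is an assumption about how that could look, not something described in this thread: GuardedQueue, acquire() and drain_with_two_readers are hypothetical names, the standard library stands in for Boost, and the readers simply stop when the queue is empty instead of waiting on a condition variable.

```cpp
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical queue: front() and pop() are split, and the caller must
// hold the lock returned by acquire() across both calls.
template <typename T>
class GuardedQueue
{
    std::mutex m_mutex;
    std::deque<T> m_queue;
public:
    std::unique_lock<std::mutex> acquire()
    {
        return std::unique_lock<std::mutex>(m_mutex);
    }

    // The three members below assume the caller holds the lock.
    bool empty() const { return m_queue.empty(); }
    const T& front() const { return m_queue.front(); }
    void pop() { m_queue.pop_front(); }

    void add(const T& item)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push_back(item);
    }
};

// Drains `count` items with two reader threads and returns how many
// values were read exactly once.
int drain_with_two_readers(int count)
{
    GuardedQueue<int> q;
    for (int i = 0; i < count; ++i)
        q.add(i);

    std::vector<int> timesRead(count, 0);
    auto reader = [&q, &timesRead] {
        for (;;) {
            std::unique_lock<std::mutex> lock = q.acquire();
            if (q.empty())
                return;
            int value = q.front(); // if this copy threw, nothing was popped
            q.pop();
            ++timesRead[value];    // still under the lock, so no data race
        }
    };

    std::thread a(reader);
    std::thread b(reader);
    a.join();
    b.join();

    int readExactlyOnce = 0;
    for (int n : timesRead)
        if (n == 1)
            ++readExactlyOnce;
    return readExactlyOnce;
}
```

The trade-off is that correctness now depends on every caller remembering to take the lock, which is part of why lumping the two steps into one method is the simpler design.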
