But that code will work perfectly well with negligible overhead. When multiple threads wait on a WaitableEvent, they all get woken up when it fires, and one of them will be first to grab the next job from the queue. Using a semaphore would be conceptually slightly neater, but would make no practical difference to anyone using the class.
It turns out that I don't need a semaphore, because…
ThreadPool is not quite the structure I'm looking for. I need a pool of threads, yes, but when I execute a "Job" I actually need the functor to run on all of the threads at the same time, not just one.
I cobbled together this Semaphore class with some help from #algorithms (Freenode). Not sure if it works, though (but it looks right).
Semaphore.h
/**
    @ingroup vf_core

    @brief A semaphore.

    This provides a traditional semaphore synchronization primitive. There is no
    upper limit on the number of signals.

    @note There is no tryWait() or timeout facility for acquiring a resource.
*/
class Semaphore
{
public:
  /** Create a semaphore with the specified number of resources.

      @param initialCount The starting number of resources.
  */
  explicit Semaphore (int initialCount);

  ~Semaphore ();

  /** Increase the number of available resources.

      @param amount The number of new resources available.
  */
  void signal (int amount = 1);

  /** Wait for a resource. */
  void wait ();

private:
  class WaitingThread
    : public LockFreeStack <WaitingThread>::Node
    , LeakChecked <WaitingThread>
  {
  public:
    WaitingThread ();

    WaitableEvent m_event;
  };

  typedef SpinLock LockType;

  LockType m_mutex;
  Atomic <int> m_counter;
  LockFreeStack <WaitingThread> m_waitingThreads;
  LockFreeStack <WaitingThread> m_deleteList;
};
Semaphore.cpp
Semaphore::WaitingThread::WaitingThread ()
  : m_event (false) // auto-reset
{
}

//==============================================================================

Semaphore::Semaphore (int initialCount)
  : m_counter (initialCount)
{
}

Semaphore::~Semaphore ()
{
  // Can't delete the semaphore while threads are waiting on it!!
  jassert (m_waitingThreads.pop_front () == nullptr);

  for (;;)
  {
    WaitingThread* waitingThread = m_deleteList.pop_front ();

    if (waitingThread != nullptr)
      delete waitingThread;
    else
      break;
  }
}

void Semaphore::signal (int amount)
{
  jassert (amount > 0);

  while (amount--)
  {
    // Make counter and list operations atomic.
    LockType::ScopedLockType lock (m_mutex);

    if (++m_counter <= 0)
    {
      WaitingThread* waitingThread = m_waitingThreads.pop_front ();

      jassert (waitingThread != nullptr);

      waitingThread->m_event.signal ();
    }
  }
}

void Semaphore::wait ()
{
  // Always prepare the WaitingThread object first, either
  // from the delete list or through a new allocation.
  //
  WaitingThread* waitingThread = m_deleteList.pop_front ();

  if (waitingThread == nullptr)
    waitingThread = new WaitingThread;

  {
    // Make counter and list operations atomic.
    LockType::ScopedLockType lock (m_mutex);

    if (--m_counter >= 0)
    {
      // Acquired the resource so put waitingThread back.
      m_deleteList.push_front (waitingThread);
      waitingThread = nullptr;
    }
    else
    {
      // Out of resources, go on to the waiting list.
      m_waitingThreads.push_front (waitingThread);
    }
  }

  // Do we need to wait?
  if (waitingThread != nullptr)
  {
    // Yes so do it.
    waitingThread->m_event.wait ();

    // If the wait is satisfied, then we've been taken off the
    // waiting list so put waitingThread back in the delete list.
    //
    m_deleteList.push_front (waitingThread);
  }
}
To make this Semaphore implementation AudioIOCallback-friendly, it needs to keep the WaitingThread objects on a list so it can recycle them instead of allocating and deleting them all the time.
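The recycling idea on its own is just a free list: take objects from the list, allocate only when it is empty, and push objects back instead of deleting them. A hypothetical generic sketch (my names, not VFLib's; the real code uses a LockFreeStack so the audio thread never blocks, whereas this sketch uses a mutex for brevity):

```cpp
// Hypothetical sketch of the recycling idea: pop from a free list,
// allocate only when it is empty, and "delete" by pushing back.
#include <mutex>
#include <stack>

template <class Object>
class RecyclingPool
{
public:
    Object* acquire ()
    {
        {
            std::lock_guard<std::mutex> lock (mutex);
            if (! freeList.empty ())
            {
                Object* object = freeList.top ();  // recycle
                freeList.pop ();
                return object;
            }
        }
        return new Object;                         // free list empty
    }

    void release (Object* object)
    {
        std::lock_guard<std::mutex> lock (mutex);
        freeList.push (object);                    // keep for reuse
    }

    ~RecyclingPool ()
    {
        while (! freeList.empty ())
        {
            delete freeList.top ();
            freeList.pop ();
        }
    }

private:
    std::mutex mutex;
    std::stack<Object*> freeList;
};
```

After a warm-up period the free list reaches a steady state and `acquire` stops allocating, which is the property an audio callback needs.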
As was to be expected when writing concurrent code, the Semaphore implementation I posted had numerous subtle bugs. After wasting the better part of a day, I think I've got all the kinks worked out - I updated the posted code (and it's in VFLib too).