WaitableEvent vs. Semaphore (Queues)


#1

I guess I am having trouble with WaitableEvent in a producer/consumer synchronisation (a queue). I assumed it basically was a sempahore, but that doesn’t seem to be true.

A semaphore maintains a signal counter that is incremented when a thread (producer) signals it, and decremented after another thread (consumer) called wait() and succeeded. The consumer is only required to wait, if the signal count is zero. This is analog to the behavior of a queue: If it is empty, one must wait for something to be added to it. Otherwise one can pick the first due object and proceed.

Queues work great for multiple producers and multiple consumers (load balancing!) and are extremely lightweight and fast (no memory allocation, using a ring buffer). I wrote a Queue class and am making heavy use of it, but it seems my assumption of the behavior of WaitableEvent was wrong, because the synchronisation is getting “out of sync” occasionally.

Would it make sense to implement a Semaphore class? Or am I just missing something?

Andre


#2

I have a Queue code implemented using 2 waitable event (one for pop, one for push). If you’re interested, PM me, I’ll send it to you.


#3

Thanks for the offer, much appreciated. I would however rather stay with the current Queue code I already have, let alone for the load balancing and many-in-to-many-out capability, and fix the Semaphore instead.

Here’s a Semaphore class that works fine under a stress tests with many consumers and producers:

[code]
class Semaphore
{
public:

Semaphore ()
    : lock (), 
      sync (), 
      signals (0)
{};

~Semaphore ()
{};

bool wait (int timeOutMilliseconds = -1)
{
    {
        const ScopedLock sl (lock);            
        if (signals > 0)
        {
            signals--;
            return true;
        }
    }
    if (sync.wait (timeOutMilliseconds))
    {
        const ScopedLock sl (lock);            
        signals--;
        return true;
    }
    return false;
}

void signal ()
{
    {
        const ScopedLock sl (lock);
        signals++;
        if (signals == 1)
            sync.signal();
    }
}

void reset ()
{
    const ScopedLock sl (lock);
    signals = 0;
    sync.reset();
}

private:

CriticalSection  lock;
WaitableEvent  sync;
int  signals;

};[/code]


#4

That could probably be made a lot faster by dropping the CriticalSection and using an Atomic.


#5

And worst, it’s racy:

      if (sync.wait (timeOutMilliseconds))
        {   <==== // What should happen if the thread is scheduled here, and another thread perform a Semaphore.wait() by that time ?
            const ScopedLock sl (lock);           

#6

Hmm. In the Queue class, I have the read function do another check whether the queue is empty after it acquired the queue lock. This happens often when many consumers fight for the queued objects in parallel and one consumer stole away the last object right before the next consumer acquires the lock. Thanks to the single lock though, this method is safe.

That’s quite different with Semaphore, as you correctly pointed out. There are two locking mechanisms chained in sequence, leaving a very short period without protection. The bad thing is you can not nest them here, without cause a deadlock.

Hence the only way to implement Semaphore is probably to copy WaitableEvent and enhance it with the signal counter, so all operation is safely covered by a single lock.


#7

BTW: The reason I love the semaphore so much is the elegance it adds to a queue model:

producers:
	lock ( add -> signal )

consumers:
	wait -> lock ( (extra empty check) -> remove )

Together with the ring buffer, which basically only consists of two integers, you get an extremely fast and leightweight queue.


#8

Okay, I don’t know the exact details but I have to really wonder - have you carefully analyzed the performance of your system? Because load balancing would only matter if your threads are saturated / running at peak load. If you’re having problems with waking threads up it tells me that you are not operating at peak load. In the saturated state, there will be very little waiting .

Just my opinion but if you think you’re going to have a magic synchronization primitive like a semaphore or waitable event automatically give you load balancing and all sorts of nice properties, think again. Typically the way that load balancing is implemented is by giving each thread its own MPSC queue (multiple producer, single consumer) and having a bit of intelligence in the ‘enqueue’ operation. To add work, the a random or round-robined thread is selected and the work item enqueued. Since the MPSC queue is pretty straightforward, the implementation of signaling is quite easy. In fact the juce::Thread::wait() and juce::Thread::notify() are perfectly suited for this.

Because one thread may complete a lot sooner than another, after a thread empties out its own queue, it uses “work-stealing” to grab some work from another thread. This process continues in parallel for all threads until there is no work remaining either in the thread’s queue or available to be stolen. Then the thread goes back to sleep.

Some information on work-stealing:

Hmm…global ring buffer…this is going to be extremely unfriendly to the CPU cache in a multicore system, it creates a bottleneck for both enqueue and dequeue…this is not fast.

From Dmitry:

The “Scalable Synchronization Algorithms” Google group has some very cool ideas regarding MPMC queues implemented using ring buffers:

http://groups.google.com/group/lock-free/browse_thread/thread/f2745e883a590184

Using a boost::thread_specific_ptr would probably be very helpful too.


#9

If all you need is a semaphore, wrap your own for each platform (sem_post / sem_wait on posix, CreateSemaphore on windows). However, as Vincent pointed out, I doubt semaphore are the issue in your case.
You should probably profile your application first, and figure out the real culprit.


#10

You could also just use a manual-reset event since you’re taking a critical section for both the enqueue and the dequeue. Signal the event when the queue transitions to non-empty, and reset it when the queue becomes empty.

This means that your threads will burn some unnecessary CPU for the case when the queue is empty but it will be a baseline implementation that is guaranteed to work.


#11

@Jules: I tried with Atomic but that lead to corruption in the stress test. I think there is no way around a single lock that protects the entire semaphore as a whole.

@TheVinn: Well, “load balancing” might not be the appropriate term for what I want to do. I’d rather name it “job distribution”. Many producers have jobs to be done, and many workers fetch them from the queue when they are ready for the next job. This way all workes always have something to do, unless the queue size gets below the number of workers.

Thanks for the pointers and links. I see the multicore argument, but IMO a mere thousand or so object pointers in a queue do not make a huge difference. The actual objects need to be passed around anyway, so it doesnt’t matter when exactly this happens.


#12

MPMC bounded queue can be implemented entirely without a mutex

Juce implements CAS with juce::Atomic::compareAndSwapBool()


#13

If you absolutely want to correct your code, it should read:

    class Semaphore
    {
    public:

        Semaphore ()
            : lock (),
              sync (),
              signals (0)
        {};
       
       ~Semaphore ()
        {};
       
        bool wait (int timeOutMilliseconds = -1)
        {
            {
                const ScopedLock sl (lock);           
                if (signals > 0)
                {
                    signals--;
                    return true;
                }
            }
            while (sync.wait (timeOutMilliseconds))
            {
                const ScopedLock sl (lock);           
                if (signals <= 0) continue; 
                signals--;
                return true;
            }
            return false;
        }
       
        void signal ()
        {
            {
                const ScopedLock sl (lock);
                signals++;
                if (signals == 1)
                    sync.signal();
            }
        }
       
        void reset ()
        {
            const ScopedLock sl (lock);
            signals = 0;
            sync.reset();
        }
       
    private:
       
        CriticalSection  lock;
        WaitableEvent  sync;
        int  signals;
       
    };

But beware that you might overcome the expected timeout if multiple thread are fighting for waiting.


#14

Shouldn’t the WaitableEvent be manual reset? i.e.

    class Semaphore
    {
    public:
        Semaphore ()
            : lock (),
              sync (true), /* MANUAL RESET */
              signals (0)
        {};

Huge bug right there and makes me wonder about the validity of any performance analysis or claims of “load balancing”…


#15

No, wait() does the reset automatically on success. Or did I miss something important?


#16

No, wait() does the reset automatically on success. Or did I miss something important?[/quote]

Yeah none of this looks right. The waitable event should be signaled when signals > 0 and non-signaled when signals == 0. And it should be manual-reset. The code that X-Ryl submitted looks incorrect as well, since it doesn’t reset the event only when signals drops to 0.

If the event is not manual-reset, then only one thread will ever wake up when it becomes signaled.

Let me double check that…the Juce documentation indicates otherwise


#17

Yeah the Juce documentation is a little misleading. It says that the result of a successful wait() on an automatic-reset WaitableEvent will release all waiting threads, but this is not how a Windows waitable event works.

From CreateEvent:

[quote]BOOL bManualReset
If this parameter is TRUE, the function creates a manual-reset event object, which requires the use of the ResetEvent function to set the event state to nonsignaled. If this parameter is FALSE, the function creates an auto-reset event object, and system automatically resets the event state to nonsignaled after a single waiting thread has been released. [/quote]

“single waiting thread”

I don’t know how Mac WaitableEvent / pthreads is implemented but Juce might exhibit different behavior on different platforms especially if Jules thought wait() would release all threads.


#18

Ouch.

What really bugs me is that “signaled state” obviously is a boolean. That is entirely useless if you have multiple threads waiting. A semaphore would have a signal count, that is, if there are 5 threads waiting and 3 signals get posted, only 3 threads wake up and the other 2 would still be waiting for more signals to come. That’s how I expect it to work and everyting else looks half-baken and shaky to me.

Basically a semaphore itself is a queue of waiting, suspended threads. And that’s exactly the way it is implemented in other languages.


#19

[quote=“ans”]What really bugs me is that “signaled state” obviously is a boolean. That is entirely useless if you have multiple threads waiting. A semaphore would have a signal count, that is, if there are 5 threads waiting and 3 signals get posted, only 3 threads wake up and the other 2 would still be waiting for more signals to come. That’s how I expect it to work and everyting else looks half-baken and shaky to me.

Basically a semaphore itself is a queue of waiting, suspended threads.[/quote]

Yes, and the correct implementation of a semaphore requires access to some kernel structures which is why a user-mode implementation of semaphores will be imperfect at best (and buggy at worst).

Just abandon the semaphore and implement your MPMC signaling using a manual-reset event, it will work fine and you won’t notice the little bit of spinning when the queue transitions to empty.


#20

Did so now and it works. Thanks for the great discussion.