Thread-safe reference-counted message passing CODE!

Based on a discussion in another thread (http://rawmaterialsoftware.com/viewtopic.php?f=2&t=7645), I have consolidated an example of concurrent programming (threads) that uses a thread queue (FIFO) of functors combined with a juce Thread to achieve asynchronous function calls with variable-sized parameter lists. The parameters of these asynchronous function calls may be of any type or object, including reference counted objects. Since the implementation is completely type safe and preserves all invariants of objects passed through the system, using reference counted objects requires no additional thought or oversight on the part of the programmer beyond the usage requirements already imposed by Juce.

My approach to thread queues, which I believe is the best one for our end-user, audio-related purposes, is to think in terms of function calls that are executed asynchronously on other threads. I prefer not to think about what type of queue it is, what its features are, or how it is implemented (except when I actually have to write the queue).

At the higher level, in the design stage, I want a building block that abstracts away all of the concerns about object lifetime management, memory allocation, and so on. The reason being that getting concurrent apps working is HARD. Having a simple model makes analysis and reasoning about the correctness of concurrent algorithms much easier. I will provide a simple example:

In this example, the audio i/o callback renders a block of audio and passes it back to the Juce audio device manager so it can be played through the sound device, but it also makes a copy of the audio block and sends it to another thread which is responsible for recording the live audio output to disk (or sending it to a Shoutcast radio station, say).

The code example assumes knowledge of the audio-specific portions of the Juce library. Member functions required by Juce, but not relevant to this example, have been intentionally omitted for brevity. Because I use multiple inheritance, I have prefixed member function calls with the relevant class using the scope resolution operator. This is not necessary to compile, but is included for clarity. I have also left out access control specifiers; everything is public.

The ThreadQueue object used in the example is provided by DspFilters:
http://code.google.com/p/dspfilterscpp/source/browse/trunk/demo/ThreadQueue.h
http://code.google.com/p/dspfilterscpp/source/browse/trunk/demo/ThreadQueue.cpp

First we need to create a combination of a juce Thread and a ThreadQueue. Here it is:

#include "juce.h"
#include "ThreadQueue.h" // from DspFilters
#include <boost/bind.hpp> // or can use std::bind or std::tr1::bind if available

// Thread Queue that works with a Juce thread's event system
struct JuceThreadQueue : ThreadQueue, Thread
{
  JuceThreadQueue (const String& threadName)
    : Thread (threadName)
    , m_stop (false)
  {
    ThreadQueue::open ();

    Thread::startThread ();
  }

  ~JuceThreadQueue ()
  {
    ThreadQueue::call (boost::bind (&JuceThreadQueue::doStop, this));

    ThreadQueue::close ();

    Thread::waitForThreadToExit (-1);
  }

  void signal () // overrides ThreadQueue::signal
  {
    Thread::notify ();
  }

  void doStop ()
  {
    m_stop = true;
  }

  void run ()
  {
    while (!m_stop)
    {
      Thread::wait (-1);

      ThreadQueue::process ();
    }
  }

  bool m_stop;
};

Now we need a reference counted AudioSampleBuffer:

// Reference counted AudioSampleBuffer
struct SharedAudioBuffer : AudioSampleBuffer,
                           ReferenceCountedObject
{
  // This is for convenience
  typedef ReferenceCountedObjectPtr <SharedAudioBuffer> Ptr;

  // Because current C++ doesn't have perfect constructor forwarding,
  // we must manually create these constructors ourselves. c++0x solves this.
  // I only put in the constructors needed for the example.
  SharedAudioBuffer (
    int numChannels,
    int numSamples)
    : AudioSampleBuffer (numChannels, numSamples)
  {
  }

  // This useful function is missing from AudioSampleBuffer so I
  // put it here. Jules should add it! It assumes outputChannelData
  // has the correct number of channels.
  void copyTo (float** outputChannelData)
  {
    for (int i = 0; i < getNumChannels(); ++i)
      memcpy (outputChannelData [i],
              getArrayOfChannels ()[i],
              getNumSamples() * sizeof (outputChannelData[0][0]));
  }
};

Here is the Recorder object. It uses the JuceThreadQueue as a thread to receive function calls with reference counted buffers of audio data. When the Recorder is deleted, everything is shut down in an orderly fashion, including the thread. Anything that was put into the queue is guaranteed to be executed, even if the Recorder object is deleted while the queue is not empty.

// Records audio blocks to a file or network stream, using another thread
struct Recorder
{
  Recorder () : m_queue ("Recorder")
  {
  }

  // called from the audio i/o callback thread
  void add (SharedAudioBuffer::Ptr buffer)
  {
    m_queue.call (boost::bind (&Recorder::doAdd, this, buffer));
  }

  // called on our own thread NOT the audio i/o callback thread
  void doAdd (SharedAudioBuffer::Ptr buffer)
  {
    // Here, application does what it wants with buffer
    // For example, append it to a file or network stream.

    // The reference counting will take care of disposing buffer
    // when it is no longer used - no additional code is needed for it.
  }

  JuceThreadQueue m_queue;
};

To put everything together, here is the Mixer object, an audio i/o callback which renders the sound and passes it back to Juce. It contains a “Recorder” object, to which it sends the reference counted buffer so that another thread can do the work. When the Mixer object is destroyed, the Recorder is automatically cleaned up. All reference counted buffers and asynchronous calls are guaranteed to be handled properly, and the thread is guaranteed to be shut down in an orderly way.

// Produces output to the audio device
struct Mixer : AudioIODeviceCallback
{
  // Renders the next block of audio and returns it as a reference counted buffer
  SharedAudioBuffer::Ptr renderNextAudioBlock (
    const float** inputChannelData,
    int numInputChannels,
    float** outputChannelData,
    int numOutputChannels,
    int numSamples)
  {
    SharedAudioBuffer::Ptr buffer (
      new SharedAudioBuffer (numOutputChannels, numSamples));

    // Here, *buffer is written to with the application-specific generated audio

    return buffer;
  }

  void audioDeviceIOCallback (
    const float** inputChannelData,
    int numInputChannels,
    float** outputChannelData,
    int numOutputChannels,
    int numSamples)
  {
    SharedAudioBuffer::Ptr buffer = renderNextAudioBlock (
      inputChannelData,
      numInputChannels,
      outputChannelData,
      numOutputChannels,
      numSamples);

    // Copy to device output
    buffer->copyTo (outputChannelData);

    // Give the buffer to the recording thread
    m_recorder.add (buffer);
  }

  Recorder m_recorder;
};

This code compiles and SHOULD work, perhaps after fixing a minor bug or two.
I hope this stimulates your imagination and expands your programming and code design horizons.
If you have questions or criticisms, blast away.

Thanks, this is a nice demonstration of boost::bind and reference counted objects.
From a practical point of view, we do all these things to keep the audio callback from running too long, so what comes to mind is: is it OK to allocate blocks of memory in an audio callback these days? Maybe a classic FIFO (filled directly with the audio data) is the better solution for this specific problem (but that has other cons, like overflowing, I know).
But anyway, I think there are lots of applications for these ThreadQueues, especially if you want to call lots of different functions on a different thread.

You could give SharedAudioBuffer a custom operator new/delete that pre-allocates blocks on startup, and then hands out recycled blocks to avoid going to the system for memory. This wouldn’t change my example code.
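Here is a minimal sketch of that recycling idea, assuming "juce.h" is already included as above. BlockPool and s_bufferPool are names invented for this example; they are not part of Juce or DspFilters. Note that this only recycles the SharedAudioBuffer object itself - the sample memory that AudioSampleBuffer allocates internally is a separate concern.

// Fixed-size block pool: all memory is acquired up front, then recycled.
struct BlockPool
{
  struct Node { Node* next; };

  BlockPool (size_t blockBytes, int numBlocks)
    : m_blockBytes (blockBytes)
    , m_freeList (0)
  {
    // Grab all the memory before any real-time code runs. (This sketch
    // never returns it to the system; a real pool would free it at shutdown.)
    for (int i = 0; i < numBlocks; ++i)
      deallocate (::operator new (blockBytes));
  }

  void* allocate (size_t bytes)
  {
    jassert (bytes <= m_blockBytes); // one block size fits all in this sketch

    const ScopedLock sl (m_lock);
    jassert (m_freeList != 0); // pool exhausted - a real pool must handle this
    Node* node = m_freeList;
    m_freeList = node->next;
    return node;
  }

  void deallocate (void* p)
  {
    // Recycle the block by threading it back onto the free list.
    const ScopedLock sl (m_lock);
    Node* node = reinterpret_cast <Node*> (p);
    node->next = m_freeList;
    m_freeList = node;
  }

  size_t m_blockBytes;
  Node* m_freeList;
  CriticalSection m_lock;
};

BlockPool s_bufferPool (sizeof (SharedAudioBuffer), 64); // 64 buffers pre-allocated

// SharedAudioBuffer would then declare these two member operators, so that
// 'new SharedAudioBuffer (...)' in the audio callback recycles a block
// instead of hitting the system allocator:
//
//   static void* operator new (size_t bytes) { return s_bufferPool.allocate (bytes); }
//   static void operator delete (void* p)    { s_bufferPool.deallocate (p); }

A production version would want a lock-free free list instead of the CriticalSection, and a fallback for oversized or exhausted-pool requests.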

Again, this is an example of thinking in terms of data and implementation.

As I pointed out in my original post, the thinking should be along the lines of asynchronous function calls.

I’m just starting to see what you’re doing with the async calls - sort of like a job, right? In fact, how is it different from a job?

My snag is I’d have to package up the data I want operated on, put it somewhere, then make a function that only deals with that data, not anything else. It seems easier to me to use the actual data as a barrier - so no accidental misuse of instance members or functions is even possible.

Bruce

A job operation is “some lengthy task that gets performed at an undefined time in the future.”

The thread queue operation is “calling a function on another thread.” The call returns immediately (or quickly enough for our purposes), and the called function executes immediately on the other thread.

See the difference? Of course, the call is not really guaranteed to be executed immediately but for the purpose of analysis, you can assume that it is. In other words, it is Linearizable. The linearized portions of a system are easy to analyze. Using a thread queue and processing it once before each audio block is the best way to make correctness analysis as easy as possible (not to say that it will be “easy”, just “easier” than the alternatives, like all the locking that goes on in Juce).

What could be easier than something that looks, for all intents and purposes, to be identical to a function call (except that you have to wrap the parameters with boost::bind)?

Why do you have to package up the data? Did you see what I did with the SharedAudioBuffer? Is that what you mean by packaging? Because to me, that’s not a problem.

Bottom line, the concurrent programming nightclub has an entry fee, and you have to pay it if you want to get in. Reference counted objects, linearizability, thread safe queues, and synchronization primitives are the fee. You MUST pay the fee or you will not get a correct concurrent system. Or, put another way, concurrent programming is more difficult and there is no way to get around the extra work needed to get things functioning properly.

I refer you to this excellent article by Herb Sutter, which perfectly describes my experiences with developing high performance multithreaded systems.

Here is the original article link:
The Trouble with Locks

Here is a quote of the relevant parts of the article:

[quote=“Herb Sutter”]
So, “is concurrency really that hard?” My short answer is this:

Lock-based programming, our status quo, is difficult for experts to get right. Worse, it is also fundamentally flawed for building large programs. This article focuses exclusively on lock-based programming just because there’s so much to say in even a 50,000-foot overview that I ran out of room.

Lock-free programming is difficult for gurus to get right. I’ll save this for another time, but if you’re interested, you should check out Andrei Alexandrescu’s recent articles for a sampling of the issues in lock-free programming and hazard pointers [3, 4]. (Aside: Yes, I’m implying Andrei is a guru. I hope he doesn’t mind my outing him in public like this. I don’t think it was much of a secret.) (More aside: The hazard pointer work shows in particular why, if you’re writing lock-free data structures, you really really want garbage collection. You can do it yourself without garbage collection, but it’s like working with knives that are sharp on both edges and don’t have handles. But that’s another article. Specifically, it’s Andrei’s other article.)

Unfortunately, today’s reality is that only thoughtful experts can write explicitly concurrent programs that are correct and efficient. This is because today’s programming models for concurrency are subtle, intricate, and fraught with pitfalls that easily (and frequently) result in unforeseen races (i.e., program corruption) deadlocks (i.e., program lockup) and performance cliffs (e.g., priority inversion, convoying, and sometimes complete loss of parallelism and/or even worse performance than a single-threaded program). And even when a correct and efficient concurrent program is written, it takes great care to maintain—it’s usually brittle and difficult to maintain correctly because current programming models set a very high bar of expertise required to reason reliably about the operation of concurrent programs, so that apparently innocuous changes to a working concurrent program can (and commonly do, in practice) render it entirely or intermittently nonworking in unintended and unexpected ways. Because getting it right and keeping it right is so difficult, at many major software companies there is a veritable priesthood of gurus who write and maintain the core concurrent code.[/quote]

Thread queues directly address the complexities that Herb Sutter was referring to in that last paragraph. They don’t eliminate them, of course, but they do minimize them.

Lock-free and wait-free shouldn’t even be part of the conversation. The ThreadQueue that I provided in DspFilters, and the JuceThreadQueue that I provided in this example code in the original post, are more than sufficient to ship commercial products that will perform excellently. Any scenario where memory allocation is an issue (like chkn pointed out) can be handled outside of the thread queue by customizing the allocator for the objects in question using your favorite approach.

Hoping for some replies since I put so much work into this…

Looks cool, but too busy right now to give it the deep attention I’m sure it deserves!

[quote=“jules”]Looks cool, but too busy right now to give it the deep attention I’m sure it deserves![/quote]

LOL… thanks for the vote of confidence, Jules! I didn’t mean to distract you… what you’re doing is way more important.

Besides, you’re already a programming guru, I doubt any of what I wrote is new or novel for you.

Looks impressive! Think I’ll need to study a bit more of what is actually going on though :P

Vinn, I’ve not taken a look at the code yet, but I’ve made great use of functors bound to some data to create a message queue that does not need any knowledge of the messages it is handling, since a message is a functor/data object… they just get pulled out of the queue and have their ‘run’ functions executed… I think we are doing the exact same thing, but your handler is a thread-job type of thing, yes? I also use this technique to handle quantized edits/commands, i.e. my software has the ability to quantize user commands to musical time, so the command (functor/data) is passed on to the code that handles the actual execution, based on its quantize rules, which for most commands is ‘do it now’…

It’s basically calling a function on another thread.

Old way:

lock the ResamplingAudioSource critical section
change the sample rate
unlock the critical section

New way:

Call a function on the thread that owns the ResamplingAudioSource that changes the sample rate to a passed value.
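Concretely, here is a sketch of the two styles (AudioEngine and PolledQueue are names invented for this sketch; construction of the ResamplingAudioSource is elided):

// Old way: shared state guarded by a lock, touched from any thread.
// setResamplingRatio() takes the source's CriticalSection, so the GUI
// thread can block the thread that is rendering audio, and vice versa.
void setRatioOldWay (ResamplingAudioSource* source, double ratio)
{
  source->setResamplingRatio (ratio);
}

// A queue with no thread of its own: whoever calls process() drains it.
// (If ThreadQueue::signal() already has a usable default this subclass is
// unnecessary, but deriving keeps the sketch self-contained.)
struct PolledQueue : ThreadQueue
{
  void signal () { } // nobody to wake: the owning thread polls
};

// New way: only the owning thread ever touches the source; every other
// thread just queues up a call.
struct AudioEngine
{
  AudioEngine (ResamplingAudioSource* source)
    : m_source (source)
  {
    m_queue.open ();
  }

  ~AudioEngine ()
  {
    m_queue.close ();
  }

  void setRatio (double ratio) // may be called from any thread
  {
    m_queue.call (boost::bind (&AudioEngine::doSetRatio, this, ratio));
  }

  void doSetRatio (double ratio) // runs on the thread that owns m_source
  {
    m_source->setResamplingRatio (ratio); // no contention, by construction
  }

  // The owning (audio) thread calls this once at the start of each block,
  // so all pending changes are applied at a block boundary.
  void processPendingCalls ()
  {
    m_queue.process ();
  }

  ResamplingAudioSource* m_source;
  PolledQueue m_queue;
};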

Ah - that example makes sense. So, the function’s parameters are preserved, and passed when it is called - that is helpful! And presumably it’s a method of the object that is working in the thread.

I have some areas with things like:

if (oldSpeed != newSpeed)
  changeSpeed (newSpeed);

Where newSpeed is set inside a lock, let’s say. With your system, it would be trivial to set the speed as needed, and when that takes place is still determined by me by means of processing the ThreadQueue at the right time. Cool.

Re: not needing lock-free. I’m not sure about that. In an intricate real-world situation with some real-time needs, locks themselves can be a real problem - threads waiting on threads waiting on threads, all at the whim of the scheduler. As Sutter says, it’s also a nightmare to maintain.

Bruce

I just used this class Vinn and it works great.

I did have a head-scratching moment where I needed to know when all the jobs I had posted had finished processing. What I ended up doing was adding a “finished” job to the queue (as the last item, of course!) that set a flag when it was called… that way my calling thread could check this flag.

Thanks for this…

[quote=“justin”]I just used this class Vinn and it works great.

I did have a head-scratching moment where I needed to know when all the jobs I had posted had finished processing. What I ended up doing was adding a “finished” job to the queue (as the last item, of course!) that set a flag when it was called… that way my calling thread could check this flag.[/quote]

Or you could give the calling thread its own queue, and have your “job finished” work item post another “job finished” message to the calling thread’s queue; this way you avoid “polling”. It is easy to use the facilities available in Juce to give the message thread its own ThreadQueue.
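Here is a sketch of that post-back idea, with invented names (Batch and its members are hypothetical). It assumes the calling thread has a queue of its own that it processes regularly (for the message thread, see the AsyncUpdater variant further down):

struct Batch
{
  Batch (ThreadQueue& callerQueue)
    : m_callerQueue (callerQueue)
    , m_worker ("Batch")
  {
  }

  void addWork (int itemNumber) // called from the owning thread
  {
    m_worker.call (boost::bind (&Batch::doWork, this, itemNumber));
  }

  void finish () // queue this last: FIFO order puts it after all the work
  {
    m_worker.call (boost::bind (&Batch::doFinish, this));
  }

  void doWork (int itemNumber)
  {
    // ... runs on the worker thread ...
  }

  void doFinish ()
  {
    // Last item in the worker's queue: instead of setting a flag for the
    // caller to poll, post the completion notice to the caller's own queue.
    m_callerQueue.call (boost::bind (&Batch::onFinished, this));
  }

  void onFinished ()
  {
    // ... runs on the calling thread, no polling required ...
  }

  ThreadQueue& m_callerQueue;
  JuceThreadQueue m_worker;
};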

But besides that, this queue was not intended to perform lengthy work items (although it can certainly be used in this fashion). It was intended for synchronizing data between threads. Or, put another way, for each piece of mutable concurrent data a thread uses (examples: playback speed, master volume, play/pause status), a separate copy of the data is maintained, and the queue is used to update each thread’s view.

Getting even more specific, let’s take master volume as an example. Each thread that needs to know the master volume would maintain its own copy. When the master volume is changed, a call is made on each relevant thread’s queue to update its internal copy. Due to the nature of this design, there are moments in time where different threads have different values for the same mutable value. For example, during an audio i/o callback the master volume would be “frozen” at the value that was last picked up at the beginning of the callback, even if the GUI changes it (since the queue is only processed once per callback). This is the queue’s most important feature: mutable concurrent data can be relied upon not to change during critical paths. It is not about the ability to “defer work” (implied in the previous post).
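A sketch of the master volume example, with invented names (VolumeMixer is hypothetical; it reuses the PolledQueue from the earlier sketch, and, as in the original example code, member functions Juce requires but which are not relevant here are omitted):

struct VolumeMixer : AudioIODeviceCallback
{
  VolumeMixer () : m_masterVolume (1.f)
  {
    m_queue.open ();
  }

  ~VolumeMixer ()
  {
    m_queue.close ();
  }

  // Called from the GUI thread when the user moves the volume slider.
  void setMasterVolume (float volume)
  {
    m_queue.call (boost::bind (&VolumeMixer::doSetMasterVolume, this, volume));
  }

  // Runs on the audio thread, only ever between blocks.
  void doSetMasterVolume (float volume)
  {
    m_masterVolume = volume;
  }

  void audioDeviceIOCallback (
    const float** inputChannelData,
    int numInputChannels,
    float** outputChannelData,
    int numOutputChannels,
    int numSamples)
  {
    m_queue.process (); // pick up pending changes exactly once per block

    // From here to the end of the callback m_masterVolume is frozen, so
    // the rendering code can treat it as ordinary single-threaded data.
    // (Rendering elided; this just applies the gain to the output.)
    for (int ch = 0; ch < numOutputChannels; ++ch)
      for (int i = 0; i < numSamples; ++i)
        outputChannelData [ch][i] *= m_masterVolume;
  }

  PolledQueue m_queue; // no thread of its own: the audio callback drains it
  float m_masterVolume;
};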

Well, I really bastardized its usage then! I had a ThreadPool executing a number of jobs. These jobs then passed a structure to your ThreadQueue, which was running a Lua script (the Lua state was initialized on that thread queue’s thread), and which then works away at the structures one at a time (which, in the confines of Lua, works great).

Thanks again

Yep! But remember that all the evaluation rules for boost::bind() apply. For example, special care is needed when passing references. It is also possible to get lazy evaluation, but that opens up a can of worms.
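A sketch of the reference pitfall (‘report’ and ‘example’ are names invented for illustration). boost::bind copies its arguments by value into the functor; boost::ref defeats that copy, which across threads is usually exactly what you do not want:

#include <boost/ref.hpp> // for boost::ref (usually pulled in by bind.hpp)

void report (const String& text)
{
  Logger::outputDebugString (text);
}

void example (JuceThreadQueue& queue)
{
  String status ("rendering");

  // Safe: bind stores its own copy of the String inside the functor,
  // which stays alive until the call executes on the other thread.
  queue.call (boost::bind (&report, status));

  // Dangerous: the functor stores only a reference. If 'status' goes out
  // of scope before the other thread runs the call, this is a dangling
  // reference - a use-after-free waiting to happen.
  queue.call (boost::bind (&report, boost::ref (status)));
}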

Right on, brother. Process the thread queue just once at the beginning of the audio i/o callback. If the thread queue is associated with the message thread, you can implement signal() to use Juce’s AsyncUpdater, and process the queue there. The use of AsyncUpdater is a perfect fit for the ThreadQueue design.
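For reference, a minimal sketch of that idea (MessageThreadQueue is a name invented here): signal() triggers an async update, and Juce invokes handleAsyncUpdate() on the message thread, which is where the queued calls then execute.

struct MessageThreadQueue : ThreadQueue, AsyncUpdater
{
  MessageThreadQueue ()
  {
    ThreadQueue::open ();
  }

  ~MessageThreadQueue ()
  {
    // Shutdown ordering is glossed over here: a real version should make
    // sure no async update is still pending when the object dies.
    ThreadQueue::close ();
  }

  void signal () // overrides ThreadQueue::signal
  {
    AsyncUpdater::triggerAsyncUpdate ();
  }

  void handleAsyncUpdate () // runs on the message thread
  {
    ThreadQueue::process ();
  }
};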

In the general case this is true, but for ThreadQueue specifically, as long as the invariant is upheld (no concurrent calls to ThreadQueue::process()), it is guaranteed that progress will always be made by whichever thread is holding the queue’s lock. Since all thread queue locks are held for O(1) time, the serialized nature of access to the queue should not be an issue. I have to reiterate that the calls to the queued work items themselves are made outside the lock and are therefore completely non-blocking (take a look at process()).

In my test harness, the throughput of ThreadQueue using a CriticalSection and the standard memory allocator is a few orders of magnitude higher than anything an audio-based app would need, even on low-end hardware. This was tested both with all 12 cores active and 24 threads all communicating with each other through queues, and again after a reboot with the BIOS set to single-core CPU mode.

The problem with ThreadQueue as it currently exists is not its explicit use of the CriticalSection data member, but rather the implied global lock when using the standard allocator (thread queue items are allocated and freed on the heap). This can be quickly alleviated by replacing the standard allocator with a custom allocator designed for concurrent systems, or, if you want to improve ThreadQueue yourself, you can replace the calls to new/delete with your own custom system that uses a lock-free list of pre-allocated blocks and a form of per-second garbage collection (to solve the ABA problem).

One might be tempted to go lock-free with 64-bit CAS but there are no low-end systems with a 64-bit CAS so that would be pointless - the current implementation is more than sufficient for any system supporting 64-bit CAS.

Well, like I said, you could certainly do that, and it seems to be working for you, but in your case I would use the ThreadQueue to implement an “addJob()” command, which would put the job into the ThreadPool as its own work item. Calls put into a ThreadQueue are supposed to follow all of the same rules as code in the audio i/o callback: minimize the number of system calls, try to execute in O(1), or O(log(n)) at the most, avoid the heap, and don’t take locks. You definitely don’t want to be performing lengthy work inside a ThreadQueue call.

The idea is that ThreadQueue::call() sets up the work, the called function executes on the other thread and returns quickly, and then the associated thread gets going on the actual work. That’s what the virtual signal() and reset() functions are for: you override them so your thread knows what to do when there’s work.
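A sketch of that forwarding pattern, with invented names (JobForwarder is hypothetical, and this assumes the single-argument ThreadPool::addJob() of that era): the queued call does only O(1) bookkeeping, handing the job to a Juce ThreadPool, and the lengthy work then runs as a pool job instead of inside the queue call.

struct JobForwarder
{
  JobForwarder (ThreadPool& pool)
    : m_pool (pool)
    , m_queue ("JobForwarder")
  {
  }

  void addJob (ThreadPoolJob* job) // may be called from any thread
  {
    m_queue.call (boost::bind (&JobForwarder::doAddJob, this, job));
  }

  void doAddJob (ThreadPoolJob* job) // runs on the queue's thread: quick, O(1)
  {
    m_pool.addJob (job); // the pool's own threads do the lengthy work
  }

  ThreadPool& m_pool;
  JuceThreadQueue m_queue;
};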

A shiny new CallQueue, InterruptibleThread class, and various other useful and fully optimized implementations are available in VFLib: