Based on a discussion in another thread (http://rawmaterialsoftware.com/viewtopic.php?f=2&t=7645), I have consolidated an example of concurrent programming (threads) that uses a thread queue (FIFO) of functors combined with a juce Thread to achieve asynchronous function calls with variable-sized parameter lists. The parameters of these asynchronous calls may be of any type or object, including reference-counted objects. Since the implementation is completely type safe and preserves all invariants of the objects passed through the system, using reference-counted objects requires no additional thought or oversight on the part of the programmer beyond the usage requirements already imposed by Juce.
My approach to thread queues, which I believe is the best one for our end-user, audio-related purposes, is to think in terms of function calls which are executed asynchronously on other threads. I prefer not to think about what type of queue it is, what its features are, or how it is implemented (except when I actually have to write the queue).
At the higher level, in the design stage, I want a building block that abstracts away all concerns about object lifetime management, memory allocation, and so on. The reason is that getting concurrent apps working is HARD. Having a simple model makes analysis and reasoning about the correctness of concurrent algorithms much easier. I will provide a simple example:
In this example, the audio i/o callback function renders a block of audio and passes it back to the Juce audio device manager so it can be played through the sound device. It also makes a copy of the audio block and sends it to another thread, which is responsible for recording the live audio output to disk (or streaming it, e.g. as a Shoutcast radio station).
The code example assumes knowledge of the audio-specific portions of the Juce library. Member functions required by Juce but not relevant to this example have been intentionally omitted for brevity. Because I use multiple inheritance, I have prefixed member function calls with the relevant class name using the scope resolution operator. This is not necessary for compilation, but is included for clarity. I have also left out access control specifiers; everything is public.
The ThreadQueue object used in the example is provided by DspFilters:
http://code.google.com/p/dspfilterscpp/source/browse/trunk/demo/ThreadQueue.h
http://code.google.com/p/dspfilterscpp/source/browse/trunk/demo/ThreadQueue.cpp
First we need to create a combined juce::Thread and ThreadQueue. Here it is:
#include "juce.h"
#include "ThreadQueue.h" // from DspFilters
#include <boost/bind.hpp> // or std::bind / std::tr1::bind if available

// Thread queue that works with a Juce thread's event system
struct JuceThreadQueue : ThreadQueue, Thread
{
    JuceThreadQueue (const String& threadName)
        : Thread (threadName)
        , m_stop (false)
    {
        ThreadQueue::open ();
        Thread::startThread ();
    }

    ~JuceThreadQueue ()
    {
        ThreadQueue::call (boost::bind (&JuceThreadQueue::doStop, this));
        ThreadQueue::close ();
        Thread::waitForThreadToExit (-1);
    }

    void signal () // overrides ThreadQueue::signal
    {
        Thread::notify ();
    }

    void doStop ()
    {
        m_stop = true;
    }

    void run ()
    {
        while (!m_stop)
        {
            Thread::wait (-1);
            ThreadQueue::process ();
        }
    }

    bool m_stop;
};
Now we need a reference counted AudioSampleBuffer:
// Reference counted AudioSampleBuffer
struct SharedAudioBuffer : AudioSampleBuffer,
                           ReferenceCountedObject
{
    // This is for convenience
    typedef ReferenceCountedObjectPtr <SharedAudioBuffer> Ptr;

    // Because current C++ doesn't have perfect constructor forwarding,
    // we must manually create these constructors ourselves. C++0x solves
    // this. I only put in the constructor needed for the example.
    SharedAudioBuffer (int numChannels,
                       int numSamples)
        : AudioSampleBuffer (numChannels, numSamples)
    {
    }

    // This useful function is missing from AudioSampleBuffer so I
    // put it here. Jules should add it! It assumes outputChannelData
    // has the correct number of channels.
    void copyTo (float** outputChannelData)
    {
        for (int i = 0; i < getNumChannels (); ++i)
            memcpy (outputChannelData [i],
                    getArrayOfChannels ()[i],
                    getNumSamples () * sizeof (float));
    }
};
Here is the Recorder object. It uses the JuceThreadQueue as a thread to receive function calls with reference counted buffers of audio data. When the Recorder is deleted, everything is shut down in an orderly fashion including the thread. Anything that was put into the queue is guaranteed to be executed, even if the recorder object is deleted when the queue is not empty.
// Records audio blocks to a file or network stream, using another thread
struct Recorder
{
    Recorder () : m_queue ("Recorder")
    {
    }

    // Called from the audio i/o callback thread
    void add (SharedAudioBuffer::Ptr buffer)
    {
        m_queue.call (boost::bind (&Recorder::doAdd, this, buffer));
    }

    // Called on our own thread, NOT the audio i/o callback thread
    void doAdd (SharedAudioBuffer::Ptr buffer)
    {
        // Here the application does what it wants with the buffer,
        // for example appending it to a file or network stream.
        // Reference counting disposes of the buffer when it is no
        // longer used; no additional code is needed for that.
    }

    JuceThreadQueue m_queue;
};
To put everything together, here is the Mixer object, an audio i/o callback which renders the sound and passes it back to Juce. It contains a Recorder object, to which it sends the reference-counted buffer for processing on another thread. When the Mixer object is destroyed, the Recorder is automatically cleaned up. All reference-counted buffers and asynchronous calls are guaranteed to be handled properly, and the thread is guaranteed to shut down in an orderly way.
// Produces output to the audio device
struct Mixer : AudioIODeviceCallback
{
    // Renders the next block of audio and returns it as a reference counted buffer
    SharedAudioBuffer::Ptr renderNextAudioBlock (
        const float** inputChannelData,
        int numInputChannels,
        float** outputChannelData,
        int numOutputChannels,
        int numSamples)
    {
        SharedAudioBuffer::Ptr buffer (
            new SharedAudioBuffer (numOutputChannels, numSamples));

        // Here *buffer is filled with the application-specific generated audio

        return buffer;
    }

    void audioDeviceIOCallback (
        const float** inputChannelData,
        int numInputChannels,
        float** outputChannelData,
        int numOutputChannels,
        int numSamples)
    {
        SharedAudioBuffer::Ptr buffer = renderNextAudioBlock (
            inputChannelData,
            numInputChannels,
            outputChannelData,
            numOutputChannels,
            numSamples);

        // Copy to the device output
        buffer->copyTo (outputChannelData);

        // Give the buffer to the recording thread
        m_recorder.add (buffer);
    }

    Recorder m_recorder;
};
This code compiles and SHOULD work, perhaps after fixing a minor bug or two.
I hope this stimulates your imagination and expands your programming and code design horizons.
If you have questions or criticisms, blast away.