Concurrency, meters, JUCE classes

Hello guys !

I have a couple of questions for the concurrency gurus of JUCE (Timur, Jules, can you hear me? :slight_smile: )

I have written metering code for plug-ins a few times, but most of my implementations were ā€œnaiveā€ to say the least: good enough in general, but not clean enough, I think, to handle some potential concurrency issues. I'm trying to do something better now, but I'm not sure where to start... So here are my questions:

  • I'm a little confused by a few things I have seen in the example projects and in Timur's presentations, so I'd like some clarification. I know C++11 has appeared in the JUCE codebase more and more often since JUCE 4, and will probably become a hard prerequisite for using JUCE in the future. The thing is, several facilities that exist in JUCE also exist in the standard library, which I don't know very well (threads, atomic variables, arrays), and the std versions have been updated several times while the JUCE ones haven't changed much. So I'd like to know whether specialists recommend the std ones or the JUCE ones. Threads, for example, seem simpler to use in JUCE, but std::atomic seems better than juce::Atomic... I also wonder whether anyone uses the juce::AbstractFifo class...

  • Other question: I have an array which is filled on the audio thread, and a function, called as often as needed on the UI thread, which reads data from this array. That's my central problem, and I haven't found any good intel on it yet. How can I do this properly? Until now I have either ignored all potential concurrency issues (which here boils down to non-atomic reads/writes of the array's values), or locked something, putting a CriticalSection around the whole read/write functions or in the templated declaration of the Array. The latter seems better, but it still means the audio thread can block in the worst case. How can I do better, without any lock-free external library, using only JUCE classes or C++11 / std facilities? Is it even possible? Any thoughts?

  • Last question: the read function which reads the array's contents is called, say, every 50 ms. But if something bad happens on the UI thread, a lot of calls might pile up until the CPU load lets things run properly again. In a locking context that's bad, so I was thinking of using a kind of queue for these calls, where extra calls are dropped, and maybe a dedicated thread so the UI doesn't freeze in the worst case because of the metering display. For example, if one call happens every 50 ms, everything is fine; but if 8-9 calls arrive all at once for whatever reason, 7-8 of them are dismissed and only the last one runs. I know this is supposed to be basic stuff, but I'd like to know how to do it properly. And again, can I do it with JUCE only? Do I need some std classes?
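
For the simplest version of this, where the UI only needs a reduced value (say, the peak level since the last read) rather than the whole array, a single std::atomic already gets you lock-free behaviour with no external library. A sketch of that pattern, my own illustration rather than code from this thread:

```cpp
#include <algorithm>
#include <atomic>
#include <cmath>

// The audio thread accumulates the running peak with a compare-exchange
// loop; the UI thread atomically takes the value and resets it to zero.
std::atomic<float> runningPeak { 0.0f };

void accumulatePeak (const float* samples, int numSamples) // audio thread
{
    float blockPeak = 0.0f;
    for (int i = 0; i < numSamples; ++i)
        blockPeak = std::max (blockPeak, std::abs (samples[i]));

    float current = runningPeak.load (std::memory_order_relaxed);
    while (current < blockPeak
           && ! runningPeak.compare_exchange_weak (current, blockPeak,
                                                   std::memory_order_relaxed))
    {} // on failure, 'current' is refreshed by compare_exchange_weak
}

float readAndResetPeak() // UI thread
{
    return runningPeak.exchange (0.0f, std::memory_order_relaxed);
}
```

Note the reader resets the value with exchange, so a skipped or delayed UI call simply means the next read returns the peak over a longer window, which is usually what a meter wants anyway.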

I'd really love to see more tutorials on this subject, because I have yet to find open-source code for something supposedly as simple as an audio meter that is satisfactory even for my limited understanding.

Thanks in advance !

Ivan


I am by no means a ā€œconcurrency guruā€, but I have some experience to share on some of your questions:

  • No idea whether the atomics implemented in JUCE are better than std::atomic, but they do work and I haven't run into performance problems or bugs using them. Reads seem cheap enough; I still see them here and there when profiling, but nothing too critical so far.

  • You can use an AbstractFifo for this array. It's an ā€œabstractā€ read/write controller, but not abstract in the OOP sense: you can (and must) instantiate it. The way it works is that you keep an AbstractFifo instance, initialised with the same size as the array/list/queue you want to make lock-free, alongside the actual container. It's a single-reader, single-writer queue (ring buffer) controller. On the reader side you ask it ā€œhow many items can I read, and at which indices?ā€ (see prepareToRead in the docs), and once you have read the items (from your own array, the same one you use now) you tell it you're done (see finishedRead). The same mechanism applies for writing (see prepareToWrite and finishedWrite). Here is an example from my current project: a state manager receiving and processing parameter changes, lock-free:

StateManager.h:

    class StateManager    :    public AudioProcessorValueTreeState::Listener
    {

    public:
        StateManager (const int maxConcurrentChanges = 1024);

        ...

        void processConcurrentChanges ();

        /* AudioProcessorValueTreeState::Listener methods */
        void parameterChanged (const String& parameterId, float newValue);

    private:
        struct QueuedParameterChange
        {
            ...
        };
        ...

        AbstractFifo concurrentChangesFifo;
        Array<QueuedParameterChange> concurrentChangesQueue;


        void applyParameterChange (const String& parameterId, float newValue);


        JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (StateManager);

    };

StateManager.cpp:

    StateManager::StateManager (const int maxConcurrentChanges)
        :   ...
            concurrentChangesFifo (maxConcurrentChanges)
    {
        concurrentChangesQueue.resize (maxConcurrentChanges);
    }

    void StateManager::parameterChanged (const String& parameterId, float newValue)
    {
        QueuedParameterChange change (parameterId, newValue);

        int itemsCount = 1,
            start1, size1, start2, size2;

        concurrentChangesFifo.prepareToWrite (
            itemsCount,
            start1,
            size1,
            start2,
            size2
        );

        int writtenItemsCount = size1 + size2;

        if (size1 > 0)
        {
            concurrentChangesQueue.set (start1, change);
        }

        if (size2 > 0)
        {
            concurrentChangesQueue.set (start2, change);
        }

        concurrentChangesFifo.finishedWrite (writtenItemsCount);
    }

    void StateManager::processConcurrentChanges ()
    {
        // get all at once
        int itemsCount = concurrentChangesFifo.getNumReady (),
            start1, size1, start2, size2;

        concurrentChangesFifo.prepareToRead (
            itemsCount,
            start1,
            size1,
            start2,
            size2
        );

        int readItemsCount = size1 + size2;

        jassert (readItemsCount >= 0);

        if (size1 > 0)
        {
            for (int i = 0; i < size1; ++i)
            {
                const QueuedParameterChange& change = concurrentChangesQueue [start1 + i];
                applyParameterChange (change.parameterId, change.newValue);
            }
        }

        if (size2 > 0)
        {
            for (int i = 0; i < size2; ++i)
            {
                const QueuedParameterChange& change = concurrentChangesQueue [start2 + i];
                applyParameterChange (change.parameterId, change.newValue);
            }
        }

        concurrentChangesFifo.finishedRead (readItemsCount);
    }

    ...
  • This one is resolved by the previous answer: the reader thread doesn't block the writer thread, and vice versa.

I hope this is helpful.

Cheers!


Thanks for your answer and for sharing some code! I'll have a detailed look :wink:

However, I already have a few questions:

  • Would it be possible, in processConcurrentChanges, to detect when multiple items share the same parameterId and apply only the last change for each?

  • In my case, I have a function called ā€œprocessSamplesā€ which fills the array that needs to be shared, and a function ā€œrefreshā€ which reads the array's contents. Let's say the size of the array equals the current audio buffer block size. What happens if refresh is called twice as often as processSamples? And what happens when refresh is called only once every 3 calls of processSamples? Does that mean I should use another type of queue, such as a single-writer multiple-readers queue, instead?

Yep! You can iterate over the changes and keep just the last one of each kind.

How about using a different queue size? If you can calculate or empirically discover a reasonable size (or make it a user setting), it will never overflow on writing.

If you don't manage to empty it in time, I probably don't have enough context to give advice, but off the top of my head:

  • If the production of samples can be parallelised, do it and use multiple single-producer single-consumer queues (like the one I showed earlier);
  • (highly unlikely, in my modest experience, but) perhaps your producer runs on a thread with very low priority;

I have built a DFD engine which produces samples on multiple threads, all read later on the audio thread, entirely lock-free.
It wasn't very easy (at least for me), but it's definitely doable. The biggest problems I had were:

  • quickly cancelling obsolete production tasks;
  • the backwards communication (audio thread -> production threads), which also needs to be lock-free.

Sounds like hard work!

Indeed that would solve the issue of overflow :wink:

The multiple-queues idea is a good one; I might use, say, 2 or 3 of them, each filled at a different time. I have to think about that...

My context is the following: processSamples is called for every audio block, at the buffer size specified by the host. And I have defined two variables somewhere: a refresh time and a painting time. The refresh time is the desired time between two calls of the refresh function, which reads the array of samples and returns, say, the maximum absolute value seen in that array. The paint function is then called every ā€œpaint timeā€ and displays this value in a bargraph, with some inertia on the release side. Usually the refresh time and the paint time are the same, but in other audio-data displays I write, they are not.

So, let's say my queue holds 8192 samples (large enough to prevent overflow). I'm trying to refresh/paint every 33 ms to get 30 FPS on the display side. 33 ms at 44100 Hz is around 1450 samples. It's easy to see this can lead to a synchronisation problem between what is displayed and what is actually happening...

The question to ask here is what to do about ā€œoverlapā€. Do I want the refresh function to read every sample exactly once, which means it returns a value only every N samples read? N would then have to be close to those 1450 samples, and sometimes samples might not be read at all if we have waited too long because of timer jitter. Or the calls might come too close together, and refresh would have to return the previous value because processSamples hasn't provided enough new samples yet.

Or we can decide to tolerate some overlap, which seems to me the best solution. Each time refresh is called, I want it to consider the last N samples written by processSamples. That means I either need to wait for processSamples to finish filling, or make it fill without interfering with refresh. And if a refresh call happens late, some samples will never be read, but that's not a problem at all for me or for the display (a separate process computes the true maximum absolute value over all samples and displays it as plain text).

I’ll read again all the things you’ve told me to do the overlap thing in lock-free :wink:

So you're looking for max(abs([sample1, sample2, ..., sampleN])) within a frame of samples, visualised in a gain bar, with no ramping towards the max; ramping happens only in the opposite direction, right?

I would try updating the UI-read variable at a much bigger interval than 1/30 s, more like 1/5 s, which basically means I only need to preserve the last (1/5) * sampleRate samples. Those samples can be stored in a big-enough single-producer single-consumer queue (how big depends on the maximum time the timer can be delayed).

BTW, if your message thread is too busy all the time (which is where all Timers run, as far as I know), you can spawn a separate thread and implement your own timer logic. I have done this in a few places where I needed more predictable scheduling than the message thread, and it's definitely much more reliable. Here is a simple example of such scheduling logic:

while (shouldRun.get() > 0 && isRunning.get() > 0)
{
    // where you do your custom timer work
    timerCallback();

    const double currentTime = Time::getMillisecondCounterHiRes();
    const double targetTimeToWaitUntil = currentTime + interval;

    // waitForMillisecondCounter takes a uint32, so round the target down
    Time::waitForMillisecondCounter ((uint32) targetTimeToWaitUntil);
}

I believe this pauses the thread for the specified interval.
shouldRun and isRunning are atomics (to stop the custom timer, set shouldRun to 0 and wait until isRunning becomes 0). This doesn't account for the time spent in timerCallback, but implementing that would be pretty easy.
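
One way to compensate for the time spent in timerCallback (an illustration in plain C++ rather than JUCE's Time utilities; the names here are hypothetical) is to keep an absolute schedule and sleep until the next deadline instead of waiting for ā€œnow + intervalā€:

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Drift-compensated timer loop: the schedule is absolute, so however long
// the callback takes, its duration doesn't accumulate as drift between ticks.
void runFixedRateLoop (std::atomic<bool>& shouldRun,
                       std::chrono::milliseconds interval,
                       const std::function<void()>& callback)
{
    auto nextDeadline = std::chrono::steady_clock::now() + interval;

    while (shouldRun.load (std::memory_order_acquire))
    {
        callback();

        std::this_thread::sleep_until (nextDeadline);
        nextDeadline += interval; // advance the fixed schedule
    }
}
```

If a callback overruns a whole period, sleep_until returns immediately and the loop catches up, which is usually acceptable for metering.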

Then, in the component visualising the gain, I would inherit from AsyncUpdater and expose a public method to receive the change and trigger an async update of the visualised variable. This means the object that calculates the new value for your bar (the one triggered on the timer) would get the latest chunk of samples, compute the value and call the component to update it. I think AsyncUpdater even allows cancelling updates that haven't gone through yet; check the docs for more info.

Basically, this way the framerate of your component is not so dependent on the rate at which you update this value.
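
Stripped of the JUCE specifics, the coalescing idea can be sketched in plain C++ like this (my own illustration, not the actual juce::AsyncUpdater API): the producer publishes as often as it likes, and redundant update requests collapse into a single pending flag.

```cpp
#include <atomic>

// The producer publishes values at any rate; the consumer sees only the
// most recent one, and stacked-up "repaint" requests collapse into one.
struct CoalescedValue
{
    std::atomic<float> latest { 0.0f };
    std::atomic<bool>  updatePending { false };

    // producer side (e.g. the timer thread)
    void publish (float v)
    {
        latest.store (v, std::memory_order_release);
        updatePending.store (true, std::memory_order_release);
    }

    // consumer side (e.g. the paint/message thread):
    // returns true and fills 'out' only if there is a fresh value.
    bool consume (float& out)
    {
        if (! updatePending.exchange (false, std::memory_order_acq_rel))
            return false;

        out = latest.load (std::memory_order_acquire);
        return true;
    }
};
```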

Once you have that set up, if you find the updates aren't fast enough, try lowering the interval for updates from the timer to the visual component.


Thanks a lot for all the information, I’ll try that :wink:

Also, the ramp doesn't happen within timerCallback, right? You can use LinearSmoothedValue in the component. When the async update is triggered:

if (newValue < oldValue)
{
    // ramping down
    ramp.setValue (newValue);
}
else if (newValue > oldValue)
{
    // directly jumping to target value
    ramp.setValue (newValue);
    ramp.reset (sampleRate, rampLengthInSeconds);
}

I'm already using something similar, with the refresh rate as a sample rate and an exponential ramp :wink:
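
The kind of exponential release ramp mentioned here can be written as a one-pole filter running at the refresh rate. A sketch, with the ballistics parameters as assumptions rather than values from the thread:

```cpp
#include <cmath>

// Meter ballistics: jump instantly on attack, decay exponentially on release.
struct ExponentialMeterBallistics
{
    float display = 0.0f;
    float releaseCoeff;

    // refreshRateHz: how often process() is called;
    // releaseTimeSec: time constant of the exponential decay
    ExponentialMeterBallistics (float refreshRateHz, float releaseTimeSec)
        : releaseCoeff (std::exp (-1.0f / (refreshRateHz * releaseTimeSec))) {}

    float process (float newPeak)
    {
        if (newPeak >= display)
            display = newPeak;                                      // instant attack
        else
            display = newPeak + releaseCoeff * (display - newPeak); // exp. release

        return display;
    }
};
```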

I have implemented all of this now, but I'm still not where I want to be. I want to read the last N samples written, at every refresh call, and AbstractFifo doesn't let me do that, since it is made for data that must be read in order, from the first to the last value written. In my case, not all samples have to be read, and the same value may be read more than once. Moreover, I'm still not sure whether the refresh function must be written to prevent multiple simultaneous calls. All this is trickier than I thought...

#include <set>

void StateManager::processConcurrentChanges ()
{
    // get all at once
    int itemsCount = concurrentChangesFifo.getNumReady ();
    int start1, size1, start2, size2;

    concurrentChangesFifo.prepareToRead (itemsCount, start1, size1, start2, size2);

    int readItemsCount = size1 + size2;
    jassert (readItemsCount >= 0);

    // parameterId is a String, so name the type explicitly ("typeof" is a
    // GCC extension, and "change" isn't in scope here anyway).
    // See http://www.cplusplus.com/reference/set/set/count/
    std::set<String> readSet;

    // To get the latest change per parameter: do the second block first,
    // and iterate both blocks in reverse
    if (size2 > 0)
    {
        for (int i = size2 - 1; i >= 0; --i)
        {
            const QueuedParameterChange& change = concurrentChangesQueue [start2 + i];

            if (readSet.count (change.parameterId) == 0)
            {
                readSet.insert (change.parameterId);
                applyParameterChange (change.parameterId, change.newValue);
            }
        }
    }

    if (size1 > 0)
    {
        for (int i = size1 - 1; i >= 0; --i)
        {
            const QueuedParameterChange& change = concurrentChangesQueue [start1 + i];

            if (readSet.count (change.parameterId) == 0)
            {
                readSet.insert (change.parameterId);
                applyParameterChange (change.parameterId, change.newValue);
            }
        }
    }

    concurrentChangesFifo.finishedRead (readItemsCount);
}

I may not have read your needs correctly, but I wrote a simple ā€œRingBufferā€ class to record a stream of N samples and allow a ā€œsnapshot in timeā€ to be taken at any point.


(It could do with some updating to use more modern C++!)

My context was pitch tracking, which allows either overlapping or separated snippets of the audio stream, depending on the processing power available.

I collect the samples on the audio thread and do the calculation in a ā€˜worker’ thread. Whenever the GUI requires an update, it sets a std::atomic<bool> flag to trigger another calculation and notifies the worker thread, which, once finished, resets the flag and calls wait on itself.
I also have a ScopedLock in the calculate method which I'm not sure is necessary...
This was all inspired by Timur's talk at the Summit last year, and it helped me get my GUI much more fluid on mobile devices.

AbstractFifo is a class that manages access to a buffer (that you provide). It works like a ring buffer, using atomic operations to move the read/write indexes: when you need to write, you request a size and it tells you how much space was allocated and where. (Edit: my mistake, I originally described a fetch-and-add scheme for multiple producers here; it's actually single producer / single consumer and uses simple atomic ints.)

To make it a ring buffer, the write pointer is taken modulo the size of the underlying array. Because this can split the write space between the end of the buffer and the beginning, you get (at most) two contiguous blocks, e.g. { (start1, size1), (start2, size2) } in @tsenkov's code.

Your ring buffer overwrites previously read values (which is fine for a single parameter), but it has the deficiency of needing to copy the values out to a read space to straighten them into order. First, you're using a for-loop to copy; you should be able to split the copy into two memcpy() calls and give the compiler the chance to use intrinsics for large blocks. Second, align the arrays' starts to 64-bit boundaries.
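
The two-memcpy copy of a possibly wrapped ring-buffer region can be sketched like this (the names are illustrative, not from the code above):

```cpp
#include <cstring>

// A region of a ring buffer may wrap around the end of the storage, so it
// arrives as (at most) two contiguous blocks; copying each block with
// memcpy straightens it into linear order in 'dest'.
void copyWrappedRegion (float* dest,
                        const float* ring, int ringSize,
                        int start, int count)
{
    const int firstBlock  = (start + count <= ringSize) ? count
                                                        : ringSize - start;
    const int secondBlock = count - firstBlock;

    std::memcpy (dest, ring + start, (size_t) firstBlock * sizeof (float));

    if (secondBlock > 0) // the wrapped part starts back at index 0
        std::memcpy (dest + firstBlock, ring, (size_t) secondBlock * sizeof (float));
}
```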

AbstractFifo manages the read/write positions to prevent overwriting values. If only a single parameter were being updated it wouldn't be a problem, but from the setup I'm assuming multiple parameters are being sent through, each tagged with a parameterId.

When IvanC says he wants the last N parameters, what he means is the last change from each parameterId, for N parameters. These parameter-change events aren't guaranteed any specific order or count. He just needs the latest snapshot when he gets around to reading the FIFO queue.

His options are:

  • Read through, applying all changes (applyParameterChange)
  • Accumulate changes and apply at final (bucket or map)
  • Iterate backwards, keeping track of what he’s seen. (std::set)

And this only works if the previous event values in no way, shape or form affect what he's trying to do. If that's the case, it might be enough to use a std::map<String, float> and simply keep the last value per parameter. However, std::map allocates a node on each insertion (it's a tree, not a hash table), which could cause a lag on the write thread, which needs to be real-time. Hence the non-locking AbstractFifo.
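
The ā€œaccumulate and apply at the endā€ option can be sketched like this (plain C++ with std::string keys for illustration; in the JUCE code the key would be the parameterId String, and this would run on the reader side, not the real-time thread):

```cpp
#include <map>
#include <string>
#include <vector>

// Drain the queued changes into a map keyed by parameter id, so only the
// last value per parameter survives; each surviving entry is applied once.
struct ParameterChange { std::string id; float value; };

std::map<std::string, float>
coalesceChanges (const std::vector<ParameterChange>& queued)
{
    std::map<std::string, float> latest;

    for (const auto& change : queued)
        latest[change.id] = change.value; // later changes overwrite earlier ones

    return latest;
}
```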


Thanks for the information! I see that I really need to learn the terminology and proper use of queues, buffers, etc. I've picked up a few books on concurrency and the standard library, and I'm going to read all this right now, since I don't just want the solution, I also want to understand what happens underneath...

By the way, my current meter code ā€œworksā€, but it could be a lot better.

OK, so I have been able to solve my problems. I have a class acting like the AbstractFifo / a stack, with an atomic index for the write position. On read, I take this write position and start reading at that position minus N samples.

I have also realised that a few JUCE functions/classes, such as the juce::Atomic class, were written long ago because C++11 was slow to arrive; since I now use C++11 everywhere, I should use std::atomic instead :wink:

I also use the AsyncUpdater class to call repaint, and either a thread or the threaded HighResolutionTimer class (instead of a Timer) to trigger the AsyncUpdater.

Here is the ā€œabstractā€ class I use (still raw for now):

[code]class AbstractDataStructure
{
public:
    AbstractDataStructure (const int capacity, const int samplesToRead) noexcept
    {
        jassert (capacity > 0);
        jassert (samplesToRead > 0 && samplesToRead < capacity / 2);

        bufferSize = capacity;
        readSize = samplesToRead;
        writePos = 0;
    }

    ~AbstractDataStructure() {}

    // -------------------------------------------------------------------------
    void reset() noexcept
    {
        writePos.store (0);
    }

    void prepareToWrite (int numToWrite, int& startIndex1, int& blockSize1,
                         int& startIndex2, int& blockSize2) const noexcept
    {
        const int ve = writePos.load();
        const int localReadSize = readSize.load();

        numToWrite = jmin (numToWrite, bufferSize - localReadSize);

        if (numToWrite <= 0)
        {
            startIndex1 = 0;
            startIndex2 = 0;
            blockSize1 = 0;
            blockSize2 = 0;
        }
        else
        {
            startIndex1 = ve;
            startIndex2 = 0;
            blockSize1 = jmin (bufferSize - ve, numToWrite);
            numToWrite -= blockSize1;
            blockSize2 = numToWrite <= 0 ? 0 : jmin (numToWrite, bufferSize - localReadSize);
        }
    }

    void finishedWrite (int numWritten) noexcept
    {
        jassert (numWritten >= 0 && numWritten < bufferSize);

        int newEnd = writePos.load() + numWritten;

        if (newEnd >= bufferSize)
            newEnd -= bufferSize;

        writePos.store (newEnd);
    }

    void prepareToRead (int& startIndex1, int& blockSize1,
                        int& startIndex2, int& blockSize2) const noexcept
    {
        const int localReadSize = readSize.load();

        int vs = writePos.load() - localReadSize;
        if (vs < 0) vs += bufferSize;

        int numWanted = localReadSize;

        startIndex1 = vs;
        startIndex2 = 0;
        blockSize1 = jmin (bufferSize - vs, localReadSize);
        numWanted -= blockSize1;
        blockSize2 = numWanted <= 0 ? 0 : numWanted;
    }

    void setReadSize (int newValue) noexcept
    {
        jassert (newValue > 0 && newValue < bufferSize / 2);
        readSize.store (newValue);
    }

private:
    int bufferSize;

    std::atomic<int> readSize;
    std::atomic<int> writePos;

    JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (AbstractDataStructure)
};[/code]

It's a little like the AbstractFifo class, but it takes into account the fact that I always read the same number of samples in the read function, so I can set that number (atomically) in the constructor, or at any time using the setter.

Any remarks? And maybe someone could tell me what to call this data structure? It's not a queue (FIFO), it's not a stack (LIFO), so what is it?
