How does a consumer of a lock-free FIFO communicate back to the producer?


#1

My case is specifically the Audio thread being the consumer of the lock-free queue, but I believe the question is abstract enough for a General JUCE discussion.

I want to correctly implement the communication back to the producer thread(s), and that means lock-free.
There are a couple of states that my consumer thread needs to expose to the producer somehow:

  • NeedProduction;
  • ContinueOnPreviousAction;
  • StopProducing.

What I’m currently working on seems… wrong. I am using AbstractFifo not only for the main production; I also use a second one, with size 1, to expose the current state of the consumer.

So I have code like this in every consumer (in my current case the consumer is called DFDChannel, and one such channel resides in each voice of a sampler instrument):

DFDChannel.h:

enum DFDChannelRequest {

    NoNewActionRequired,
    SamplesNeeded,
    StopStreaming

};

class DFDChannel :    public IChannel
{

public:
    DFDChannel (const int buffChCount, const int buffSize/*, ...*/)
        : bufferSize (buffSize),
          bufferChannelCount (buffChCount),
          // ...
          buffer (bufferChannelCount, bufferSize),
          incomingFifo (buffSize),
          outgoingFifo (1),
          // ...
    {}

    /* Called by Audio Rendering Thread */
    void start (LayeredSamplerSound* snd) override;
    void stop () override;
    int fillBuffer (AudioSampleBuffer& buffer, int numSamples, int sampleIndex) override;

    /* Called by DFD Thread */
    int writeSamplesToChannel (AudioSampleBuffer& buffer);
    DFDChannelRequest getServerRequest ();

private:
    int bufferSize;
    int bufferChannelCount;
    // ...
    AudioSampleBuffer buffer;
    AbstractFifo incomingFifo;
    AbstractFifo outgoingFifo;
    DFDChannelRequest serverRequest;
    // ...

    /* Called by Audio Rendering Thread */
    void setServerRequest (DFDChannelRequest request);
    // ...

    JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (DFDChannel);

};

DFDChannel.cpp:

// ...

void DFDChannel::setServerRequest (DFDChannelRequest request)
{
    int start1, size1, start2, size2;
    int itemsCount = 1;

    // overwrite the previous request, if one exists
    outgoingFifo.reset ();

    outgoingFifo.prepareToWrite (itemsCount, start1, size1, start2, size2);

    int requestsWrittenCount = size1 + size2;

    jassert (requestsWrittenCount > 0);

    serverRequest = request;

    outgoingFifo.finishedWrite (requestsWrittenCount);
}

DFDChannelRequest DFDChannel::getServerRequest ()
{
    int start1, size1, start2, size2;
    int itemsCount = 1;

    outgoingFifo.prepareToRead (itemsCount, start1, size1, start2, size2);

    int requestsReadCount = size1 + size2;
    DFDChannelRequest result = serverRequest;

    if (requestsReadCount == 0)
    {
        // nothing was queued, so fall back to the default request
        result = DFDChannelRequest::NoNewActionRequired;
    }

    outgoingFifo.finishedRead (requestsReadCount);

    return result;
}

// ...

//cc: @timur, @jules

Any help is much appreciated.

Cheers!


#2

Could I just use an Atomic field for this state/flag of the consumer?

Something like this:

DFDChannel.h:

enum DFDChannelRequest {

    NoNewActionRequired = 1,
    SamplesNeeded,
    StopStreaming

};

class DFDChannel :    public IChannel
{

public:
    DFDChannel (const int buffChCount, const int buffSize/*, ...*/)
        : bufferSize (buffSize),
          bufferChannelCount (buffChCount),
          // ...
          buffer (bufferChannelCount, bufferSize),
          serverRequest (DFDChannelRequest::NoNewActionRequired)
          // ...
    {}

    // ...

private:
    // ...
    Atomic<int> serverRequest;
    // ...

};

DFDChannel.cpp:

// ...

void DFDChannel::setServerRequest (DFDChannelRequest request)
{
    serverRequest.set (request);
}

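DFDChannelRequest DFDChannel::getServerRequest ()
{
    // Read and clear in one atomic step, so a request that arrives between
    // a separate get() and set() cannot be lost.
    const int previous = serverRequest.exchange (DFDChannelRequest::NoNewActionRequired);

    // Atomic<int> stores a plain int, so convert back to the enum explicitly.
    return static_cast<DFDChannelRequest> (previous);
}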

// ...

#3

The status problem seems to be solved, but I have another instance of the same problem, which can’t be solved this way.

In DFDServer, I want to keep an array of pointers to the voices of the sampler. Right now, the only thing I can think of is to do what I described in the initial post of this topic: keep an AbstractFifo with 1 element. Whenever there is a change to the voice array in the sampler (i.e. adding or removing voices), I would send an update. The 1 element in this update would be an array of pointers to voices.

How do you solve this, @timur @jules?

P.S.

  • I looked in the code of the Synthesiser type and there seems to be a lot of locking going on, even in the processBlock. I am getting confused: shouldn’t at least the processBlock somehow be lock-free? Also, why is the same lock used for both voice and sound state (CRUD on voices and sounds)?

#4

I just have another FIFO from the audio thread back to the message thread, no sweat. Of course, the message thread must poll this queue on a Timer, but this is fine in my experience.
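A second FIFO running in the opposite direction could look roughly like the sketch below. It uses only the standard library rather than AbstractFifo so that it stands alone, and the names `ChannelRequest`, `ChannelMessage` and `SpscQueue` are made up for illustration, not taken from JUCE or from the code above. It assumes exactly one thread pushes (the audio thread) and exactly one thread pops (whatever polls on the timer).

```cpp
#include <array>
#include <atomic>
#include <cstddef>

// Hypothetical message types, named for illustration only.
enum class ChannelRequest { SamplesNeeded, StopStreaming };

struct ChannelMessage
{
    int channelIndex;
    ChannelRequest request;
};

// Minimal single-producer/single-consumer ring buffer.
// One slot always stays empty so that "full" and "empty" can be told apart,
// which means a queue of Capacity N holds at most N - 1 messages.
template <std::size_t Capacity>
class SpscQueue
{
public:
    // Producer (audio thread) only. Returns false when the queue is full.
    bool push (const ChannelMessage& m) noexcept
    {
        const auto w = writePos.load (std::memory_order_relaxed);
        const auto next = (w + 1) % Capacity;

        if (next == readPos.load (std::memory_order_acquire))
            return false;

        slots[w] = m;
        writePos.store (next, std::memory_order_release);
        return true;
    }

    // Consumer (polling thread) only. Returns false when the queue is empty.
    bool pop (ChannelMessage& out) noexcept
    {
        const auto r = readPos.load (std::memory_order_relaxed);

        if (r == writePos.load (std::memory_order_acquire))
            return false;

        out = slots[r];
        readPos.store ((r + 1) % Capacity, std::memory_order_release);
        return true;
    }

private:
    std::array<ChannelMessage, Capacity> slots {};
    std::atomic<std::size_t> writePos { 0 };
    std::atomic<std::size_t> readPos { 0 };
};
```

In a timer callback the polling side would simply call `pop` in a loop until it returns false. Neither `push` nor `pop` blocks or allocates, so the audio thread never waits on the poller.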

As for your array-of-voices problem, I haven’t read your code, but I would advise you to be careful about sending pointers back and forth, especially for things you’re going to delete. What if you delete a voice on the message thread, and then the audio thread tries to access it before it receives the message that it’s been deleted?

I believe that the Synthesiser class uses an internal lock because in this case it’s a rare event for voices to be added/removed, and the alternative would be much more complex. The same might hold for your case. I feel it’s ok to use a lock if the event is quite rare (i.e. won’t be continuously happening) and the alternative solution would be horrendously complicated. As always, use your discretion and test it in real situations.


#5

Pretty much what I am doing right now, yes. On a timer in a background thread I am reading the state of all channels (who needs samples, etc.) and if a channel needs samples, I add a new ThreadPoolJob to acquire them from the disk and flush to the channel.

As for the pointers to voices: yeah, I thought about this, and now I am not sending voice pointers. Instead, every DFDChannel (one resides as a member of each voice) adds itself during its construction to a static singleton DFDServer, which is shared by all instances of the plugin. If a voice is added, its DFDChannel adds itself to the DFDServer. If a voice is deleted, it does not delete the DFDChannel instance it has pointed to so far, but instead marks it as deleted (the flag that carries NeedSamples and StopStreaming has a third state, Deleted), and the DFDServer picks it up and finally deletes it on the next timer run. The adding is also done through an AbstractFifo, so it’s lock-free as well.

It should all be lock-free on the Audio Thread, but I haven’t completed it yet, so I can’t be certain it will work when assembled.
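The mark-then-sweep idea can be sketched as follows. This is only an illustration with invented names (`ChannelState`, `Channel`, `Server`), not the actual DFDChannel or DFDServer code. It assumes the server owns every channel, that `addChannel` and `sweep` are only ever called from the server's own background thread (the lock-free registration FIFO mentioned above is left out), and that the audio side stops touching a channel once it has marked it.

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical state flag with the extra "Deleted" value.
enum class ChannelState { Idle, SamplesNeeded, StopStreaming, Deleted };

struct Channel
{
    std::atomic<ChannelState> state { ChannelState::Idle };

    // Audio side: only flips the flag; memory is never freed on this thread.
    void markDeleted() noexcept
    {
        state.store (ChannelState::Deleted, std::memory_order_release);
    }
};

class Server
{
public:
    // Background thread only (assumption of this sketch).
    Channel* addChannel()
    {
        channels.push_back (std::make_unique<Channel>());
        return channels.back().get();
    }

    // Background thread only: frees every channel that was marked as deleted
    // and returns how many were reclaimed on this run.
    std::size_t sweep()
    {
        const auto before = channels.size();

        channels.erase (std::remove_if (channels.begin(), channels.end(),
                                        [] (const std::unique_ptr<Channel>& c)
                                        {
                                            return c->state.load (std::memory_order_acquire)
                                                     == ChannelState::Deleted;
                                        }),
                        channels.end());

        return before - channels.size();
    }

    std::size_t size() const noexcept { return channels.size(); }

private:
    std::vector<std::unique_ptr<Channel>> channels;
};
```

The point of the design is that the deallocation happens on the timer thread, so the thread that marks the channel never pays for it.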

Thank you very much for the response, @widdershins.


#6

Why do you want to use a timer for checking if your DFDChannel needs new samples? I would advise you to implement something that checks whether samples are available and notifies the ThreadPoolJob to load new samples when there are no samples left to consume.

I am using two buffers that are continuously swapped. When one buffer is consumed (= all samples are read), the buffers swap and the consumed buffer is filled with new samples (by adding a ThreadPoolJob). This operation must finish before the other buffer is consumed, but you can increase the buffer size if you need more time.

You’ll also need preload buffers for every sound, used as the first read buffer when the voice is started (because you’ll need the first samples instantly).
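A rough sketch of such a swapped pair, using only the standard library, is shown below. The class name `DoubleBuffer` and its methods are invented for illustration and are not taken from the implementation described in this post. It assumes one reading thread and one loading thread, and it has the loader ask for the next buffer to fill instead of being notified through a ThreadPoolJob.

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <vector>

// Two buffers handed back and forth between one reader and one loader.
// Both sides visit the buffers in strict alternation: 0, 1, 0, 1, ...
class DoubleBuffer
{
public:
    explicit DoubleBuffer (std::size_t size)
    {
        for (auto& b : buffers)
            b.assign (size, 0.0f);
    }

    // Loader thread: the buffer to fill next, or nullptr while the reader still owns it.
    std::vector<float>* beginFill() noexcept
    {
        return ready[fillIndex].load (std::memory_order_acquire) ? nullptr
                                                                 : &buffers[fillIndex];
    }

    // Loader thread: publish the buffer returned by beginFill().
    void endFill() noexcept
    {
        ready[fillIndex].store (true, std::memory_order_release);
        fillIndex = 1 - fillIndex;
    }

    // Reader (audio) thread: returns how many samples were copied.
    // Fewer than numSamples means the loader did not finish in time.
    std::size_t read (float* dest, std::size_t numSamples) noexcept
    {
        std::size_t done = 0;

        while (done < numSamples)
        {
            if (! ready[current].load (std::memory_order_acquire))
                break; // underrun

            const auto available = buffers[current].size() - readPos;
            const auto n = std::min (available, numSamples - done);
            std::copy_n (buffers[current].data() + readPos, n, dest + done);
            readPos += n;
            done += n;

            if (readPos == buffers[current].size())
            {
                // Buffer consumed: release it to the loader and swap.
                ready[current].store (false, std::memory_order_release);
                current = 1 - current;
                readPos = 0;
            }
        }

        return done;
    }

private:
    std::vector<float> buffers[2];
    std::atomic<bool> ready[2] { { false }, { false } };
    int current = 0;         // reader thread only
    std::size_t readPos = 0; // reader thread only
    int fillIndex = 0;       // loader thread only
};
```

Filling both buffers once before the first `read` plays the role of the preload. Making the buffers larger gives the loader more time per swap, at the cost of memory per voice.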


#7

I think I started using your implementation - thanks again for writing that and sharing it. It was the first and only example I found.

Unfortunately, this approach wasn’t performant enough for my case (DFD on GBs of samples). No matter what settings I used for buffer size and number of voices, an IO operation would always finish too late within a few seconds to a minute of playing the instrument. Also, I believe there was a race issue, which would sometimes cause glitches and/or exceptions.

In your example, every SampleLoader (that was the name, right?) continuously reads chunks the size of the preload buffer without any priority assigned. In my design I have a DFDServer which runs on a low-priority background thread and, on a timer, reassigns priority to every DFDChannel’s data requests. (The voice asks its DFDChannel for samples instead of the SampleLoader, but the DFDChannel doesn’t actually read samples from the disk; there is a DFDJob, derived from ThreadPoolJob, which the DFDServer manages.) So if DFDChannel A has fewer samples left than DFDChannel B, A will get higher priority (it will be served sooner).
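The reprioritisation step on each timer run could be sketched like this, assuming the intent is that the channel closest to running dry is served first. `Channel`, `Request`, `buildServiceOrder` and the `lowWaterMark` threshold are all hypothetical names for illustration and do not come from the actual DFDServer.

```cpp
#include <algorithm>
#include <atomic>
#include <vector>

// Hypothetical view of a channel: the audio thread updates samplesLeft,
// the server thread only reads it.
struct Channel
{
    int id = 0;
    std::atomic<int> samplesLeft { 0 };
};

// Snapshot of one pending request, taken by the server on a timer run.
struct Request
{
    int channelId;
    int samplesLeft;
};

// Collects every channel below the low-water mark and orders the requests
// so that the channel with the fewest samples left comes first.
std::vector<Request> buildServiceOrder (const std::vector<const Channel*>& channels,
                                        int lowWaterMark)
{
    std::vector<Request> requests;

    for (const auto* c : channels)
    {
        const int left = c->samplesLeft.load (std::memory_order_acquire);

        if (left < lowWaterMark)
            requests.push_back ({ c->id, left });
    }

    std::stable_sort (requests.begin(), requests.end(),
                      [] (const Request& a, const Request& b)
                      {
                          return a.samplesLeft < b.samplesLeft;
                      });

    return requests;
}
```

Taking a snapshot first means the sort works on plain ints, so the audio thread can keep changing `samplesLeft` while the server decides the order.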

Finally, I wanted all this to happen in a lock-free way.


#8

Well, I am using a (vastly improved) version of this code in my current sampler framework and haven’t run into performance issues. I can stream about 400 voices without dropouts, which is approximately the read speed of my SSD, so I am hitting the natural hardware limit here.

I don’t understand why you would change the priority of a streaming request. As soon as a voice has started, all voices are in the same need of new samples (if they run out of samples, they cause a dropout). If a buffer needs to be filled, the voice tells the SampleLoader to do this ASAP. If the SampleLoader is too busy with other voices, there is no way to avoid a dropout, and juggling priorities won’t change that fact. It of course uses a FIFO principle for selecting the sample to be streamed.

If you want, I could send you the current version (it is a fully working sampler with scripting and all). I am going public in a few months (it will be GPL with a commercial option, so maybe you could just use this as a starting point and spare yourself some headache).

Please PM me if you’re interested.