Lock Free Queue data copy between Processor and Editor

I’m trying to get samples from the Processor to the Editor, and I’m using a lock-free queue to avoid locking shared resources. The problem is that the two data channels are not always time-aligned once copied to the editor (each starts with a different number of leading zeros), and sometimes blocks of data are copied twice or skipped.

Data is written to the lock-free queue when the Processor runs its processBlock method, and a timer callback in the Editor reads it back. The samples are appended to a vector, and the whole vector, containing every sample the processor has passed along so far, is plotted in the editor.

class LockFreeQueue
{
public:
    juce::ScopedPointer<juce::AbstractFifo> lockFreeFifo;

    juce::Array<float> data;
    int lastReadPos{ 0 };
    int capacity{ 2048 };

    LockFreeQueue()
    {
        lockFreeFifo = new juce::AbstractFifo(capacity);

        // Allocate the backing storage and zero it
        data.ensureStorageAllocated(capacity);
        data.resize(capacity);
        juce::FloatVectorOperations::clear(data.getRawDataPointer(), capacity);
    }

    void setTotalSize(int newSize)
    {
        lockFreeFifo->setTotalSize(newSize);

        // Resize the backing storage to match and zero it
        data.ensureStorageAllocated(newSize);
        data.resize(newSize);
        juce::FloatVectorOperations::clear(data.getRawDataPointer(), newSize);
    }

    void writeTo(const float* writeData, int numToWrite)
    {
        int start1, start2, blockSize1, blockSize2;

        lockFreeFifo->prepareToWrite(numToWrite, start1, blockSize1, start2, blockSize2);

        if (blockSize1 > 0) juce::FloatVectorOperations::copy(data.getRawDataPointer() + start1, writeData, blockSize1);
        if (blockSize2 > 0) juce::FloatVectorOperations::copy(data.getRawDataPointer() + start2, writeData + blockSize1, blockSize2);

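        // Move the FIFO write head by the number of samples actually copied;
        // prepareToWrite may grant fewer than numToWrite when the FIFO is nearly full.
        lockFreeFifo->finishedWrite(blockSize1 + blockSize2);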
    }

    void readFrom(float* readData, int numToRead)
    {
        int start1, blockSize1, start2, blockSize2;

        lockFreeFifo->prepareToRead(numToRead, start1, blockSize1, start2, blockSize2);

        if (blockSize1 > 0)
        {
            juce::FloatVectorOperations::copy(readData, data.getRawDataPointer() + start1, blockSize1);

            lastReadPos = start1 + blockSize1;
        }
        if (blockSize2 > 0)
        {
            juce::FloatVectorOperations::copy(readData + blockSize1, data.getRawDataPointer() + start2, blockSize2);

            lastReadPos = start2 + blockSize2;
        }

        lockFreeFifo->finishedRead(blockSize1 + blockSize2);
    }

    int writeToVector(std::vector<float>* dest, int destPos)
    {
        // Drain the excess
        while (getNumReady() > dest->size())
        {
            lockFreeFifo->finishedRead(getNumReady() - dest->size());
        }

        // Read latest data from the LFQ
        const int numToAppend = getNumReady();
        // Add the tail (one buffer's worth) to the output
        if (destPos + numToAppend < dest->size())
        {
            readFrom(&dest->data()[destPos], numToAppend);
        }
        else
        {
            int toTheEnd = dest->size() - destPos;
            readFrom(&dest->data()[destPos], toTheEnd);
            readFrom(&dest->data()[0], numToAppend - toTheEnd);
        }

        return numToAppend;
    }

    int getNumReady()
    {
        return lockFreeFifo->getNumReady();
    }

};

The for loop in the processBlock method of the Processor class. lfqChannels is a unique_ptr to a vector of LockFreeQueue.

    for (int channel = 0; channel < totalNumInputChannels; ++channel)
    {
        if (lfqChannels != nullptr)
        {
            for (size_t i = 0; i < buffer.getNumSamples(); i++)
            {
                sample = buffer.getSample(channel, (int)i);
                lfqChannels->at(channel).writeTo(&sample, 1);
            }
        }
    }

Timer callback in the Editor. What I want to do is read from the lock-free queue into a temporary array or vector, then append that to a local copy that accumulates all the samples in the order the processor delivered them. For example: when I press play, the processor reads the signal from my DAW, and the sample blocks should be passed to the editor, which shows every sample the processor has read. lockFreeQueueDataCopy is used to plot the whole signal.

void AAIRAudioProcessorEditor::timerCallback()
{
    auto lfqPtr = audioProcessor.getLockFreeQueue();
    if (lfqPtr != nullptr)
    {
        if (!lfqPtr->empty())
        {
            for (int channel{ 0 }; channel < audioProcessor.getTotalNumInputChannels(); channel++)
            {
                auto lfqNumReady = lfqPtr->at(channel).getNumReady();
                if (lfqNumReady > 0)
                {
                    std::vector<std::vector<float>> tempArr(audioProcessor.getTotalNumInputChannels());
                    tempArr[channel].resize(lfqNumReady);
                    for (int i{ 0 }; i < tempArr[channel].size(); i++)
                    {
                        tempArr.at(channel).at(i) = 0.0f;
                    }

                    int numAdded = audioProcessor.getLockFreeQueue()->at(channel).writeToVector(&tempArr.at(channel), 0);
                    
                    for (size_t i{ 0 }; i < tempArr[channel].size(); i++)
                    {
                        lockFreeQueueDataCopy.at(channel).emplace_back(tempArr.at(channel).at((int)i));
                    }
                }
            }
        }
    }
}

I’m not sure I understood you correctly, but I think the mistake is that you are pushing a single sample at a time into the FIFO, and into a separate FIFO for each channel. You need to make sure the channels stay synchronized while you write to the FIFO.

In other words, if you want to sync all the audio channels, you have two options:

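  1. If you know the maximum number of channels, e.g. 2, you could use a FIFO whose elements are std::pair<float, float>, or similar (a tuple or custom struct if you need more channels). Note that juce::AbstractFifo is not a template and only manages the read/write indices, so the pairs would live in the backing buffer.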

  2. But if your plugin may use an arbitrary number of channels, you would need to push the samples in blocks, interleaving the channels: for two channels your FIFO would hold ABABABABAB, for three ABCABCABC… NOTE: always write complete frames, ABC-ABC-ABC, don’t do AB-BCA-B-CABC.
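To make the interleaved layout from option 2 concrete, here is a minimal sketch in plain C++. The helper names interleaveFrames and appendDeinterleaved are made up for illustration and are not part of JUCE. The sketch assumes the caller provides a preallocated destination, so nothing is allocated on the audio thread, and that the per-channel pointers come from something like the buffer's array of read pointers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not a JUCE function): pack per-channel sample arrays
// into frames. For 2 channels the output is A0 B0 A1 B1 ...
// `dest` must already hold numChannels * numSamples floats.
inline void interleaveFrames(const float* const* channels, std::size_t numChannels,
                             std::size_t numSamples, float* dest)
{
    for (std::size_t i = 0; i < numSamples; ++i)
        for (std::size_t ch = 0; ch < numChannels; ++ch)
            dest[i * numChannels + ch] = channels[ch][i];
}

// Hypothetical helper for the reading side: unpack whole frames and append
// them to one growing vector per channel. Returns the number of complete
// frames consumed; a trailing partial frame is ignored.
inline std::size_t appendDeinterleaved(const float* interleaved, std::size_t numFloats,
                                       std::size_t numChannels,
                                       std::vector<std::vector<float>>& perChannel)
{
    assert(numChannels > 0 && perChannel.size() == numChannels);
    const std::size_t numFrames = numFloats / numChannels;
    for (std::size_t i = 0; i < numFrames; ++i)
        for (std::size_t ch = 0; ch < numChannels; ++ch)
            perChannel[ch].push_back(interleaved[i * numChannels + ch]);
    return numFrames;
}
```

Because every frame carries one sample per channel, the channels cannot drift apart as long as only whole frames are written and read.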

Either way, avoid writing to the FIFO sample by sample on the audio thread. Each call costs two sequentially consistent atomic operations, which is expensive. It’s better to write more elements in a single call. I usually fill a temporary std::array/vector and send it to the FIFO once at the end of processBlock, and/or as soon as it is full.

Note that in both cases you should use a single FIFO, to avoid desync problems between several similar FIFOs.
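As a rough illustration of "one FIFO, one write per block", here is a self-contained single-producer/single-consumer ring buffer built on std::atomic. It is a stand-in for juce::AbstractFifo plus its backing array, not JUCE's implementation, and the class name SpscFloatFifo is invented for this sketch. It assumes exactly one writer thread and one reader thread.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for juce::AbstractFifo + storage (not JUCE code).
// Safe for exactly one producer thread and one consumer thread.
class SpscFloatFifo
{
public:
    // One extra slot distinguishes "full" from "empty".
    explicit SpscFloatFifo(std::size_t capacity) : buffer(capacity + 1)
    {
        assert(capacity > 0);
    }

    // Producer: writes all `count` floats or none, so a block of interleaved
    // frames is never split. The write position is published once per block.
    bool push(const float* src, std::size_t count)
    {
        const std::size_t size = buffer.size();
        const std::size_t w = writePos.load(std::memory_order_relaxed);
        const std::size_t r = readPos.load(std::memory_order_acquire);
        const std::size_t freeSpace = (r + size - w - 1) % size;
        if (count > freeSpace)
            return false;
        for (std::size_t i = 0; i < count; ++i)
            buffer[(w + i) % size] = src[i];
        writePos.store((w + count) % size, std::memory_order_release);
        return true;
    }

    // Consumer: reads up to `maxCount` floats and returns how many were read.
    std::size_t pop(float* dest, std::size_t maxCount)
    {
        const std::size_t size = buffer.size();
        const std::size_t r = readPos.load(std::memory_order_relaxed);
        const std::size_t w = writePos.load(std::memory_order_acquire);
        const std::size_t available = (w + size - r) % size;
        const std::size_t n = available < maxCount ? available : maxCount;
        for (std::size_t i = 0; i < n; ++i)
            dest[i] = buffer[(r + i) % size];
        readPos.store((r + n) % size, std::memory_order_release);
        return n;
    }

private:
    std::vector<float> buffer;
    std::atomic<std::size_t> writePos { 0 };
    std::atomic<std::size_t> readPos { 0 };
};
```

If the producer only pushes whole frames and the consumer only pops multiples of the channel count, the reader always sees complete frames. When push returns false the block is dropped as a whole, so the channels stay aligned even if data is lost.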

Thanks! Yes, so if I understood correctly, the issue is that you can’t easily sync two FIFOs. So you would use only one FIFO, regardless of the channel count, with interleaved (channel-wise) or paired samples. You would also collect these samples in a vector/buffer in the processBlock method and push them to the FIFO once the vector is full, to speed up the whole process.

Indeed, that’s the idea!
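The "fill a vector, push it once it is full" step could look roughly like the sketch below. FrameStager is a hypothetical name, and the sink is any callable taking a pointer and a count; in a real plugin it would forward to the single FIFO. The staging buffer is allocated once in the constructor, on the assumption that this happens outside the audio callback.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: collects whole frames (one sample per channel) and
// hands them to a sink in blocks instead of one sample at a time.
class FrameStager
{
public:
    FrameStager(std::size_t numChannels, std::size_t framesPerFlush)
        : channels(numChannels), staging(numChannels * framesPerFlush)
    {
        assert(numChannels > 0 && framesPerFlush > 0);
    }

    // Append one frame; when the staging buffer is full, pass it to `sink`.
    // `Sink` is any callable taking (const float*, std::size_t).
    template <typename Sink>
    void addFrame(const float* frame, Sink&& sink)
    {
        for (std::size_t ch = 0; ch < channels; ++ch)
            staging[used++] = frame[ch];
        if (used == staging.size())
            flush(sink);
    }

    // Pass any staged frames to `sink`, e.g. at the end of processBlock.
    template <typename Sink>
    void flush(Sink&& sink)
    {
        if (used > 0)
        {
            sink(staging.data(), used);
            used = 0;
        }
    }

private:
    std::size_t channels;
    std::vector<float> staging;
    std::size_t used = 0;
};
```

Since the staging size is a multiple of the channel count, every flush hands over complete frames only.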