This needs to be an atomic. Otherwise the compiler is allowed to optimise the entire boolean out of existence, or, in this case, the check of whether the while loop should continue.
That's right, a lock-free atomic.
I have also removed the unnecessary "!" from the loop condition.
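The point about the loop flag can be illustrated with a minimal standard-C++ sketch. It is not the code from this thread, and the class name StoppableWorker is made up. The worker's loop condition reads a std::atomic<bool>, so every iteration performs a real load. With a plain bool written from another thread, the access would be a data race, and the compiler would be free to hoist the check out of the loop.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Holds on mainstream desktop/mobile platforms; the assert documents the "lock-free" assumption.
static_assert(std::atomic<bool>::is_always_lock_free,
              "expected a lock-free std::atomic<bool> on this platform");

// Hypothetical example class: a background loop that can be stopped from another thread.
class StoppableWorker
{
public:
    StoppableWorker()
        : worker([this] {
              // The atomic load happens on every iteration; it cannot be optimised away.
              while (keepRunning.load(std::memory_order_acquire))
              {
                  iterations.fetch_add(1, std::memory_order_relaxed);
                  std::this_thread::sleep_for(std::chrono::milliseconds(1)); // no busy spinning
              }
          })
    {
    }

    ~StoppableWorker() { stop(); }

    void stop()
    {
        keepRunning.store(false, std::memory_order_release);
        if (worker.joinable())
            worker.join();
    }

    long getIterations() const { return iterations.load(std::memory_order_relaxed); }

private:
    std::atomic<bool> keepRunning{true};
    std::atomic<long> iterations{0};
    std::thread worker; // declared last so the atomics are initialised before the thread starts
};
```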
Thanks! This is very useful. Is there a way to have multiple consumers, i.e., a thread pool?
Looking over the code again, I think there is something to be said about the busy loop. Depending on your exact use case, you really shouldn't do it like that. In this case, for sample loading, you are preloading the first bit of the sample. If that is 2048 samples long, it easily covers the first 40 ms, so you can put the thread to sleep for 1-5 ms between checks.
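The numbers in the post above can be checked with a quick calculation. The sample rates and helper names here are illustrative assumptions: 2048 samples last about 46 ms at 44.1 kHz and about 43 ms at 48 kHz. A loader thread that sleeps 5 ms between polls therefore still gets several chances to run before the preloaded part runs out.

```cpp
#include <cassert>
#include <cmath>

// Milliseconds of audio covered by a preloaded block of numSamples at sampleRate.
constexpr double preloadDurationMs(int numSamples, double sampleRate)
{
    return 1000.0 * numSamples / sampleRate;
}

// Hypothetical helper: how many polls a loader thread that sleeps sleepMs between
// checks gets while the preloaded block plays. It ignores scheduling jitter and the
// time the jobs themselves take, so treat the result as an upper bound.
constexpr int pollsWithinPreload(int numSamples, double sampleRate, int sleepMs)
{
    return static_cast<int>(preloadDurationMs(numSamples, sampleRate) / sleepMs);
}

// preloadDurationMs(2048, 44100.0) is about 46.4 ms
// preloadDurationMs(2048, 48000.0) is about 42.7 ms
// pollsWithinPreload(2048, 48000.0, 5) == 8
```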
You read my mind
Yes, I had to fix that in production. This is my latest version, if you are interested.
```cpp
/*
Copyright 2024 Qfactor
If you change something please add a note here.
Distributed under the MIT license

Example of use:

void processBlock(AudioSampleBuffer& buffer, MidiBuffer& midiMessages)
{
    if (getPlayHead() && getPlayHead()->getPosition()
        && getPlayHead()->getPosition()->getTimeInSamples()
        && *(getPlayHead()->getPosition()->getTimeInSamples())==0)
        tp.addJob([]{ Logger::outputDebugString("test function"); });
}

OneProducerLockFreeThreadPool<64,100> tp;

Important:
* jobs can only be added from the same thread (one producer)
* increase JOBSIZE if you call addJob with lambdas with big captures and get compile errors
*/

// Requires the juce_core and juce_dsp modules (e.g. via JuceHeader.h) with the juce namespace in scope.
#include <atomic>
#include <utility>
#include <vector>

template <int JOBSIZE = 64, int MAX_JOBS_IN_QUEUE = 1000>
class OneProducerLockFreeThreadPool final : public Thread
{
public:
    OneProducerLockFreeThreadPool(Thread::Priority threadPriority=Thread::Priority::normal, int timeBetweenCallsInMs=10)
        : Thread("")
        , timeBetweenCallsInMs(timeBetweenCallsInMs)
    {
        loopFlag.test_and_set();
        startThread(threadPriority);
    }

    ~OneProducerLockFreeThreadPool()
    {
        loopFlag.clear();
        stopThread(timeBetweenCallsInMs*4);
    }

    // Must only be called from the single producer thread (e.g. the audio thread).
    void addJob(dsp::FixedSizeFunction<JOBSIZE,void()> &&job)
    {
        int startIndex1, blockSize1, startIndex2, blockSize2;
        abstractFifo.prepareToWrite(1, startIndex1, blockSize1, startIndex2, blockSize2);
        if (blockSize1 + blockSize2 == 1)
        {
            const int writeIndex = (blockSize1 > 0) ? startIndex1 : startIndex2;
            jobs[writeIndex] = std::move(job);
            abstractFifo.finishedWrite(1);
        }
        else
        {
            jassertfalse;
            // OneProducerLockFreeThreadPool can't hold that many jobs. Increase MAX_JOBS_IN_QUEUE
        }
    }

    void removeAllJobsAndStop()
    {
        loopFlag.clear();
        stopThread(timeBetweenCallsInMs*4);
        // Reset the stored jobs rather than calling jobs.clear(): clear() would shrink
        // the vector to size 0, so a later addJob() would write out of bounds.
        for (auto& job : jobs)
            job = nullptr;
        abstractFifo.reset();
    }

private:
    void run() override
    {
        while (loopFlag.test()) // std::atomic_flag::test() requires C++20
        {
            if (abstractFifo.getNumReady() > 0)
            {
                int startIndex1, blockSize1, startIndex2, blockSize2;
                abstractFifo.prepareToRead(abstractFifo.getNumReady(), startIndex1, blockSize1, startIndex2, blockSize2);
                for (int readIndex = startIndex1; loopFlag.test() && readIndex < startIndex1 + blockSize1; readIndex++)
                    jobs[readIndex]();
                for (int readIndex = startIndex2; loopFlag.test() && readIndex < startIndex2 + blockSize2; readIndex++)
                    jobs[readIndex]();
                abstractFifo.finishedRead(blockSize1 + blockSize2);
            }
            else
            {
                // Nothing queued: sleep instead of busy-waiting.
                Thread::sleep(timeBetweenCallsInMs);
            }
        }
    }

    using JobVector = std::vector<dsp::FixedSizeFunction<JOBSIZE,void()> >;
    AbstractFifo abstractFifo = AbstractFifo(MAX_JOBS_IN_QUEUE);
    JobVector jobs = JobVector(MAX_JOBS_IN_QUEUE);
    std::atomic_flag loopFlag;
    int timeBetweenCallsInMs;
};
```
Yes, this looks a lot better! However (and this goes for anyone else reading, too), make sure to keep on top of sleepFlag. After a quick read, this should work perfectly for a single consumer and a single producer. But should these requirements ever change, or should you alter the current code in any way, it is easy to produce a case in which the sleepFlag prevents your thread pool from working on a newly added job.
You probably mean multiple producers. There is a way: use the thread ID as an index into an array of single-producer lock-free pools, assuming a fixed maximum number of coexisting threads.
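Here is a minimal sketch of the multiple-producer idea in plain standard C++ rather than JUCE. Each producer thread is lazily assigned a slot number, which stands in for "the thread ID as an index". Each slot owns its own single-producer/single-consumer ring buffer, and a single consumer drains the rings in turn. SpscRing, MultiProducerQueue and currentProducerSlot are hypothetical names. The sketch assumes a fixed upper bound on the number of threads that ever call push(), because slots are never recycled when a thread exits.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>   // used by the usage example
#include <utility>
#include <vector>   // used by the usage example

// Single-producer/single-consumer ring buffer; holds at most Capacity - 1 items.
template <typename T, std::size_t Capacity>
class SpscRing
{
public:
    bool push(T item)
    {
        const std::size_t w = writePos.load(std::memory_order_relaxed);
        const std::size_t next = (w + 1) % Capacity;
        if (next == readPos.load(std::memory_order_acquire))
            return false; // full
        slots[w] = std::move(item);
        writePos.store(next, std::memory_order_release); // publish the item
        return true;
    }

    bool pop(T& out)
    {
        const std::size_t r = readPos.load(std::memory_order_relaxed);
        if (r == writePos.load(std::memory_order_acquire))
            return false; // empty
        out = std::move(slots[r]);
        readPos.store((r + 1) % Capacity, std::memory_order_release); // free the slot
        return true;
    }

private:
    std::array<T, Capacity> slots{};
    std::atomic<std::size_t> writePos{0};
    std::atomic<std::size_t> readPos{0};
};

// Hypothetical helper: hands out 0, 1, 2, ... to threads on their first call.
inline std::size_t currentProducerSlot()
{
    static std::atomic<std::size_t> nextSlot{0};
    thread_local const std::size_t slot = nextSlot.fetch_add(1);
    return slot;
}

template <typename T, std::size_t Capacity, std::size_t MaxProducers>
class MultiProducerQueue
{
public:
    // Any thread may call push(); each thread only ever writes to its own ring.
    bool push(T item)
    {
        const std::size_t slot = currentProducerSlot();
        if (slot >= MaxProducers)
            return false; // more producer threads than the assumed maximum
        return queues[slot].push(std::move(item));
    }

    // Must be called from a single consumer thread only.
    template <typename Fn>
    std::size_t drain(Fn&& fn)
    {
        std::size_t count = 0;
        T item{};
        for (auto& q : queues)
            while (q.pop(item))
            {
                fn(item);
                ++count;
            }
        return count;
    }

private:
    std::array<SpscRing<T, Capacity>, MaxProducers> queues;
};
```

Because every ring still has exactly one writer and one reader, the per-ring logic stays the same as in the single-producer case. The cost is that the consumer has to visit every ring, and that MaxProducers must be chosen up front.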
It could very well be that he wants to add more worker threads, meaning more consumers.
In any case, you'll need to move away from AbstractFifo, which supports only one reader thread and one writer thread. Here is a ready-to-use implementation covering all cases: GitHub - hogliux/farbot: FAbian's Realtime Box o' Tricks
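Multiple consumers require a queue that several threads can pop from concurrently. As a rough illustration, and not farbot's implementation, here is a sketch of a bounded multi-producer/multi-consumer queue that follows Dmitry Vyukov's well-known sequence-number design. It uses atomics only, with no mutex. Each cell carries a sequence counter that tells producers and consumers whether the cell is free or filled. The class name MpmcQueue is hypothetical, cache-line padding is omitted for brevity, and T is assumed to be default-constructible and movable.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>   // used by the usage example
#include <utility>
#include <vector>   // used by the usage example

// Bounded MPMC queue after Vyukov's design; Capacity must be a power of two.
template <typename T, std::size_t Capacity>
class MpmcQueue
{
    static_assert(Capacity >= 2 && (Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two");

public:
    MpmcQueue()
    {
        for (std::size_t i = 0; i < Capacity; ++i)
            cells[i].sequence.store(i, std::memory_order_relaxed); // cell i is free for ticket i
    }

    bool push(T value)
    {
        std::size_t pos = enqueuePos.load(std::memory_order_relaxed);
        for (;;)
        {
            Cell& cell = cells[pos & (Capacity - 1)];
            const std::size_t seq = cell.sequence.load(std::memory_order_acquire);
            const auto diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos);
            if (diff == 0) // cell is free: try to claim ticket pos
            {
                if (enqueuePos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                {
                    cell.value = std::move(value);
                    cell.sequence.store(pos + 1, std::memory_order_release); // mark as filled
                    return true;
                }
                // on failure, compare_exchange_weak reloaded pos; retry
            }
            else if (diff < 0)
                return false; // queue is full
            else
                pos = enqueuePos.load(std::memory_order_relaxed); // another producer got ahead
        }
    }

    bool pop(T& out)
    {
        std::size_t pos = dequeuePos.load(std::memory_order_relaxed);
        for (;;)
        {
            Cell& cell = cells[pos & (Capacity - 1)];
            const std::size_t seq = cell.sequence.load(std::memory_order_acquire);
            const auto diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos + 1);
            if (diff == 0) // cell is filled: try to claim it
            {
                if (dequeuePos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                {
                    out = std::move(cell.value);
                    cell.sequence.store(pos + Capacity, std::memory_order_release); // free for the next lap
                    return true;
                }
            }
            else if (diff < 0)
                return false; // queue is empty
            else
                pos = dequeuePos.load(std::memory_order_relaxed); // another consumer got ahead
        }
    }

private:
    struct Cell
    {
        std::atomic<std::size_t> sequence{0};
        T value{};
    };

    Cell cells[Capacity];
    std::atomic<std::size_t> enqueuePos{0};
    std::atomic<std::size_t> dequeuePos{0};
};
```

In a thread pool, several worker threads would each loop on pop() and sleep briefly when it returns false, just like the single worker in the code above.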
There is also an ADC talk on how he came up with this code, I think.
I just got rid of the sleepFlag. The sleep call should be enough.
Seconded. I can't speak highly enough about this lib. It covers just about everything you want to do in terms of communication between threads in an audio application.
But in his later talk "Real-time Confessions in C++ - Fabian Renn-Giles - ADC23" he says he would do things differently now. It sounds like not everything in that earlier talk was done correctly.
About the problem that std::function may allocate memory: in lots of places in C++ you can supply your own allocator. Isn't that possible here too? In that case you could replace it with a real-time safe allocator.
Doing it differently just means there are more options. If it were simply wrong, I bet he would just fix it. (That was my take on it.)
Writing your own real-time safe allocator is a ton of work. Going with JUCE's FixedSizeFunction makes sure you get a compiler error when you break the "no malloc on the audio thread" rule, and that is it. It is not supposed to solve the underlying problem of dynamic memory management in the most elegant way or give you a huge QoL improvement.
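On the allocator question: the allocator-taking constructors of std::function were removed in C++17, so that route is not available for std::function itself. The fixed-capacity approach can be sketched roughly as follows. InplaceFunction is a hypothetical, simplified stand-in, not juce::dsp::FixedSizeFunction's actual implementation. The callable is placement-new'ed into a buffer inside the object, and a capture that does not fit fails a static_assert at compile time instead of allocating on the heap.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <type_traits>
#include <utility>

// Hypothetical fixed-capacity wrapper for void() callables; never touches the heap.
template <std::size_t Size>
class InplaceFunction
{
public:
    InplaceFunction() = default;

    template <typename Fn,
              typename = std::enable_if_t<!std::is_same_v<std::decay_t<Fn>, InplaceFunction>>>
    InplaceFunction(Fn&& fn)
    {
        using F = std::decay_t<Fn>;
        // The "compiler error instead of malloc" part: oversized captures do not compile.
        static_assert(sizeof(F) <= Size, "Capture too large: increase Size");
        static_assert(alignof(F) <= alignof(std::max_align_t), "Over-aligned callable not supported");
        new (&storage) F(std::forward<Fn>(fn));
        invokeFn = [](void* p) { (*std::launder(static_cast<F*>(p)))(); };
        destroyFn = [](void* p) { std::launder(static_cast<F*>(p))->~F(); };
    }

    // Copy and move are omitted to keep the sketch short.
    InplaceFunction(const InplaceFunction&) = delete;
    InplaceFunction& operator=(const InplaceFunction&) = delete;

    ~InplaceFunction() { reset(); }

    // Precondition: not empty (calling an empty instance would call a null pointer).
    void operator()() { invokeFn(&storage); }

    explicit operator bool() const { return invokeFn != nullptr; }

    void reset()
    {
        if (destroyFn)
            destroyFn(&storage);
        invokeFn = nullptr;
        destroyFn = nullptr;
    }

private:
    alignas(std::max_align_t) unsigned char storage[Size];
    void (*invokeFn)(void*) = nullptr;
    void (*destroyFn)(void*) = nullptr;
};
```

For example, InplaceFunction<8> constructed from a lambda that captures a std::array<float, 16> by value fails the first static_assert, because the closure is 64 bytes. Unlike FixedSizeFunction, this sketch cannot be moved, so it could not be stored in a queue slot as it stands.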
