Discuss: CircularSingleWriterSingleReaderLockFreePointerFIFO

Please discuss:
I was missing a simple FIFO for passing pointers between threads when you know there is only one reader and one writer.
I didn’t test this template very much at first; I used AbstractFifo as a base. // EDIT: in the meantime I’ve done a lot more testing.
Jules, it would be cool if you could add something like this to the source tree.

//EDIT
A simple example (pseudocode): the templates can be used to create a filter kernel on a different thread and push it to the audio thread.
The process block always uses the latest kernel, and when a newer kernel arrives, the old one is pushed back to the producer thread.
There are no locks and no memory allocation or deallocation on the audio thread.

//Pseudo Code Demo

ConsumerProducerManager q;

//Thread 1 Producer

{
   while (true)
   {
      q.producerSendObjectToConsumerOrDeleteIt (new Object); // push new Object
      q.producerDeleteBackItems();                           // delete objects that came back from the consumer thread
   }
}

// Thread 2 Consumer (Audio Thread)

{
   q.consumerSwitchCurrentToLatestObject(); // we only want the latest object that was generated
   while (true)
   {
      if (q.consumerIsCurrentObjectAvailable())
      {
         q.consumerGetCurrentObject()->doSomeThing();
      }
   }
}

// EDIT: replaced with the newest version

template <class ObjectClass>
class    CircularSingleWriterSingleReaderLockFreePointerFIFO
{
public:
   
    CircularSingleWriterSingleReaderLockFreePointerFIFO (int capacity) noexcept
   {
      jassert (capacity > 0);
      arrayOfPointers.resize(capacity);
   }
  
   ~CircularSingleWriterSingleReaderLockFreePointerFIFO() {};
   
   int getTotalSize() const noexcept           { return arrayOfPointers.size(); }
   int getFreeSpace() const noexcept           { return arrayOfPointers.size() - getNumReady(); }
   int getNumReady() const noexcept
   {
      const int vs = validStart.get();
      const int ve = validEnd.get();
      return ve >= vs ? (ve - vs) : (arrayOfPointers.size() - (vs - ve));
   }

   void reset() noexcept
   {
      validEnd = 0;
      validStart = 0;
   }

   void setTotalSize (int newSize) noexcept
   {
      jassert (newSize > 0);
      reset();
      arrayOfPointers.resize(newSize);
   }

   //==============================================================================
   // adds a Pointer to the FIFO, return true when successful
   bool write ( ObjectClass* object)  noexcept
   {
      const int vs = validStart.get();
      const int ve = validEnd.value;
      const int freeSpace = ve >= vs ? (arrayOfPointers.size() - (ve - vs)) : (vs - ve);


      if (freeSpace <=1 )
      {
         return false;
      }
      
      arrayOfPointers.set(ve,object);
      int newEnd = ve + 1;
      if (newEnd >= arrayOfPointers.size()) newEnd=0;
      validEnd.set(newEnd); // ATOMIC at the end
      return true;
   }

   // reads a pointer from the FIFO, returns the pointer or 0 if not successful
   ObjectClass* read ()  noexcept
   {
      const int vs = validStart.value;
      const int ve = validEnd.get();
      const int numReady = ve >= vs ? (ve - vs) : ( arrayOfPointers.size() - (vs - ve));
   

      if (numReady <= 0)
      {
         return 0;
      }
            
      
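      ObjectClass* const object = arrayOfPointers[vs]; // copy the pointer out before the slot is handed back to the writer
      int newStart = vs + 1;
      if (newStart >= arrayOfPointers.size())    newStart = 0;
      validStart.set(newStart); // ATOMIC at the end
      return object;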

   }

private:
    //==============================================================================
   
    Atomic <int> validStart, validEnd;
   Array <ObjectClass*>  arrayOfPointers;

    JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (CircularSingleWriterSingleReaderLockFreePointerFIFO);
};

template <class ObjectClass>
class ConsumerProducerManager
{
public:
   ConsumerProducerManager(int capacity)
      :producerToConsumer(capacity), consumerToProducer(capacity), currentObject(nullptr)
   {

   };

   ~ConsumerProducerManager()
   {
      //This is a fallback if not all objects were consumed/deleted
      producerDeleteBackItems();
      deleteUnusedItems();
      if (currentObject!=nullptr)
      {
         delete currentObject;
      }
   }

   // ONLY USED BY PRODUCER !!!
   void producerDeleteBackItems()
   {
      ObjectClass* next=consumerToProducer.read();
      while (next!=0)
      {
         delete next;
         next=consumerToProducer.read();
      }
   };

   void deleteUnusedItems()
   {
      ObjectClass* next=producerToConsumer.read();
      while (next!=0)
      {
         delete next;
         next=producerToConsumer.read();
      }
   };

   // ONLY USED BY PRODUCER !!!
   void producerSendObjectToConsumerOrDeleteIt(ObjectClass* newObject)
   {
      if (!producerToConsumer.write(newObject))
      {
         // if queue is full, fallback!
         jassertfalse;
         delete newObject;
      }
   };

   // ONLY USED BY CONSUMER
   bool consumerIsCurrentObjectAvailable()
   {
      return currentObject!=nullptr;
   };

   // ONLY USED BY CONSUMER
   ObjectClass* consumerGetCurrentObject()
   {
      return currentObject;
   }

   // ONLY USED BY CONSUMER
   void consumerSwitchCurrentToLatestObject()
   {
      ObjectClass* next=producerToConsumer.read();
      while (next!=0)
      {
      
         if (currentObject!=nullptr)
         {
            if (!consumerToProducer.write(currentObject))
            {
               //if the queue is full, delete it here, fallback
               delete currentObject;
               jassertfalse;
            }
         }

         currentObject=next;
         next=producerToConsumer.read();
      }
   };


private:

   CircularSingleWriterSingleReaderLockFreePointerFIFO<ObjectClass> producerToConsumer;
   CircularSingleWriterSingleReaderLockFreePointerFIFO<ObjectClass> consumerToProducer;

   ObjectClass* currentObject;

};

VFLib already has robust facilities for passing data between threads, and there’s no limitation on the size. It also supports multiple writers (but single reader).

Why wouldn’t you just use AbstractFifo and its methods as intended? Lock-free cross-thread programming is a real minefield, and rolling your own from a copy/paste seems risky. If one operation is out of order, or even if something is a direct set instead of a method call (which changes the memory barriers), it will fail in mysterious ways.

The best practice is to have a solid building block and use it to make other pieces - such as taking a single-writer/single-reader queue and using it to set and get one item at a time.

On another note, you’re using an Array with set, get, etc. Those do memory moves and all sorts of things. I’d be surprised if your class works - you need a truly fixed array (a C array, not a JUCE Array class) and to let the head and tail pointers track what is filled and what isn’t.
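As a rough illustration of the "fixed storage plus head and tail indexes" idea, here is a minimal sketch. It assumes C++11 `std::atomic` and `std::array` rather than JUCE's `Atomic` and `Array`, and the name `FixedSpscPointerQueue` is hypothetical; it is not the code from this thread or from JUCE, and it is only valid with exactly one producer thread and one consumer thread.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// Single-producer / single-consumer pointer queue over fixed storage.
// One slot is always left empty so that head == tail can only mean "empty".
template <typename T, std::size_t Capacity>
class FixedSpscPointerQueue
{
    static_assert (Capacity >= 2, "one slot is kept empty, so at least two are needed");

public:
    // Producer thread only. Returns false when the queue is full.
    bool push (T* item) noexcept
    {
        assert (item != nullptr); // nullptr is reserved to mean "queue empty" in pop()

        const std::size_t tail = tailIndex.load (std::memory_order_relaxed);
        const std::size_t next = (tail + 1) % Capacity;

        if (next == headIndex.load (std::memory_order_acquire))
            return false;                                      // full

        slots[tail] = item;                                    // fill the slot first...
        tailIndex.store (next, std::memory_order_release);     // ...then publish it
        return true;
    }

    // Consumer thread only. Returns nullptr when the queue is empty.
    T* pop() noexcept
    {
        const std::size_t head = headIndex.load (std::memory_order_relaxed);

        if (head == tailIndex.load (std::memory_order_acquire))
            return nullptr;                                    // empty

        T* const item = slots[head];                           // read the slot first...
        headIndex.store ((head + 1) % Capacity,
                         std::memory_order_release);           // ...then hand it back
        return item;
    }

private:
    std::array<T*, Capacity> slots {};          // storage never moves or resizes
    std::atomic<std::size_t> headIndex { 0 };   // written by the consumer only
    std::atomic<std::size_t> tailIndex { 0 };   // written by the producer only
};
```

Each index is written by exactly one thread, and the release store on one side pairs with the acquire load on the other, which is what makes the slot contents visible before the index that publishes them.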

Bruce

For instance, in a class I have, here’s the way to add one object:

[code]void TimeStateTracker::enqTimeMessage (TimeStateMessage inMsg)
{
#if JUCE_DEBUG
    if (!feedThread)
        feedThread = Thread::getCurrentThread ();
    else if (feedThread != Thread::getCurrentThread())
    {
        //jassertfalse;
    }
#endif

    int written = 0;

    int start1, size1, start2, size2;
    prepareToWrite (1, start1, size1, start2, size2);

    if (size1 > 0)
    {
        timesBuffer [start1] = inMsg;
        written = 1;
    }
    else if (size2 > 0)
    {
        timesBuffer [start2] = inMsg;
        written = 1;
    }

    //jassert (written > 0);

    finishedWrite (written);
}[/code]
On the other thread, anything queued gets moved to a more manageable juce array:

[code] int start1, size1, start2, size2;
prepareToRead (maxMsgs, start1, size1, start2, size2);

int msg		= 0;
while (msg < size1)
{
	incomingTimes.add (timesBuffer[start1 + msg++]);
}

msg		= 0;
while (msg < size2)
{
	incomingTimes.add (timesBuffer[start2 + msg++]);
}

finishedRead (size1 + size2);

jassert (size1 + size2 < maxMsgs);

[/code]

Where - and this is the key:

The class/struct going into the array must be copyable by value, as I recall. Use a ReferenceCountedObject pointer for variable storage. I’d advise against passing raw pointers in general. If something goes wrong on either side, you’ll leak memory.

It looks like my class above wouldn’t support refcounted data payloads, by the way: it doesn’t clear the array members, so it would hold arbitrary references for a random amount of time.
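To make the point about stale references concrete, here is a small sketch of a single-writer/single-reader queue that empties each slot as it is read. It uses `std::shared_ptr` as a stand-in for a JUCE `ReferenceCountedObjectPtr` (an assumption made only to keep the example self-contained), and `ClearingSpscQueue` is a hypothetical name.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>

// SPSC queue of reference-counted payloads that clears a slot when it is read,
// so the queue itself never keeps an object alive after it has been consumed.
template <typename T, std::size_t Capacity>
class ClearingSpscQueue
{
    static_assert (Capacity >= 2, "one slot is kept empty, so at least two are needed");

public:
    // Producer thread only. Returns false (and stores nothing) when full.
    bool push (std::shared_ptr<T> item)
    {
        assert (item != nullptr); // an empty pointer is what pop() returns for "queue empty"

        const std::size_t tail = tailIndex.load (std::memory_order_relaxed);
        const std::size_t next = (tail + 1) % Capacity;

        if (next == headIndex.load (std::memory_order_acquire))
            return false;

        slots[tail] = std::move (item);
        tailIndex.store (next, std::memory_order_release);
        return true;
    }

    // Consumer thread only. Returns an empty pointer when the queue is empty.
    std::shared_ptr<T> pop()
    {
        const std::size_t head = headIndex.load (std::memory_order_relaxed);

        if (head == tailIndex.load (std::memory_order_acquire))
            return nullptr;

        std::shared_ptr<T> item = std::move (slots[head]); // take the reference out of the slot
        slots[head].reset();                               // make the "slot is empty" intent explicit
        headIndex.store ((head + 1) % Capacity, std::memory_order_release);
        return item;
    }

private:
    std::array<std::shared_ptr<T>, Capacity> slots;
    std::atomic<std::size_t> headIndex { 0 };
    std::atomic<std::size_t> tailIndex { 0 };
};
```

Note that with reference counting the object is destroyed on whichever thread drops the last reference, so if that must not be the audio thread, the other side still has to hold a reference of its own.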

Bruce

You know what else would be really cool?

If Jules implemented the fast wait-free data structure that Kogan and Petrank published this year.
ACM: http://dl.acm.org/citation.cfm?doid=2145816.2145835
PDF: http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf

That would be cool. Maybe one day…

[quote=“sonic59”]You know what else would be really cool?

If Jules implemented the fast wait-free data structure that Kogan and Petrank published this year.
ACM: http://dl.acm.org/citation.cfm?doid=2145816.2145835
PDF: http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf

That would be cool. Maybe one day…[/quote]

No, it wouldn’t be cool at all. First of all, 64-bit CAS is not universally available. Second, the use of hazard pointers complicates the client code. There are lots of wait-free algorithms but almost none that transparently eliminate the ABA problem for all cases.

Shameless plug: VFLib implementation of CallQueue and Listeners eliminates the ABA problem transparently, and works on all platforms.

Is it entirely lock-free? Remember, I only need it in a single-writer/single-reader situation. I noticed that heavy memory allocation and freeing can introduce long waits on CriticalSections (on entry, even when they are NOT locked!)

[quote=“Bruce Wheaton”]Why wouldn’t you just use AbstractFifo and its methods as intended? Lock-free cross-thread programming is a real minefield, and rolling your own from a copy/paste seems risky. If one operation is out of order, or even if something is a direct set instead of a method call (which changes the memory barriers), it will fail in mysterious ways.

The best practice is to have a solid building block and use it to make other pieces - such as taking a single-writer/single-reader queue and using it to set and get one item at a time.
[/quote]

Yes, that’s what I want: a solid building block for passing (fixed-size) objects between threads. (That’s why I’m posting here.) AbstractFifo’s functionality doesn’t exactly match what I need. Of course I could do it with AbstractFifo, but I would have to implement it every time, and it is geared towards blocks of varying length, which I don’t need here.
I don’t want to write

[code]prepareToWrite (1, start1, size1, start2, size2);

if (size1 > 0)
{
    timesBuffer [start1] = inMsg;
    written = 1;
}
else if (size2 > 0)
{
    timesBuffer [start2] = inMsg;
    written = 1;
}

//jassert (written > 0);

finishedWrite (written);[/code]
instead I wanna write

write(inMsg)

Using smart pointers is also a good idea. I used the array of raw pointers because I wanted to control where the objects are freed, but as a fallback that’s a good idea.

Hmm, really? I resize the array to the maximum size at the beginning, but I will have a look.

[quote=“chkn”]Hmm, really? I resize the array to the maximum size at the beginning, but I will have a look.[/quote]

I think that just sets the maximum size of the array, but the location of your data (and the used size) will shift around. A circular buffer requires fixed storage, with the head and tail pointers chasing each other around it.

Bruce

I will have a look tomorrow :) But as far as I remember, set() just assigns the item with operator=; there is no memory shifting around...


@all
My initial intention for this thread was to have a robust FIFO/queue which is used by [b]one[/b] writer and [b]one[/b] reader with zero locks to pass pointers/smart pointers between threads, exactly that and nothing else ;-)

If you need something different, or have general ideas about concurrent programming, implementations or whatever else, feel free to create a new thread :)


[quote=“chkn”]Is it entirely lock-free? Remember, I only need it in a single-writer/single-reader situation. I noticed that heavy memory allocation and freeing can introduce long waits on CriticalSections (on entry, even when they are NOT locked!)[/quote]

Of course it’s lock-free, and sometimes wait-free. You might only need single writer, but you get multiple writer capability with no extra computational burden. That is to say, that if you have only a single writer with vf::CallQueue there is no wait state.

With vf::CallQueue you get to pass whatever you want. There is no limitation on size or the number of objects that you pass. The syntax is completely clear, since it looks just like a function call. Except that the function executes on another thread:

For example, let’s say you want to call triggerAsyncUpdate() on another thread. This is the syntax:

m_thread.call (&AsyncUpdater::triggerAsyncUpdate, this);

A more detailed example. Given:

struct Consumer
{
  void doWork (ReferenceCountedObjectPtr <Work> work);
};

This code will pass a Work object to the Consumer thread:

vf::ThreadWithCallQueue m_thread;
ReferenceCountedObjectPtr <Work> work (new Work);
Consumer consumer;

m_thread.call (&Consumer::doWork, &consumer, work);

You can pass multiple arguments in to call() and they can be of any type (except C++ references, unless you use a boost style reference wrapper).


Vinn, do you use CriticalSections?

It has to be wait free!!!

https://github.com/vinniefalco/VFLib/blob/master/modules/vf_core/containers/vf_LockFreeQueue.h

https://github.com/vinniefalco/VFLib/blob/master/modules/vf_concurrent/memory/vf_FifoFreeStoreWithoutTLS.cpp

I can’t say that my code is completely wait free in all situations. Every truly wait-free data structure that I have ever seen comes with some kind of restrictions. For example, a predefined upper limit on the size of the queue. Or the possibility that an operation can “fail” (i.e. the queue is “busy”). But I know that this code works well even when called from the audioDeviceIoCallback (which is why it was designed).

Furthermore, the function call paradigm is very natural and fits into existing code easily.

[quote=“chkn”]My initial intention for this thread was to have a robust FIFO/queue which is used by one writer and one reader with zero locks to pass pointers/smart pointers between threads, exactly that and nothing else ;-)

If you need something different, or have general ideas about concurrent programming, implementations or whatever else, feel free to create a new thread :)[/quote]

It’s a good need, yes. But not as trivial a topic as you may think. For instance - wait-free is not the same as lock-free, and getting both is very hard. Minefield, like I say.
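The difference between the two guarantees can be sketched with two tiny functions. This is only an illustration with hypothetical names (`incrementWaitFree`, `doubleLockFree`), assuming C++11 `std::atomic` and a platform where `fetch_add` maps to a single hardware instruction; on other platforms the first function may itself be implemented as a retry loop.

```cpp
#include <atomic>
#include <cassert>

// Wait-free (under the assumption above): one atomic read-modify-write,
// so the call finishes in a bounded number of steps whatever other threads do.
inline int incrementWaitFree (std::atomic<int>& counter) noexcept
{
    assert (counter.is_lock_free()); // otherwise the library may fall back to an internal lock
    return counter.fetch_add (1, std::memory_order_relaxed) + 1;
}

// Lock-free but not wait-free: if another thread changes the value between
// our read and our compare-exchange, we retry. Some thread always makes
// progress, but this particular call has no upper bound on its retries.
inline int doubleLockFree (std::atomic<int>& value) noexcept
{
    int expected = value.load (std::memory_order_relaxed);

    while (! value.compare_exchange_weak (expected, expected * 2,
                                          std::memory_order_relaxed))
    {
        // 'expected' now holds the freshly observed value; try again with it
    }

    return expected * 2;
}
```

In a queue with exactly one writer and one reader, each index has a single owner, so no compare-exchange loop is needed at all; that is a large part of why that case is so much easier than the multi-writer one.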

I’d suggest you make a single object cross-thread queue using an AbstractFIFO (not trying to duplicate it, since then it will require maintenance and extensive edge case and load testing), and submit that to Jules or in the useful components forum.

Or, try ThreadQueue. But your requirements are a bit unreasonable, and you may need to work out what your app/s really need, since perfection doesn’t exist in this area.

Remember - even your FIFO implementation is subject to when each thread runs and does its work. If your worker thread never gets scheduled, it will never pick up its new data, and vice versa. I’d say settle for lock-free, and be happy Jules did that. It was pretty grim before then - lots of careful design to try to avoid interlocks.

Bruce

What I find is that rarely do you ever really want just a pure FIFO. You also want some kind of signaling mechanism so the other thread can “wake up” when there’s data to be processed. And of course, some memory management to make things easier.
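For readers who have not seen the "wake up the other thread" part spelled out, a minimal sketch follows. It deliberately uses a plain `std::mutex`, `std::condition_variable` and `std::deque`, so it locks and allocates and is not suitable for an audio callback; it only illustrates the signaling idea and is not how VFLib implements it. The name `SignallingQueue` is hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// A queue whose consumer can sleep until the producer signals new data.
template <typename T>
class SignallingQueue
{
public:
    // Producer side: store the item, then wake one waiting consumer.
    void push (T item)
    {
        {
            std::lock_guard<std::mutex> lock (guard);
            items.push_back (std::move (item));
        }
        ready.notify_one(); // signal outside the lock so the woken thread can proceed at once
    }

    // Consumer side: sleep for up to 'timeout' until an item is available.
    // Returns true and fills 'out' if one arrived, false on timeout.
    bool waitAndPop (T& out, std::chrono::milliseconds timeout)
    {
        assert (timeout.count() >= 0);

        std::unique_lock<std::mutex> lock (guard);

        if (! ready.wait_for (lock, timeout, [this] { return ! items.empty(); }))
            return false;

        out = std::move (items.front());
        items.pop_front();
        return true;
    }

private:
    std::mutex guard;
    std::condition_variable ready;
    std::deque<T> items;
};
```

A consumer loop would typically call `waitAndPop` with a modest timeout so that it can also notice a shutdown request between items.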

Looking at AbstractFifo there is all sorts of nonsense to deal with, like the buffer “filling up”, or determining the amount of “ready” items. And it doesn’t provide any signaling or storage!

This is why I wrote the ThreadWithCallQueue, and Listeners. They provide the full set of features that are needed for communicating across threads. Instead of shirking the responsibility of doing the signaling, managing memory, and passing of objects (whether they are passed by value or reference counted heap objects) it tackles it head on and does it all for you.

Because that’s the only thing I need! I have an existing thread which produces objects that are transferred to the audio thread. After they are consumed, they are pushed back on a second FIFO so they can be freed on the non-critical thread.
I don’t need signaling, I don’t need storage management, I don’t need to fill buffers of varying length, etc. (and yes, I know signaling and storage management are useful!); I want to keep things as simple as possible.

I’ve made a second template, which uses two circular FIFOs for the behavior I need. There is an emergency fallback: if the queue is full, additional objects are deleted. That’s just the specific behavior I need here. It works surprisingly well. Feel free to discuss, thanks!

//Pseudo Code Demo

ConsumerProducerManager q;

//Thread 1 Producer

{
   while (true)
   {
      q.producerSendObjectToConsumerOrDeleteIt (new Object); // push new Object
      q.producerDeleteBackItems();                           // delete objects that came back from the consumer thread
   }
}

// Thread 2 Consumer (Audio Thread)

{
   q.consumerSwitchCurrentToLatestObject(); // we only want the latest object that was generated
   while (true)
   {
      if (q.consumerIsCurrentObjectAvailable())
      {
         q.consumerGetCurrentObject()->doSomeThing();
      }
   }
}

New Versions of the two classes ConsumerProducerManager and CircularSingleWriterSingleReaderLockFreePointerFIFO



template <class ObjectClass>
class    CircularSingleWriterSingleReaderLockFreePointerFIFO
{
public:
   
    CircularSingleWriterSingleReaderLockFreePointerFIFO (int capacity) noexcept
	{
		jassert (capacity > 0);
		arrayOfPointers.resize(capacity);
	}
  
	~CircularSingleWriterSingleReaderLockFreePointerFIFO() {};
   
	int getTotalSize() const noexcept           { return arrayOfPointers.size(); }
	int getFreeSpace() const noexcept           { return arrayOfPointers.size() - getNumReady(); }
	int getNumReady() const noexcept
	{
		const int vs = validStart.get();
		const int ve = validEnd.get();
		return ve >= vs ? (ve - vs) : (arrayOfPointers.size() - (vs - ve));
	}

	void reset() noexcept
	{
		validEnd = 0;
		validStart = 0;
	}

	void setTotalSize (int newSize) noexcept
	{
		jassert (newSize > 0);
		reset();
		arrayOfPointers.resize(newSize);
	}

	//==============================================================================
	// adds a Pointer to the FIFO, return true when successful
	bool write ( ObjectClass* object)  noexcept
	{
		const int vs = validStart.get();
		const int ve = validEnd.value;
		const int freeSpace = ve >= vs ? (arrayOfPointers.size() - (ve - vs)) : (vs - ve);


		if (freeSpace <= 1)
		{
			return false;
		}
		
		arrayOfPointers.set(ve,object);
		int newEnd = ve + 1;
		if (newEnd >= arrayOfPointers.size()) newEnd=0;
		validEnd.set(newEnd); // ATOMIC at the end
		return true;
	}

	// reads a pointer from the FIFO, returns the pointer or 0 if not successful
	ObjectClass* read ()  noexcept
	{
		const int vs = validStart.value;
		const int ve = validEnd.get();
		const int numReady = ve >= vs ? (ve - vs) : ( arrayOfPointers.size() - (vs - ve));
	

		if (numReady <= 0)
		{
			return 0;
		}
				
		
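		ObjectClass* const object = arrayOfPointers[vs]; // copy the pointer out before the slot is handed back to the writer
		int newStart = vs + 1;
		if (newStart >= arrayOfPointers.size()) 	newStart = 0;
		validStart.set(newStart); // ATOMIC at the end
		return object;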

	}

private:
    //==============================================================================
   
    Atomic <int> validStart, validEnd;
	Array <ObjectClass*>  arrayOfPointers;

    JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (CircularSingleWriterSingleReaderLockFreePointerFIFO);
};

template <class ObjectClass>
class ConsumerProducerManager
{
public:
	ConsumerProducerManager(int capacity)
		:producerToConsumer(capacity), consumerToProducer(capacity), currentObject(nullptr)
	{

	};

	~ConsumerProducerManager()
	{
		//This is a fallback if not all objects were consumed/deleted
		producerDeleteBackItems();
		deleteUnusedItems();
		if (currentObject!=nullptr)
		{
			delete currentObject;
		}
	}

	// ONLY USED BY PRODUCER !!!
	void producerDeleteBackItems()
	{
		ObjectClass* next=consumerToProducer.read();
		while (next!=0)
		{
			delete next;
			next=consumerToProducer.read();
		}
	};

	void deleteUnusedItems()
	{
		ObjectClass* next=producerToConsumer.read();
		while (next!=0)
		{
			delete next;
			next=producerToConsumer.read();
		}
	};

	// ONLY USED BY PRODUCER !!!
	void producerSendObjectToConsumerOrDeleteIt(ObjectClass* newObject)
	{
		if (!producerToConsumer.write(newObject))
		{
			// if queue is full, fallback!
			jassertfalse;
			delete newObject;
		}
	};

	// ONLY USED BY CONSUMER
	bool consumerIsCurrentObjectAvailable()
	{
		return currentObject!=nullptr;
	};

	// ONLY USED BY CONSUMER
	ObjectClass* consumerGetCurrentObject()
	{
		return currentObject;
	}

	// ONLY USED BY CONSUMER
	void consumerSwitchCurrentToLatestObject()
	{
		ObjectClass* next=producerToConsumer.read();
		while (next!=0)
		{
		
			if (currentObject!=nullptr)
			{
				if (!consumerToProducer.write(currentObject))
				{
					//if the queue is full, delete it here, fallback
					delete currentObject;
					jassertfalse;
				}
			}

			currentObject=next;
			next=producerToConsumer.read();
		}
	};


private:

	CircularSingleWriterSingleReaderLockFreePointerFIFO<ObjectClass> producerToConsumer;
	CircularSingleWriterSingleReaderLockFreePointerFIFO<ObjectClass> consumerToProducer;

	ObjectClass* currentObject;

};

[quote=“chkn”]Because that’s the only thing I need! I have an existing thread which produces objects that are transferred to the audio thread. After they are consumed, they are pushed back on a second FIFO so they can be freed on the non-critical thread.
I don’t need signaling, I don’t need storage management, I don’t need to fill buffers of varying length, etc. (and yes, I know signaling and storage management are useful!); I want to keep things as simple as possible.[/quote]

Exactly. Note that your “simple” requirements include all the things that I mentioned.

I’m not sure where to begin. It is lock free, but there’s essentially no substitute for the lock being gone. It’s going to leak objects like a sieve, and crash quite often.

Here’s a really easy start: you’re accessing your member:

Array <ObjectClass*>  arrayOfPointers;

From two threads. Right there - not possible, safe, or wise (that’s just a start). I understand you want a class to do what you want, but you’ve got two perfectly good alternatives.

Writing this sort of class is reserved for the top tier of programmers, who understand memory barriers, what a compiler is going to do with specific code, race conditions etc. It’s really not something you can make up off the top of your head using high level classes that aren’t thread safe.

Honestly.

Bruce

Edit: So, your trick of making a fixed-size array seems OK, if risky - why use a class if you’re relying on unguaranteed behavior to make it act like a simple array? I couldn’t make a double delete happen, or an attempt to action something already deleted. Making thousands of objects leak was trivial.

[quote] It is lock free, but there’s essentially no substitute for the lock being gone. It’s going to leak objects like a sieve, and crash quite often.
[/quote]
The circular buffer works exactly the way AbstractFifo does. Did you notice the atomic indexes for the start and the end?
And about the array: I’m just using the Array template. It’s just a more comfortable version of a dynamic array allocated with “new”; there is really nothing fancy about that. (I can also use setUnchecked to make it more similar to a pure C++ dynamic array.)

That’s not my way of thinking... there is no magic involved (believe me!), it’s just logic.
I’m not saying my code is perfect. If you find a possible problem, please tell me; that’s why I posted it here, to end up with a reliable version, also for other people who need a similar solution. But such general statements are IMHO not helpful.
I can’t follow what you say about leakage…

Maybe I didn’t explain well enough what my code is intended for. The FIFO is just a tool for passing pointers between threads (there is no memory management involved). The second class, ConsumerProducerManager, “owns” the pointers you give it, so that the consumer (audio) thread has a reliable version of the object you generated in the producer thread. Where do you see the potential leakage?
(There is a fallback mechanism: if the queue is full, the objects will be deleted, but that’s just the behavior I need here.)