Message passing



Let me air some of my dirty laundry in public … :)

We’ve got a largish app now. And occasionally we need some object to tell some other object that some shit has just happened. And there’s usually a cross-thread angle to this as well.

The messages are pretty low bandwidth. But the communication is frequently between different parts of the app, and it’s inconvenient to set up a pointer from one object to the other. So that rules out the standard listener patterns, or just calling a function.

So I’ve written an object called SimpleMessageManager, possibly not the best choice of name. It works well.

I’m posting the whole thing here in case it’s useful. But also for a second pair of eyes. Maybe …

  • there is a better way
  • there is a class in JUCE that does this for me that I’ve missed
  • there’s something lightweight already written somewhere else I should have used

I also think that using AsyncUpdater is nice and easy to understand, but it is actually putting a message in the main message queue, which is then used to trigger delivery of the message in my own queue … it just feels like duplication.

And my template programming is iffy:

  • Maybe there’s a way of consolidating the SimpleHandlerNoContent and SimpleHandler classes into one.
  • Maybe it can take variable numbers of arguments somehow?

Firstly, how does it work? Well, here we are handling a message when something has been downloaded:

class X 
{ //...
	SimpleHandler<LcMessage::CloudDataDownloaded> handler1{
		context.simpleMessageManager,
		[this](const ItemUuid & uuid)
		{
			if (this->currentItem.isValid() && uuid == this->currentItem.getUuid())
			{
				this->refreshCurrentItem();
			}
		} };
};

I have no idea why we need to put this-> in front of things here but neither VS nor clang likes it without that.

Here’s how the message is defined:

namespace LcMessage
{
	enum Ids
	{
//...

		cloudDataDownloaded,
	};

	typedef SimpleMessageManager::Message<cloudDataDownloaded, ItemUuid> CloudDataDownloaded;
}

And finally here’s how the message is sent:

messageManager.publishAsyncThreadSafe(new LcMessage::CloudDataDownloaded(uuid));

And the glue in the middle:

/**
 *  Allows a thread to send a message to a listener.
 */
class SimpleMessageManager : AsyncUpdater
{
public:
	class MessageBase
	{
	public:
		MessageBase(int messageId) : messageId(messageId) {}
		virtual ~MessageBase() {}
		int getMessageId() const { return messageId; }
	private:
		int messageId;
	};


	class Handler
	{
	public:
		virtual ~Handler(){}
		virtual void handleSimpleMessage(int messageId, MessageBase * data) = 0;
	};

	void subscribe(int messageId, Handler * listener)
	{
		listeners[messageId].add(listener);
	}

	void unsubscribe(Handler * listener)
	{
		for (auto & i : listeners)
			i.second.remove(listener);
	}


	template <int MESSAGE_ID, class T>
	class Message : public MessageBase
	{
	public:
		explicit Message(const T & contents)
			:
			MessageBase(MESSAGE_ID),
			contents(contents)
		{}

		T contents;
		static int constexpr messageIdValue = MESSAGE_ID;
		using ContentType = const T &;
	};

	template <int MESSAGE_ID>
	class MessageNoContent : public MessageBase
	{
	public:
		MessageNoContent() : MessageBase(MESSAGE_ID) {} 
		static int constexpr messageIdValue = MESSAGE_ID;
		using ContentType = void;
	};
	/**
	* The message manager will take ownership of the message object passed in.
	* Handlers will be called on the UI thread.
	*/
	void publishAsyncThreadSafe(MessageBase * message)
	{
		ScopedLock l(lock);
		asyncQueue.push_back(std::unique_ptr<MessageBase>(message));
		triggerAsyncUpdate();
	}
	 
private:
	CriticalSection lock;

	void publish(int messageId, MessageBase * data = nullptr)
	{
		listeners[messageId].call(&Handler::handleSimpleMessage, messageId, data);
	}

	void handleAsyncUpdate() override
	{
		int numItems;

		{
			ScopedLock l(lock);
			numItems = asyncQueue.size();
		}

		// we take a snapshot of the queue size (rather than looping until the
		// queue is empty) so that a handler which keeps publishing new messages
		// can't keep this loop running forever and block the message thread

		for (int i = 0; i < numItems; ++i) 
		{
			std::unique_ptr<MessageBase> message;

			{
				ScopedLock l(lock);
				message = std::move(asyncQueue.front());
				asyncQueue.pop_front();
			}

			publish(message->getMessageId(), message.get());
		}
	}

	std::deque<std::unique_ptr<MessageBase>> asyncQueue;
	std::map<int, ListenerList<Handler>> listeners;
};
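
The drain logic in handleAsyncUpdate can be tried out without JUCE. What follows is only a rough sketch under some assumptions: DrainQueue, push, drainOnce and pending are made-up names, std::mutex stands in for CriticalSection, and plain std::function jobs stand in for messages. It also swaps the whole queue out under one lock rather than counting and popping one at a time, which is a slightly different technique but gives the same "only handle what was queued when we started" behaviour.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical JUCE-free stand-in for the queue inside SimpleMessageManager.
class DrainQueue
{
public:
    using Job = std::function<void()>;

    // Safe to call from any thread, including from inside a running job.
    void push(Job job)
    {
        std::lock_guard<std::mutex> guard(lock);
        queue.push_back(std::move(job));
    }

    // Runs only the jobs that were queued when the call started and
    // returns how many it ran. Jobs pushed by a running job stay queued.
    std::size_t drainOnce()
    {
        std::deque<Job> batch;
        {
            std::lock_guard<std::mutex> guard(lock);
            batch.swap(queue);
        }

        for (auto& job : batch)
            job(); // the lock is not held here, so a job may call push()

        return batch.size();
    }

    std::size_t pending()
    {
        std::lock_guard<std::mutex> guard(lock);
        return queue.size();
    }

private:
    std::mutex lock;
    std::deque<Job> queue;
};
```

A job that pushes another job from inside drainOnce leaves that new job waiting for the next drain, so a single drain always finishes.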


template <class MessageType>
class SimpleHandler : public SimpleMessageManager::Handler
{
public:
	using T = typename MessageType::ContentType;
	SimpleHandler(SimpleMessageManager & messageManager, std::function<void(T)> action) : messageManager(messageManager), action(action) {
		messageManager.subscribe(MessageType::messageIdValue, this);
	}

	~SimpleHandler() { messageManager.unsubscribe(this); }

	void handleSimpleMessage (int, SimpleMessageManager::MessageBase* data) override
	{
		MessageType * m = static_cast<MessageType*>(data);
		action(m->contents);
	}

private:
	SimpleMessageManager & messageManager;
	std::function<void(T)> action;
};


template <class MessageType>
class SimpleHandlerNoContent : public SimpleMessageManager::Handler
{
public:
	SimpleHandlerNoContent(SimpleMessageManager & messageManager, std::function<void()> action) : messageManager(messageManager), action(action) {
		messageManager.subscribe(MessageType::messageIdValue, this);
	}

	~SimpleHandlerNoContent() { messageManager.unsubscribe(this); }

	void handleSimpleMessage (int, SimpleMessageManager::MessageBase*) override
	{
		action();
	}

private:
	SimpleMessageManager & messageManager;
	std::function<void()> action;
};
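
On the two template questions (merging SimpleHandler with SimpleHandlerNoContent, and taking a variable number of arguments), one possible direction is to keep the payload in a std::tuple built from a parameter pack and unpack it with std::apply. This is a sketch only and assumes C++17. VariadicMessage, VariadicHandler and variadicDemo are made-up names, MessageBase is a cut-down stand-in for SimpleMessageManager::MessageBase, and the subscribe/unsubscribe wiring is left out.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <tuple>
#include <utility>

// Cut-down stand-in for SimpleMessageManager::MessageBase (hypothetical).
struct MessageBase
{
    explicit MessageBase(int id) : messageId(id) {}
    virtual ~MessageBase() = default;
    int messageId;
};

// One template covers zero, one or many payload values.
template <int MESSAGE_ID, class... Ts>
struct VariadicMessage : MessageBase
{
    explicit VariadicMessage(Ts... values)
        : MessageBase(MESSAGE_ID), contents(std::move(values)...) {}

    std::tuple<Ts...> contents;
    static constexpr int messageIdValue = MESSAGE_ID;
    using Callback = std::function<void(const Ts&...)>;
};

// One handler template; with an empty pack the callback is just void().
template <class MessageType>
struct VariadicHandler
{
    explicit VariadicHandler(typename MessageType::Callback cb)
        : action(std::move(cb)) {}

    void handleSimpleMessage(int, MessageBase* data)
    {
        auto* m = static_cast<MessageType*>(data);
        std::apply(action, m->contents); // unpack the tuple into the callback's arguments
    }

    typename MessageType::Callback action;
};

// Usage sketch: returns 42 + 3 + 1 = 46 when both handlers have run.
inline int variadicDemo()
{
    using Downloaded = VariadicMessage<1, int, std::string>;
    using Ping = VariadicMessage<2>; // no payload at all

    int total = 0;
    VariadicHandler<Downloaded> onDownloaded(
        [&](const int& n, const std::string& s) { total += n + static_cast<int>(s.size()); });
    VariadicHandler<Ping> onPing([&] { total += 1; });

    Downloaded d(42, "abc");
    Ping p;
    onDownloaded.handleSimpleMessage(Downloaded::messageIdValue, &d);
    onPing.handleSimpleMessage(Ping::messageIdValue, &p);
    return total;
}
```

The static_cast is still only as safe as the message-id-to-type mapping, same as in the original SimpleHandler.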