One-shot threads (lambdas on their own thread that auto-delete)

His solution’s good, but for me to add it to the Thread class would mean doing it without the juce_events module, so would need to be a bit different…

Cool. Hopefully we’ll see your solution sooner rather than later!

I’ve been trying to add a means to queue these OneShotThreads up, and am running into an issue where my OneShotThread calls sendChangeMessage() when it quits, by inserting that call into the original changeListenerCallback:

class OneShotThread : public ChangeListener
{
public:
    OneShotThread (std::function<bool()> task)
    {
        m_pAutoDeleteThread = new AutoDeleteThreadActual (this, task);
        m_pAutoDeleteThread->startThread();
    }

    void changeListenerCallback (ChangeBroadcaster* source) override
    {
        if (source == m_pAutoDeleteThread)
            delete this;
    }

private:
    AutoDeleteThreadActual* m_pAutoDeleteThread = nullptr;
};

becomes:

class OneShotThread : public ChangeListener, public ChangeBroadcaster
{
public:
    OneShotThread (std::function<bool()> task, ChangeListener* whoToNotifyWhenFinished)
    {
        addChangeListener (whoToNotifyWhenFinished);
        m_pAutoDeleteThread = new AutoDeleteThreadActual (this, task);
        m_pAutoDeleteThread->startThread();
    }

    void changeListenerCallback (ChangeBroadcaster* source) override
    {
        if (source == m_pAutoDeleteThread)
        {
            sendChangeMessage(); // notify whoToNotifyWhenFinished
            delete this;
        }
    }

private:
    AutoDeleteThreadActual* m_pAutoDeleteThread = nullptr;
};

It seems like that sendChangeMessage() never reaches its destination, though. Any ideas?
My end goal is to be able to queue up a bunch of these OneShotThread lambdas in a row, so that none of them fires until the previous one has finished running.

Firstly, please check the code I have on DropBox where I added a removeChangeListener() call in the actual thread destructor which isn’t in the code I posted here.

Secondly, for what you’re trying to do I would personally create an Array of tasks and use it as a FIFO, performing the tasks one after the other and removing them from the Array until the Array is empty. I’m swamped today so unfortunately don’t have time to suggest any actual code, but I use a similar approach in my plugin to schedule my sample loading: you simply need to add a task to the Array in OneShotThread and the class will start performing them.

Hope that makes sense.

Cheers,

Rail

I ended up changing the constructor on your class to accept a 2nd task that is executed before the changeListenerCallback()'s delete this is called.

OneShotThread::OneShotThread (std::function<bool()> task, std::function<void()> _finishedTask)
{
    finishedTask = _finishedTask;
    m_pOneShotThreadHelper = new OneShotThreadHelper (this, task);
    m_pOneShotThreadHelper->startThread();
}

void OneShotThread::changeListenerCallback (juce::ChangeBroadcaster* source)
{
    if (source == m_pOneShotThreadHelper)
    {
        finishedTask();
        delete this;
    }
}

and in that task, I just use a version of @jules’ Timer::callAfterDelay() to signal the queue to advance. I’m pretty happy with it.

class DelayedOneShotLambda : private juce::Timer {
public:
    DelayedOneShotLambda( int milliseconds, std::function<void()> f ) {
        function = f;
        startTimer(milliseconds);
    }
    void timerCallback() override {
        auto f = function;
        delete this;
        f();
    }
private:
    std::function<void()> function;
};

OneShotQueue header:

class OneShotQueue {
public:
    static OneShotQueue* GetInstance();
    static void AddToQueue( std::function<bool()> lambda );
    static void Begin();
    void queueLambdaFinished();
private:
    void serviceQueue();
    bool beingServiced = false;
    juce::int64 time;
    OneShotQueue();
    ~OneShotQueue() { }
    static OneShotQueue* instance;
    std::queue< std::function<bool()> > queue;
};

Implementation:

OneShotQueue::OneShotQueue() {
    queue = {};
    time = juce::Time::currentTimeMillis();
}
OneShotQueue* OneShotQueue::instance = nullptr;
OneShotQueue* OneShotQueue::GetInstance() {
    if( instance == nullptr ) {
        instance = new OneShotQueue();
    }
    return instance;
}

void OneShotQueue::Begin() {
    auto instance = OneShotQueue::GetInstance();
    instance->serviceQueue();
}

void OneShotQueue::AddToQueue(std::function<bool()> lambda ) {
    auto instance = OneShotQueue::GetInstance();
    instance->queue.push(lambda);
    instance->serviceQueue();
}

void OneShotQueue::serviceQueue() {
    if( queue.size() > 0 && !beingServiced ) {
        beingServiced = true;
        auto sinceLastService = juce::Time::currentTimeMillis() - time;
        time = juce::Time::currentTimeMillis();
        DBG( "Servicing OneShotQueue " + juce::String(sinceLastService) + "ms since last service" );
        std::function<bool()> lambda = queue.front();
        queue.pop();
        new OneShotThread( lambda, [this]() { this->queueLambdaFinished(); } );
    } else {
        DBG( "checking if queue needs servicing...." );
        new DelayedOneShotLambda( 100, [this](){ this->serviceQueue(); } ); //check in every 100ms to see if there is anything in the queue that needs running.
    }
}

void OneShotQueue::queueLambdaFinished() {
    //source was probably deleted by now.
    beingServiced = false;
    new DelayedOneShotLambda( 100, [this](){ this->serviceQueue(); } );
} 

usage:

auto a = []() { DBG( "Testing OneShotQueue" ); return true; };
OneShotQueue::AddToQueue(a);
OneShotQueue::AddToQueue(a);
OneShotQueue::Begin();
OneShotQueue::AddToQueue(a);
OneShotQueue::AddToQueue(a);

It works really well for my app-server communications. I’m sure it could be made more generic for other folks’ usage, but it’s perfect for my needs.

Have you looked into thread-pools/job systems? At a glance it seems like you’re reinventing the wheel (and you usually want to avoid that for multithreaded systems). Also, if all you want to do is call a lambda on a thread, std::async does exactly this.

Unfortunately std::async returns a std::future which has to be stashed somewhere and that can at times be quite inconvenient to do.
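To illustrate that point with a small self-contained demo: the future returned by std::async blocks in its destructor until the task completes, so a would-be fire-and-forget call is secretly synchronous unless you stash the future somewhere.

```cpp
#include <atomic>
#include <chrono>
#include <future>
#include <thread>

// Demonstrates that a std::future obtained from std::async waits for the
// task in its destructor: by the time the inner scope ends, the task is
// guaranteed to have finished.
inline bool fireAndForget()
{
    std::atomic<bool> done { false };

    {
        auto fut = std::async (std::launch::async, [&done]
        {
            std::this_thread::sleep_for (std::chrono::milliseconds (50));
            done = true;
        });
    }   // ~future() blocks here until the task has run

    return done;   // always true: the destructor waited for completion
}
```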

Any ideas how to deal with the OneShotThread if it’s mid-execution when the app gets shut down?

Well, do any of the tasks have a mechanism to be told to quit/abort? If not, you have to send the thread a signal to quit, then spin until it does or times out, which is what stopThread does. Obviously you should design your code so your thread tasks end cleanly. If you were using a FIFO array to hold the queue, you could simply abort after the current task.

Rail

I guess I meant more the lambda being executed in the thread: how do you exit out of that prematurely, I mean? Though it seems more that I just need to check whether ‘this’ still exists before using it inside the lambda.

How about if your lambda gets a reference to a boolean variable which corresponds to the thread’s (signalThreadShouldExit) flag that the lambda checks frequently?

Rail
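The cancellation-flag pattern Rail describes can be sketched framework-free like this (the function name is hypothetical; in JUCE the flag would correspond to the thread’s signalThreadShouldExit state, which the lambda could equally poll via Thread::currentThreadShouldExit()):

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Sketch: a long-running task takes a reference to an abort flag and
// polls it between units of work, so its owner can interrupt it early.
inline int countUpTo (int limit, const std::atomic<bool>& shouldExit)
{
    int i = 0;
    for (; i < limit; ++i)
    {
        if (shouldExit.load())   // check frequently, as Rail suggests
            break;

        std::this_thread::sleep_for (std::chrono::milliseconds (1)); // stand-in for real work
    }
    return i;   // how far we got before being told to stop
}
```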

This is actually the big flaw with lambda threads, and one of the main reasons I didn’t jump at adding them earlier. It’s actually pretty rare that you have a task which takes so long that you need it to go on a background thread, but where you don’t need a mechanism for allowing it to be interrupted.

I will add a lambda for this, because it’s very easy to do, but with a huge caveat. Because there’s a Thread object running it, a lambda can always call Thread::getCurrentThread()->threadShouldExit(), but because these are anonymous, no other threads will have a reference to that thread to allow it to be told to stop!

A more useful use-case for lambdas is in the ThreadPool, where normally at shutdown you’d at least tell the pool to wait for its jobs to finish before killing it.
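The shutdown pattern described here (tell the workers to stop, then wait for them before tearing anything down) can be sketched without JUCE; in JUCE itself this is roughly what ThreadPool::removeAllJobs() with a timeout does for you. All names below are mine, purely for illustration.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Sketch of an orderly shutdown: signal every worker to stop, then join
// them all, so no thread is still touching our state when we return.
class MiniPool
{
public:
    explicit MiniPool (int numThreads)
    {
        for (int i = 0; i < numThreads; ++i)
            workers.emplace_back ([this]
            {
                // stand-in for "wait for a job, run it": just spin politely
                while (! shouldExit.load())
                    std::this_thread::yield();
            });
    }

    ~MiniPool()
    {
        shouldExit = true;         // 1. tell every worker to finish up
        for (auto& w : workers)    // 2. wait for each of them to exit
            w.join();
    }

private:
    std::atomic<bool> shouldExit { false };
    std::vector<std::thread> workers;
};
```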


Perhaps this OneShotThread needs to be modified so it runs in a ThreadPool as a ThreadPoolJob?

It’s actually pretty rare that you have a task which takes so long that you need it to go on a background thread, but where you don’t need a mechanism for allowing it to be interrupted.

For my use case, I’m using these threads to send messages to my server and wait for the reply. The server’s reply can be slow at times, depending on the process. When I’ve added debug statements that calculate the round-trip time, it’s between 200 and 300 ms when using a VirtualBox running on the same machine.

Did you check out the InterProcessConnection?

I use it to communicate asynchronously to an electron app, so I implemented the other side in node.js.
You only need to write a proper header (4 byte magic word plus 4 byte packet size).
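That framing might look something like this on the wire. This is only a sketch: the magic value used here is a placeholder, and the little-endian byte order is an assumption that should be checked against the actual JUCE InterprocessConnection source rather than taken from this.

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Sketch of the 8-byte frame header described above: a 4-byte magic word
// followed by the 4-byte payload size, then the payload itself. The byte
// order (little-endian here) is an assumption to verify against JUCE.
inline std::vector<uint8_t> frameMessage (uint32_t magic, const std::vector<uint8_t>& payload)
{
    std::vector<uint8_t> out (8 + payload.size());
    const uint32_t size = static_cast<uint32_t> (payload.size());

    for (int i = 0; i < 4; ++i)
    {
        out[i]     = static_cast<uint8_t> (magic >> (8 * i));  // magic word
        out[4 + i] = static_cast<uint8_t> (size  >> (8 * i));  // payload size
    }

    if (! payload.empty())
        std::memcpy (out.data() + 8, payload.data(), payload.size());

    return out;
}
```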

@daniel If InterProcessConnection needs to talk with an InterProcessConnectionServer, how did you set up your node.js app? I haven’t found any examples for how to use InterprocessConnection with a webserver.

It talks to a raw TcpSocket. In our case the backend (juce application without GUI) runs an InterProcessServer, and the node application connects a tcp socket to it.

So it wasn’t clear from your OP whether your webserver needs to talk HTTP or whether it can be anything.
If it needs to be HTTP, then it probably makes no sense to use that class… but for other asynchronous communication it might be worth a look.

Yeah, it needs to communicate via HTTP, as I currently use a shared host. It’s looking like I need to use juce::StreamingSocket, as those connections can be cancelled. Perhaps @jules can weigh in on which method of communicating via HTTP is the best choice when you need to cancel it.

Ah ok, pity, in that case forget my post :wink:

Perhaps this will help:

http://stackoverflow.com/questions/4456180/how-can-i-immediately-cancel-a-curl-operation

Cheers,

Rail

A more useful use-case for lambdas is in the ThreadPool, where normally at shutdown you’d at least tell the pool to wait for its jobs to finish before killing it.

@jules @Rail_Jon_Rogut could you possibly provide an example showing how to implement this? I’m revisiting this part of my project, trying to figure out how to deal with these OneShotThreads that are still running when systemRequestedQuit() is called.