Stopping or interrupting URL::createInputStream()?

Hi, is there a way to stop HTTP requests that were created via URL::createInputStream()?

These are normally created with this simple approach:

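URL url ("https://www.google.com"); // include the scheme
// true = POST-like request; -1 = no connection timeout (wait indefinitely)
if (auto stream = url.createInputStream (true, nullptr, nullptr, {}, -1))
{
    auto str = stream->readEntireStreamAsString();
}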

This was reported here by @lalala way back in 2016, but I haven’t seen anything since then that lets us stop one of these midway through the connection process.

If I quit my app while this function is being called on a background thread, I end up with this blast of leaked objects:

JUCE Assertion failure in juce_Thread.cpp:233
!! killing thread by force !!
stopping ServerReplyThread
*** Leaked objects detected: 1 instance(s) of class URLConnectionState
JUCE Assertion failure in juce_LeakedObjectDetector.h:90
*** Leaked objects detected: 1 instance(s) of class WebInputStream
JUCE Assertion failure in juce_LeakedObjectDetector.h:90
*** Leaked objects detected: 1 instance(s) of class Pimpl
JUCE Assertion failure in juce_LeakedObjectDetector.h:90
*** Leaked objects detected: 1 instance(s) of class URL
JUCE Assertion failure in juce_LeakedObjectDetector.h:90
*** Leaked objects detected: 1 instance(s) of class InputStream
JUCE Assertion failure in juce_LeakedObjectDetector.h:90
*** Leaked objects detected: 2 instance(s) of class MemoryBlock
JUCE Assertion failure in juce_LeakedObjectDetector.h:90
*** Leaked objects detected: 1 instance(s) of class Thread
JUCE Assertion failure in juce_LeakedObjectDetector.h:90
*** Leaked objects detected: 2 instance(s) of class WaitableEvent
JUCE Assertion failure in juce_LeakedObjectDetector.h:90
*** Leaked objects detected: 1 instance(s) of class StringPairArray
JUCE Assertion failure in juce_LeakedObjectDetector.h:90
*** Leaked objects detected: 4 instance(s) of class StringArray
JUCE Assertion failure in juce_LeakedObjectDetector.h:90

The bug I reported at the time was that the callback's return value wasn't taken into account, but that has since been fixed.
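The contract being described is that the progress callback's return value is honoured: returning false asks the connection attempt to give up. Below is a minimal, JUCE-free C++ model of that contract; `simulatedConnect` and `CancelAfter` are hypothetical names, and the "connection" is just a loop that reports progress in chunks.

```cpp
#include <cassert>

// Same shape as the progress callback used in this thread: (context, bytesSent, totalBytes).
// Returning false asks the caller to abandon the operation.
using ProgressCallback = bool (*) (void* context, int bytesSent, int totalBytes);

// Hypothetical stand-in for a blocking connect: it "sends" totalBytes in
// chunks and consults the callback after each chunk.
bool simulatedConnect (ProgressCallback callback, void* context, int totalBytes, int chunkSize)
{
    for (int sent = 0; sent < totalBytes;)
    {
        sent = (sent + chunkSize < totalBytes) ? sent + chunkSize : totalBytes;

        // Honour the callback's return value: false means "stop now".
        if (callback != nullptr && ! callback (context, sent, totalBytes))
            return false;
    }

    return true; // completed without being cancelled
}

// Example callback owner: cancels once a given byte count has been reached.
struct CancelAfter
{
    int limit;
    int calls = 0;

    static bool callback (void* context, int bytesSent, int /*totalBytes*/)
    {
        auto* self = static_cast<CancelAfter*> (context);
        ++self->calls;
        return bytesSent < self->limit;
    }
};
```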

I'm using it more or less like this:

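struct ChangelogDownloader : private Thread
{
    // Thread has no default constructor, so give it a name here
    explicit ChangelogDownloader (URL urlToFetch)
        : Thread ("ChangelogDownloader"), changeLogURL (urlToFetch) {}

    ~ChangelogDownloader() override
    {
        stopThread (10000); // longer than the 4000 ms connection timeout used in run()
    }

    void downloadChangelog()
    {
        startThread();
    }

    void run() override
    {
        // the callback is consulted while connecting; returning false abandons the attempt
        const std::unique_ptr<InputStream> in (changeLogURL.createInputStream (false, openStreamProgressCallback, this, String(), 4000));

        if (threadShouldExit() || in == nullptr)
            return;

        changeLog = in->readEntireStreamAsString();
    }

    static bool openStreamProgressCallback (void* context, int /*bytesSent*/, int /*totalBytes*/)
    {
        // cast back to the exact type that was passed in as 'this'
        auto* self = static_cast<ChangelogDownloader*> (context);
        return ! self->threadShouldExit();
    }

    URL changeLogURL;
    String changeLog;
};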
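The same idea can be sketched without JUCE, as a rough model that assumes std::thread in place of juce::Thread: the object owns its worker, and its destructor raises an exit flag and joins, so the worker is asked to stop rather than killed mid-connection. `Downloader` and its 1 ms "connection steps" are hypothetical stand-ins for the real download.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <string>
#include <thread>

// Hypothetical, JUCE-free model of an object that owns its download thread.
class Downloader
{
public:
    explicit Downloader (std::atomic<int>& stepsCounter) : steps (stepsCounter) {}

    ~Downloader()
    {
        shouldExit = true;      // cooperative request, similar in spirit to signalThreadShouldExit()
        if (worker.joinable())
            worker.join();      // wait for run() to notice the flag and return
    }

    void start() { worker = std::thread ([this] { run(); }); }

    std::string result;         // only safe to read once the worker has finished

private:
    void run()
    {
        // Stand-in for a slow connection: 10000 steps of 1 ms each, checking
        // the exit flag between steps (which is the progress callback's job).
        for (int i = 0; i < 10000; ++i)
        {
            if (shouldExit)
                return;

            std::this_thread::sleep_for (std::chrono::milliseconds (1));
            ++steps;
        }

        result = "changelog text";
    }

    std::atomic<bool> shouldExit { false };
    std::atomic<int>& steps;
    std::thread worker;
};
```

Destroying the object shortly after start() returns promptly, because the worker checks the flag between steps instead of running to completion.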

I think what we need is a way to access the thread that actually runs when we call createInputStream():

    bool start (WebInputStream& inputStream, WebInputStream::Listener* listener)
    {
        startThread();

        while (isThreadRunning() && ! initialised)
        {
            if (listener != nullptr)
                if (! listener->postDataSendProgress (inputStream, latestTotalBytes, (int) [[request HTTPBody] length]))
                    return false;

            Thread::sleep (1);
        }

        return connection != nil && ! hasFailed;
    }

///snip
    void run() override
    {
        {
            const ScopedLock lock (createConnectionLock);

            if (hasBeenCancelled)
                return;

            connection = [[NSURLConnection alloc] initWithRequest: request
                                                         delegate: delegate];
        }

        while (! threadShouldExit())
        {
            JUCE_AUTORELEASEPOOL
            {
                [[NSRunLoop currentRunLoop] runUntilDate: [NSDate dateWithTimeIntervalSinceNow: 0.01]];
            }
        }
    }

But I'm not sure whether these OS X classes allow connections to be cancelled, and I have no idea how this works on Windows.

These snippets come from the code behind WebInputStream::Pimpl::connect(), which is called from WebInputStream::connect().

That's why I'm passing the thread (this) as the context argument, which arrives as the first parameter of my openStreamProgressCallback.
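Because the context travels through a void*, it is safest for the static callback to cast it back to exactly the type whose address was passed in (the derived class) rather than directly to a base class; a direct cast to a base only works in practice when that base happens to sit at the start of the object. A small standard C++ illustration with hypothetical types:

```cpp
#include <cassert>

// Two unrelated bases, as with a class deriving from both a job type and a
// broadcaster type. All names here are hypothetical.
struct JobBase   { virtual ~JobBase() = default;   bool exitRequested = false; };
struct Broadcast { virtual ~Broadcast() = default; int listeners = 0; };

struct MyJob : JobBase, Broadcast
{
    // Static trampoline: C-style callbacks only carry a void* context.
    static bool progressCallback (void* context, int /*bytesSent*/, int /*totalBytes*/)
    {
        // Cast back to exactly the type whose address became the void* (MyJob*);
        // the compiler then applies any base-class adjustment itself.
        auto* self = static_cast<MyJob*> (context);
        return ! self->exitRequested;
    }
};
```

The two non-empty base sub-objects cannot share an address, so at least one base is not located where the void* points; that is why casting the void* straight to a base type is fragile.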

I'm going to have to study what you've got going on here. This seems rather clever…

If I were to tackle downloads (and I have a project coming up about that), I would definitely use a ThreadPool and ThreadPoolJobs; that way everything is cleaned up nicely, and you can easily define how many downloads you want to allow in parallel.
By the way, you can have several ThreadPools and use one only for downloads, if you want finer control so that downloads don't interfere with other background tasks.
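As a rough, JUCE-free model of a pool dedicated to downloads (a sketch under assumptions, not juce::ThreadPool itself): the pool size caps how many downloads run in parallel, each job polls a shouldExit() predicate, and the destructor raises the flag and joins every worker instead of killing threads. `DownloadPool` is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool reserved for downloads; jobs are expected to poll shouldExit().
class DownloadPool
{
public:
    using ShouldExit = std::function<bool()>;
    using Job = std::function<void (const ShouldExit&)>;

    explicit DownloadPool (int numThreads)
    {
        for (int i = 0; i < numThreads; ++i)
            workers.emplace_back ([this] { workerLoop(); });
    }

    ~DownloadPool()
    {
        {
            std::lock_guard<std::mutex> lock (mutex);
            exiting = true;          // running jobs see this via shouldExit()
        }
        wakeUp.notify_all();

        for (auto& w : workers)
            w.join();                // cooperative shutdown: no thread is killed
    }

    void addJob (Job job)
    {
        {
            std::lock_guard<std::mutex> lock (mutex);
            jobs.push (std::move (job));
        }
        wakeUp.notify_one();
    }

private:
    void workerLoop()
    {
        const ShouldExit shouldExit = [this] { return exiting.load(); };

        for (;;)
        {
            Job job;
            {
                std::unique_lock<std::mutex> lock (mutex);
                wakeUp.wait (lock, [this] { return exiting || ! jobs.empty(); });

                if (exiting)
                    return;          // queued jobs that never started are dropped

                job = std::move (jobs.front());
                jobs.pop();
            }

            job (shouldExit);
        }
    }

    std::mutex mutex;
    std::condition_variable wakeUp;
    std::queue<Job> jobs;
    std::atomic<bool> exiting { false };
    std::vector<std::thread> workers;    // declared last so it is constructed last
};
```

This sketch simply drops queued jobs that never started; a real pool might instead run them or report them as cancelled.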

I’m not tackling downloads. I’m tackling quitting the app in the middle of a call to createInputStream which hasn’t closed the connection.

Well, yes, that's the symptom. What I meant is that I wouldn't create self-owned threads, but would let a managed object like ThreadPool manage the thread that is blocking your quit request.

The solution I would use is basically the ChangelogDownloader @lalala posted above but with a ThreadPool. But you can choose your own way.

Are you blocked in createInputStream() or on the subsequent readEntireStreamAsString()?

createInputStream has no means of interruption except for @lalala’s approach, from what I can tell.

His approach really seems like something that should already be included in the framework, not something we should have to figure out on our own. I'm sure tons of devs have run into this: quitting the app while some HTTP background threads are still running, with no way of stopping those threads until the OS closes the connections.

Also, we have no access to the background thread that runs when you call URL::createInputStream(). My guess is that the JUCE dev who implemented it figured you would be using it for short messages between your app and a server, i.e. connections that open and close extremely quickly. But that's just my guess…

Alright, thanks to @lalala and @daniel I got this working. Maybe you folks have a suggestion for avoiding the manual deletion I have to do in order to use ChangeBroadcaster when the ThreadPoolJob finishes.

struct KillableWebInputStream : juce::ThreadPoolJob, juce::ChangeBroadcaster
{
    KillableWebInputStream(juce::URL url);
    
    juce::ThreadPoolJob::JobStatus runJob() override;
    
    static bool openStreamProgressCallback (void* context, int /*bytesSent*/, int /*totalBytes*/);
    
    juce::String getResult() const;
    
    static const juce::String FailedToConnectString;
private:
    juce::String result;
    juce::URL url;
    JUCE_DECLARE_WEAK_REFERENCEABLE(KillableWebInputStream)
};
KillableWebInputStream::KillableWebInputStream (juce::URL url_)
    : ThreadPoolJob ("KillableWebInputStream"),
      url (url_)
{
}

juce::ThreadPoolJob::JobStatus KillableWebInputStream::runJob()
{
    const std::unique_ptr<InputStream> in (url.createInputStream (true, openStreamProgressCallback, this, String(), ServerConnectorThread::ServerTimeout));
    
    if (shouldExit() || in == nullptr)
    {
        result = KillableWebInputStream::FailedToConnectString;
    }
    else
    {
        result = in->readEntireStreamAsString();
    }
    
    //because of the odd instances where the Message thread is serviced 
    //before this ThreadPoolJob has its 'isActive' flag set to false, 
    //sendChangeMessage() must be called asynchronously so there is 
    //time for the 'isActive' flag to be changed.
    MessageManager::callAsync([weakRef = juce::WeakReference<KillableWebInputStream>(this)]() mutable
    {
        if( weakRef.get() )
            weakRef->sendChangeMessage();
    });
    
    return juce::ThreadPoolJob::JobStatus::jobHasFinished;
}

bool KillableWebInputStream::openStreamProgressCallback (void* context, int /*bytesSent*/, int /*totalBytes*/)
{
    // 'context' is the 'this' passed to createInputStream(), so cast back to that exact type
    auto* job = static_cast<KillableWebInputStream*> (context);
    return ! job->shouldExit();
}

juce::String KillableWebInputStream::getResult() const {    return result; }

const juce::String KillableWebInputStream::FailedToConnectString { "Failed To Connect" };

Usage:

struct SomeClass : ChangeListener
{
    SomeClass() = default;
    ~SomeClass() override;
    void launch();
    void changeListenerCallback(ChangeBroadcaster* source) override;
private:
    ThreadPool threadPool { 1 };
    JUCE_DECLARE_WEAK_REFERENCEABLE(SomeClass)
};
void SomeClass::launch() //maybe this is called on a background thread or something..
{
    auto url = juce::URL("https://www.google.com");
    MessageManager::callAsync([url, weakRef = juce::WeakReference<SomeClass>(this)]() mutable
    {
        if(! weakRef.get() )
            return;
        
        auto kwis = std::make_unique<KillableWebInputStream>(url);
        kwis->addChangeListener(weakRef.get());
        //the job is owned by the pool but won't be deleted when it is completed.
        //reason: need to be able to call kwis->getResult() after the threadPoolJob finishes.
        weakRef->threadPool.addJob(kwis.release(), false); 
    });
}
void SomeClass::changeListenerCallback(juce::ChangeBroadcaster* source)
{
    if( auto* kwis = dynamic_cast<KillableWebInputStream*>(source))
    {
        auto str = kwis->getResult();
        
        auto removed = threadPool.removeJob(kwis, true, 100);
        jassert( removed);
        if( removed )
        {
            std::unique_ptr<KillableWebInputStream> kill(kwis); //kill it via Scoping
        }
        //now do whatever else you need to do now that you have the server reply
    }
}

If, for whatever reason, the job is not removed from the ThreadPool within 100 ms, it stays in the pool. And because it was added with deleteJobWhenFinished = false, it won't be deleted when the ThreadPool is deleted. So I have to delete the remaining jobs manually, like this:

SomeClass::~SomeClass()
{
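    // take ownership of every job still in the pool; deleteList deletes them when it goes out of scope
    juce::OwnedArray<ThreadPoolJob> deleteList;
    for (int i = 0; i < threadPool.getNumJobs(); ++i)
        deleteList.add (threadPool.getJob (i));

    // wait until they have all been removed from the pool (interrupting any that are still running);
    // this shouldn't take any time, but it might
    while (! threadPool.removeAllJobs (true, 100))
    {
    }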
}
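One possible way to avoid collecting the jobs by hand, assuming the pool's destructor stops and waits for its threads, is to lean on C++ member destruction order: non-static members are destroyed in reverse order of declaration, so a container that owns the jobs and is declared before the pool outlives the pool's threads. The types below are hypothetical stand-ins that only log their teardown.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Records teardown order so the effect can be checked.
std::vector<std::string> teardownLog;

struct FakeJob  { ~FakeJob()  { teardownLog.push_back ("job deleted"); } };
struct FakePool { ~FakePool() { teardownLog.push_back ("pool stopped"); } }; // a real pool would stop its threads here

struct Owner
{
    // Declared FIRST, so destroyed LAST: the jobs outlive the pool's threads.
    std::vector<std::unique_ptr<FakeJob>> jobs;

    // Declared LAST, so destroyed FIRST.
    FakePool pool;
};
```

With this layout the owner never needs an explicit destructor: by the time the jobs are deleted, nothing in the pool can still be running them.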

Never mind, I figured out how to do it without needing ChangeListener at all:

struct KillableWebInputStream : juce::ThreadPoolJob
{
    using Func = std::function<void(juce::String)>;
    
    KillableWebInputStream(juce::URL url, Func f);
    //snip....
private:
    Func func;
};

juce::ThreadPoolJob::JobStatus KillableWebInputStream::runJob()
{
    const std::unique_ptr<InputStream> in (url.createInputStream (true, openStreamProgressCallback, this, String(), ServerConnectorThread::ServerTimeout));
    
    if (shouldExit() || in == nullptr)
    {
        result = KillableWebInputStream::FailedToConnectString;
    }
    else
    {
        result = in->readEntireStreamAsString();
    }
    
    if (func)
        func (result); // pass the result to the user's lambda, if one was supplied (this runs on the pool's thread)
    
    return juce::ThreadPoolJob::JobStatus::jobHasFinished;
}

That eliminated any need for MessageManager::callAsync() as well as ChangeBroadcaster/Listener
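One thing worth keeping in mind with this version: func is invoked from inside runJob(), so the lambda runs on the pool's worker thread, not on the message thread. If it touches UI or other message-thread-only state, it would still need to hand the result over, for example with MessageManager::callAsync(). A standard C++ model with a hypothetical `CallbackJob` type shows which thread the callback lands on:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <thread>

// Hypothetical model of the std::function-based job: whatever lambda is
// passed in gets invoked from inside runJob(), i.e. on the worker thread.
struct CallbackJob
{
    std::function<void (const std::string&)> func;

    void runJob()
    {
        const std::string result = "server reply"; // stand-in for the download
        if (func)
            func (result);                          // runs on whichever thread called runJob()
    }
};
```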
w00t w00t!!


Good job!

I wonder whether it's possible (or any better) to break in->readEntireStreamAsString() down into smaller chunks with a timeout, in case you download a few MB or more. That way shouldExit() would get checked from time to time, and it would avoid forcibly killing the thread…

Something like:

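MemoryBlock block;

// read 4096 bytes at a time so shouldExit() gets checked between reads
while (! shouldExit() && ! in->isExhausted())
{
    if (in->readIntoMemoryBlock (block, 4096) == 0)
        break; // nothing could be read (e.g. a network error), so stop rather than spin
}

return block.toString();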
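The chunked-read idea can be modelled in plain C++: read a fixed-size chunk, check shouldExit(), repeat, so a cancellation request is noticed after at most one chunk. `FakeStream` and `readInChunks` are hypothetical stand-ins for an InputStream and the loop, using std::string instead of MemoryBlock; note that a single read on a real network stream can still block for a while.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical stand-in for an InputStream: hands out its payload in pieces.
struct FakeStream
{
    std::string data;
    std::size_t pos = 0;

    bool isExhausted() const { return pos >= data.size(); }

    // Appends up to maxBytes to dest and returns how many bytes were added.
    std::size_t readInto (std::string& dest, std::size_t maxBytes)
    {
        const std::size_t n = std::min (maxBytes, data.size() - pos);
        dest.append (data, pos, n);
        pos += n;
        return n;
    }
};

// Reads in fixed-size chunks, checking shouldExit() before each chunk.
std::string readInChunks (FakeStream& in, std::size_t chunkSize, const std::function<bool()>& shouldExit)
{
    std::string block;

    while (! shouldExit() && ! in.isExhausted())
    {
        if (in.readInto (block, chunkSize) == 0)
            break; // no progress: stop rather than spin
    }

    return block;
}
```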

I really feel like this whole approach should be built into the framework, and that this static function and derived class shouldn't be needed to make a killable URL stream. Perhaps someone from the JUCE team can chime in with their thoughts? @ed95 @reuk

I think I'm failing to grasp what you are trying to do here. Presumably you are calling URL::createInputStream() on a background thread (otherwise you'd be blocking the main thread), so just give it a sensible timeout parameter (5000 ms, for example), and regularly check threadShouldExit() in the operations where you read from the stream. Then, on app shutdown, call stopThread() on the background thread with a slightly higher timeout value to be safe, and it'll be cleaned up correctly; the worst case is a slight delay when the user quits the app while it waits for your background thread to exit. We do a similar thing in a few places in the Projucer that you can look at for examples.
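A JUCE-free sketch of that recipe, under the assumption that the blocking connect can only be bounded (not interrupted): the simulated connect gives up after connectTimeout, later work polls an exit flag, and stop() waits for the worker with a larger timeout and reports whether it exited in time, similar in spirit to stopThread(). `BackgroundFetcher` is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <future>
#include <mutex>
#include <thread>

// Hypothetical model: bounded connect + cooperative exit + timed stop.
class BackgroundFetcher
{
public:
    explicit BackgroundFetcher (std::chrono::milliseconds connectTimeoutToUse)
        : connectTimeout (connectTimeoutToUse), finished (exited.get_future())
    {
        worker = std::thread ([this] { run(); });
    }

    ~BackgroundFetcher()
    {
        shouldExit = true;
        if (worker.joinable())
            worker.join();
    }

    // Request exit, then wait up to stopTimeout; true if the worker exited in time.
    bool stop (std::chrono::milliseconds stopTimeout)
    {
        shouldExit = true;
        return finished.wait_for (stopTimeout) == std::future_status::ready;
    }

private:
    void run()
    {
        {
            // Simulated connect to a server that never answers: it cannot be
            // interrupted, but it blocks for at most connectTimeout.
            std::unique_lock<std::mutex> lock (mutex);
            neverSignalled.wait_for (lock, connectTimeout);
        }

        // After connecting, reads would poll the exit flag between chunks.
        while (! shouldExit)
            std::this_thread::sleep_for (std::chrono::milliseconds (1));

        exited.set_value();
    }

    const std::chrono::milliseconds connectTimeout;
    std::atomic<bool> shouldExit { false };
    std::mutex mutex;
    std::condition_variable neverSignalled;
    std::promise<void> exited;
    std::future<void> finished;
    std::thread worker;
};
```

Because the connect is bounded, the worst-case delay on quit is roughly the connect timeout, which is why the stop timeout is chosen to be comfortably larger.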

Also, you don’t need to tag us - we read all the posts.
