Alright, thanks to @lalala and @daniel I got this working. maybe you folks have a suggestion for a way around this manual deletion I have to do in order to use ChangeBroadcaster when the ThreadPoolJob finishes.
struct KillableWebInputStream : juce::ThreadPoolJob, juce::ChangeBroadcaster
{
KillableWebInputStream(juce::URL url);
juce::ThreadPoolJob::JobStatus runJob() override;
static bool openStreamProgressCallback (void* context, int /*bytesSent*/, int /*totalBytes*/);
juce::String getResult() const;
static const juce::String FailedToConnectString;
private:
juce::String result;
juce::URL url;
JUCE_DECLARE_WEAK_REFERENCEABLE(KillableWebInputStream)
};
KillableWebInputStream::KillableWebInputStream(juce::URL url_) :
ThreadPoolJob("KillableWebInputStream"),
url(url_)
{
}
KillableWebInputStream::~KillableWebInputStream()
juce::ThreadPoolJob::JobStatus KillableWebInputStream::runJob()
{
const std::unique_ptr<InputStream> in (url.createInputStream (true, openStreamProgressCallback, this, String(), ServerConnectorThread::ServerTimeout));
if (shouldExit() || in == nullptr)
{
result = KillableWebInputStream::FailedToConnectString;
}
else
{
result = in->readEntireStreamAsString();
}
//because of the odd instances where the Message thread is serviced
//before this ThreadPoolJob has it's 'isActive' flag set to false,
//sendChangeMessage() must be called asynchronously so there is
//time for the 'isActive' flag to be changed.
MessageManager::callAsync([weakRef = juce::WeakReference<KillableWebInputStream>(this)]() mutable
{
if( weakRef.get() )
weakRef->sendChangeMessage();
});
return juce::ThreadPoolJob::JobStatus::jobHasFinished;
}
bool KillableWebInputStream::openStreamProgressCallback (void* context, int /*bytesSent*/, int /*totalBytes*/)
{
auto* threadPoolJob = static_cast<ThreadPoolJob*> (context);
return ! threadPoolJob->shouldExit();
}
juce::String KillableWebInputStream::getResult() const { return result; }
const juce::String KillableWebInputStream::FailedToConnectString { "Failed To Connect" };
Usage:
struct SomeClass : ChangeListener
{
SomeClass() = default;
~SomeClass() override;
void launch();
void changeListenerCallback(ChangeBroadcaster* source) override;
private:
ThreadPool threadPool { 1 };
JUCE_DECLARE_WEAK_REFERENCEABLE(SomeClass)
};
void SomeClass::launch() //maybe this is called on a background thread or something..
{
auto url = juce::URL("https://www.google.com");
MessageManager::callAsync([url, weakRef = juce::WeakReference<SomeClass>(this)]() mutable
{
if(! weakRef.get() )
return;
auto kwis = std::make_unique<KillableWebInputStream>(url);
kwis->addChangeListener(weakRef.get());
//the job is owned by the pool but won't be deleted when it is completed.
//reason: need to be able to call kwis->getResult() after the threadPoolJob finishes.
weakRef->threadPool.addJob(kwis.release(), false);
});
}
void SomeClass::changeListenerCallback(juce::ChangeBroadcaster* source)
{
if( auto* kwis = dynamic_cast<KillableWebInputStream*>(source))
{
auto str = kwis->getResult();
auto removed = threadPool.removeJob(kwis, true, 100);
jassert( removed);
if( removed )
{
std::unique_ptr<KillableWebInputStream> kill(kwis); //kill it via Scoping
}
//now do whatever else you need to do now that you have the server reply
}
}
if for whatever reason the job is not removed from the threadPool in under 100ms, it is still in the Job pool. And, because it was added with deleteJobWhenFinished = false
, it won’t be deleted when the ThreadPool is deleted. So, I have to manually delete them like this:
SomeClass::~SomeClass()
{
juce::OwnedArray<ThreadPoolJob> deleteList;
for( int i = 0; i < threadPool.getNumJobs(); ++i )
{
deleteList.add(threadPool.getJob(i));
}
//wait until they are all deleted, which shouldn't take any time, but it might.
while( true )
{
if( threadPool.removeAllJobs(true, 100) )
break;
}
}