Parallel processing of arrays/multithreading

Hey there again :slight_smile:

I am working on a plugin to manage your sample library. For this I have to deal with a lot of files / sample representation objects while loading/filtering samples of a library. This creates loops like the following two:

// Go through all files in directory and add sample items
// for all that are not yet loaded into the library
for (File const & sampleFile : allSampleFiles)
{
    if (!addedFilePaths.contains(sampleFile.getFullPathName()))
    {
        createSampleItem(sampleFile);
    }
    
    numProcessedItems++;
    setProgress(numProcessedItems / (double) numItemsToProcess);
}
for (SampleItem* sampleItem : mAllSampleItems)
{
    if (mFileFilter->matchesRules(*sampleItem))
    {
        mFilteredSampleItems.add(sampleItem);
    }
}

My thought was that to make the program quicker and more usable I could process these tasks on the arrays in parallel. So, instead of going through the Array<File> allSampleFiles linearly, I could somehow acquire a sensible amount of threads, sifting though different portions of the array in parallel. Is there some way to just tell the program to process the current loop in parallel by automatically choosing the amount of threads that are available performance wise and then just doing its thing? I’m super new to multi-threading/parallel processing so if you could point me in the right direction that would be great!

Thanks!

JUCE has a built in ThreadPool class that might fit the bill, or you can roll your own similar job manager and create threads directly.
In general it makes sense to limit the number of threads to the system number of reported CPU cores using SystemStats::getNumCpus(); to get the reported number available.

1 Like

I made something similar with an Image batch processor. Basically, I have a Thread class that performs a specific task on a series of files, then I launch a number of threads each doing its own processing. An array of threads rather than a thread pool.

Here’s some code:

class myProcessorThread : public juce::Thread
{
public:
    myProcessorThread(const String& threadName, const juce::Array<File>& _fileList, const File& _outputFolder) 
        : Thread(threadName), fileList(_fileList), outputFolder(_outputFolder)
    {
        startThread();
    }
    ~ImgProcessorThread()
    {
        if (threadShouldExit())
            stopThread(100);
    }

    void run() override
    {
        StopProcessing.store(false);

        // For each file
        fileCount = 0;
        for (auto& f : fileList)
        {
            if (StopProcessing)
                return;

	// Do all the processing on the file...
	
	
	// Count files and increment a progressBar
            ++fileCount;
            auto progress = 1.0 / (double)fileList.size() * (double)fileCount;
            
            if (onProgress != nullptr)
                onProgress(progress, f.getFileName() + " " + String(fileCount) + "/" + String(fileList.size()));
        }

        if (!Result())
            if (onProgress != nullptr)
                onProgress(1.0, "Done.");
    }

    void Abort()
    {
        StopProcessing.store(true);
    }

    bool Result()
    {
        return fileCount == fileList.size();
    }

    std::function<void(double progress, const String& text)> onProgress = nullptr;

private:
    juce::Array<File> fileList;
    File outputFolder;
    int fileCount = 0;
    std::atomic<bool> StopProcessing;

    JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(myProcessorThread)
};

Then I have a class member that holds all pointers to thread objects:
OwnedArray<myProcessorThread> myProcessorThreads;

Then when I start the process…


void MainComponent::StartProcess()
{
    myProcessorThreads.clear();

    // for each group of files...
    for (auto s : Selection)
    {

        // Create and start processor threads
        auto* Processor = new myProcessorThread(someThreadName, InputFiles, destFolder); // Pass specific parameters to the thread class
        myProcessorThreads.add(Processor);

        Processor->onProgress = [&, p](double progress, const String& text)
        {
		// Update a progress bar when following the thread progress
        };
    }
}
1 Like