/********************************************************************************
* *
* T h r e a d P o o l *
* *
*********************************************************************************
* Copyright (C) 2006,2024 by Jeroen van der Zijp. All Rights Reserved. *
*********************************************************************************
* This library is free software; you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as published by *
* the Free Software Foundation; either version 3 of the License, or *
* (at your option) any later version. *
* *
* This library is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see *
********************************************************************************/
#ifndef FXTHREADPOOL_H
#define FXTHREADPOOL_H
#ifndef FXRUNNABLE_H
#include "FXRunnable.h"
#endif
namespace FX {
class FXWorker;
/// Task queue
typedef FXLFQueueOf FXTaskQueue;
/**
* A Thread Pool manages execution of tasks on a number of worker-threads.
*
* A task executed by the thread pool is queued up until a worker-thread becomes available
* to run it.
* To accomodate fluctuations in workloads and minimize resources, the number of worker-
* threads can be allowed to vary between a minimum and a maximum number.
* Idle worker-threads which receive no assignments within a specified amount of time will
* automatically terminate, thereby reduce system-resources used by the program.
* By default, the minimum number of worker-threads is 1, and the maximum number of worker-
* threads is equal to the number of processors in the system.
* During peak workloads, when the task queue may start to fill up, and no new worker-
* threads can be created, the calling thread to block until there is room in the queue
* for more tasks.
* However, if a non-default value is passed for the blocking-parameter to execute(), the
* calling thread will be blocked for only a finite amount of time (non-zero blocking value)
* or return immediately (zero blocking value).
* Failure to queue up a new task will result in execute() returning a false.
* The tasks which are passed to the thread pool are derived from FXRunnable. In order
* to perform some useful function, subclasses of FXRunnable should overload the run()
* function.
* Uncaught exceptions thrown by a task are intercepted by the thread pool and rethrown
* after the necessary cleanup, and cause the worker thread to end prematurely.
* When the thread pool is stopped, it will wait until all tasks are finished, and then
* cause all worker-threads to terminate.
* The thread pool becomes associated (through a thread-local variable) with the calling
* thread when start() is called; this association lasts until stop() is called.
* In addition, each worker will similarly be associated with the thread pool.
* Thus both the main thread as well as the worker threads can easily find the thread
* pool through the member-function instance().
*/
class FXAPI FXThreadPool : public FXRunnable {
private:
FXTaskQueue queue; // Task queue
FXCompletion tasks; // Active tasks
FXCompletion threads; // Active threads
FXSemaphore freeslots; // Free slots in queue
FXSemaphore usedslots; // Used slots in queue
FXuval stacksize; // Stack size
FXTime expiration; // Quit if no task within this time
volatile FXuint maximum; // Maximum threads
volatile FXuint minimum; // Minimum threads
volatile FXuint workers; // Working threads
volatile FXuint running; // Context is running
private:
static FXAutoThreadStorageKey reference;
private:
FXbool startWorker();
void runWhile(FXCompletion& comp,FXTime timeout);
virtual FXint run();
private:
FXThreadPool(const FXThreadPool&);
FXThreadPool &operator=(const FXThreadPool&);
public:
/**
* Construct an empty thread pool, with given task-queue size.
*/
FXThreadPool(FXuint sz=256);
/// Return true if running
FXbool active() const { return running==1; }
/// Change task queue size, return true if success
FXbool setSize(FXuint sz);
/// Return task queue size
FXuint getSize() const { return queue.getSize(); }
/// Return number of tasks
FXuint getRunningTasks() const { return tasks.count(); }
/// Return number of threads
FXuint getRunningThreads() const { return threads.count(); }
/// Change minimum number of worker threads; default is 1
FXbool setMinimumThreads(FXuint n);
/// Return minimum number of worker threads
FXuint getMinimumThreads() const { return minimum; }
/// Change maximum number of worker threads; default is #processors
FXbool setMaximumThreads(FXuint n);
/// Return maximum number of worker threads
FXuint getMaximumThreads() const { return maximum; }
/// Change expiration time for excess workers to terminate
FXbool setExpiration(FXTime ns=forever);
/// Get expiration time
FXTime getExpiration() const { return expiration; }
/// Change stack size (0 for default stack size)
FXbool setStackSize(FXuval sz);
/// Get stack size
FXuval getStackSize() const { return stacksize; }
/// Return calling thread's thread pool
static FXThreadPool* instance();
/// Change calling thread's thread pool
static void instance(FXThreadPool *pool);
/**
* Start the thread pool with an initial number of threads equal to count.
* Returns the number of threads actually started.
* An association will be established between the calling thread and the thread pool.
* This association lasts until stop() is called. If another threadpool was already started
* before by the calling thread, no new association will be established.
*/
FXuint start(FXuint count=0);
/**
* Execute a task on the thread pool by entering it into the queue.
* If a spot becomes available in the task queue within the timeout interval,
* add the task to the queue and return true.
* Return false if the task could not be added within the given time interval.
* Possibly starts additional worker threads if the maximum number of worker
* threads has not yet been exceeded.
*/
FXbool execute(FXRunnable* task,FXTime blocking=forever);
/**
* Execute task on the thread pool by entering int into the queue.
* If the task was successfully added, the calling thread will temporarily enter
* the task-processing loop, and help out the worker-threads until all tasks
* have finished processing.
* Return false if the task could not be added within the given time interval.
* Possibly starts additional worker threads if the maximum number of worker
* threads has not yet been exceeded.
*/
FXbool executeAndWait(FXRunnable* task,FXTime blocking=forever);
/**
* Execute task on the thread pool by entering int into the queue.
* If the task was successfully added, the calling thread will temporarily enter
* the task-processing loop, and help out the worker-threads until the completion
* becomes signaled.
* Return false if the task could not be added within the given time interval.
* Possibly starts additional worker threads if the maximum number of worker
* threads has not yet been exceeded.
*/
FXbool executeAndWaitFor(FXRunnable* task,FXCompletion& comp,FXTime blocking=forever);
/**
* Wait until task queue becomes empty and all tasks are finished, and process tasks
* to help the worker threads in the meantime.
* If the thread pool was not running, return immediately with false; otherwise,
* return when the queue is empty and all tasks have finished.
*/
FXbool wait();
/**
* Wait until completion becomes signaled, and process tasks to help the worker
* threads in the meantime.
* If the thread pool was not running, return immediately with false; otherwise,
* return when the completion becomes signaled, or when the thread pool is stopped.
* Immediately return with false if the thread pool wasn't running.
*/
FXbool waitFor(FXCompletion& comp);
/**
* Stop thread pool, and block posting of any new tasks to the queue.
* Enter the task-processing loop and help the worker-threads until the task queue is
* empty, and all tasks have finished executing.
* The association between the calling thread, established when start() was called,
* will hereby be dissolved, if the calling thread was associated with this thread pool.
* Return false if the thread pool wasn't running.
*/
FXbool stop();
/**
* Stop the thread pool and then delete it.
*/
virtual ~FXThreadPool();
};
}
#endif