28 #define THREAD_BUCKETS 89 407 struct ast_str *control_tps_name;
411 if (!
pool || !control_tps_name) {
416 ast_str_set(&control_tps_name, 0,
"%s/pool-control", name);
420 if (!
pool->control_tps) {
425 if (!
pool->active_threads) {
430 if (!
pool->idle_threads) {
435 if (!
pool->zombie_threads) {
506 ast_debug(1,
"Failed to activate thread %d. It is dead\n",
535 ast_debug(3,
"Increasing threadpool %s's size by %d\n",
538 for (i = 0; i < delta; ++i) {
569 unsigned int existing_active;
716 int *num_to_kill = arg;
718 if (*num_to_kill > 0) {
749 int *num_to_zombify = data;
751 if ((*num_to_zombify)-- > 0) {
753 ast_log(
LOG_WARNING,
"Failed to zombify active thread %d. Thread will remain active\n", worker->
id);
783 int idle_threads_to_kill =
MIN(delta, idle_threads);
784 int active_threads_to_zombify = delta - idle_threads_to_kill;
786 ast_debug(3,
"Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
792 ast_debug(3,
"Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
842 unsigned int num_threads = ssd->
size;
850 if (current_size == num_threads) {
851 ast_debug(3,
"Not changing threadpool size since new size %u is the same as current %u\n",
852 num_threads, current_size);
856 if (current_size < num_threads) {
863 grow(pool, num_threads - current_size);
867 shrink(pool, current_size - num_threads);
939 fullname =
ast_alloca(strlen(name) + strlen(
"/pool") + 1);
940 sprintf(fullname,
"%s/pool", name);
1028 ast_debug(3,
"Destroying worker thread %d\n", worker->
id);
1063 saved_state = worker->
state;
1074 if (saved_state ==
ZOMBIE) {
1156 struct timespec end = {
1158 .tv_nsec = start.tv_usec * 1000,
1169 ast_debug(1,
"Worker thread idle timeout reached. Dying.\n");
1234 if (!shutdown_group) {
1246 if (!shutdown_group) {
1255 struct timeval start;
1256 struct timespec end;
1259 end.tv_sec = start.tv_sec +
timeout;
1260 end.tv_nsec = start.tv_usec * 1000;
1261 while (shutdown_group->
count) {
1268 while (shutdown_group->
count) {
1275 remaining = shutdown_group->
count;
1292 ++shutdown_group->
count;
1308 --shutdown_group->
count;
1309 if (!shutdown_group->
count) {
1424 }
else if (shutdown_group) {
A listener for taskprocessors.
enum sip_cc_notify_state state
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
int auto_increment
Number of threads to increment pool by.
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Set the number of threads for the thread pool.
struct ast_serializer_shutdown_group * shutdown_group
worker_state
states for worker threads
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
Taskprocessor listener shutdown callback.
struct ast_threadpool_options options
static void worker_thread_destroy(void *obj)
Worker thread destructor.
Asterisk main include file. File version handling, generic pbx functions.
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
static void threadpool_active_thread_idle(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to move a thread from the active list to the idle list.
int idle_timeout
Time limit in seconds for idle threads.
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
static void grow(struct ast_threadpool *pool, int delta)
Add threads to the threadpool.
int max_size
Maximum number of threads a pool may have.
static struct set_size_data * set_size_data_alloc(struct ast_threadpool *pool, unsigned int size)
Allocate and initialize a set_size_data.
static void worker_active(struct worker_thread *worker)
Active loop for worker threads.
Struct used for queued operations involving worker state changes.
static void shrink(struct ast_threadpool *pool, int delta)
Remove threads from the threadpool.
#define AST_THREADPOOL_OPTIONS_VERSION
static struct task_pushed_data * task_pushed_data_alloc(struct ast_threadpool *pool, int was_empty)
Allocate and initialize a task_pushed_data.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
#define ao2_callback(c, flags, cb_fn, arg)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
struct ast_taskprocessor * ast_threadpool_serializer_get_current(void)
Get the threadpool serializer currently associated with this thread.
struct ast_threadpool_options options
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currenly waiting to run tasks...
static pj_pool_t * pool
Global memory pool for configuration and timers.
return a reference to a taskprocessor, create one if it does not exist
Assume that the ao2_container is already locked.
static struct ast_threadpool * threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
static void threadpool_idle_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
static int zombify_threads(void *obj, void *arg, void *data, int flags)
ao2 callback to zombify a set number of threads.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
static void threadpool_destructor(void *obj)
Destroy a threadpool's components.
#define ao2_alloc_options(data_size, destructor_fn, options)
#define ast_mutex_lock(a)
static struct worker_thread * worker_thread_alloc(struct ast_threadpool *pool)
Allocate and initialize a new worker thread.
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
void(* shutdown)(struct ast_threadpool_listener *listener)
The threadpool is shutting down.
#define ast_cond_signal(cond)
const struct ast_threadpool_listener_callbacks * callbacks
Helper struct used for queued operations that change the size of the threadpool.
static void threadpool_send_state_changed(struct ast_threadpool *pool)
Notify the threadpool listener that the state has changed.
long ast_threadpool_queue_size(struct ast_threadpool *pool)
Return the size of the threadpool's task queue.
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
static int task(void *data)
Queued task for baseline test.
pthread_cond_t ast_cond_t
AST_THREADSTORAGE_RAW(current_serializer)
static int activate_thread(void *obj, void *arg, int flags)
Activate idle threads.
static void * worker_start(void *arg)
start point for worker threads
void(* emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
Indicates the threadpool's taskprocessor has become empty.
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
#define ast_debug(level,...)
Log a DEBUG message.
struct ast_threadpool * pool
static void serializer_dtor(void *obj)
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks
Table of taskprocessor listener callbacks for threadpool's main taskprocessor.
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.
static int worker_thread_hash(const void *obj, int flags)
struct ast_threadpool * pool
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
#define AST_PTHREADT_NULL
static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks
#define ao2_ref(o, delta)
struct ast_threadpool * pool
static int serializer_start(struct ast_taskprocessor_listener *listener)
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
void(* thread_start)(void)
Function to call when a thread starts.
#define ast_malloc(len)
A wrapper for malloc()
struct worker_thread * worker
int ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data)
Push a task to the threadpool.
void(* thread_end)(void)
Function to call when a thread ends.
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
Wait for the serializers in the group to shutdown with timeout.
static int kill_threads(void *obj, void *arg, int flags)
ao2 callback to kill a set number of threads.
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
static int queued_idle_thread_dead(void *data)
struct ast_threadpool_listener * listener
struct ast_threadpool * pool
struct ast_threadpool * pool
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker)
Queue a task to kill a zombie thread.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
The descriptor of a dynamic string XXX storage will be optimized later if needed We use the ts field ...
#define ao2_unlink(container, obj)
#define ao2_callback_data(container, flags, cb_fn, arg, data)
struct ast_taskprocessor * tps
The main taskprocessor.
#define ast_cond_destroy(cond)
#define ao2_alloc(data_size, destructor_fn)
void * ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
Get the threadpool listener's user data.
static int queued_emptied(void *data)
Queued task that handles the case where the threadpool's taskprocessor is emptied.
static int worker_thread_start(struct worker_thread *worker)
static struct thread_worker_pair * thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker)
Allocate and initialize a thread_worker_pair.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
#define ast_pthread_create(a, b, c, d)
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die s...
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
struct ast_threadpool_listener * ast_threadpool_listener_alloc(const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
Allocate a threadpool listener.
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.
An API for managing task processing threads that can be shared across modules.
static int worker_id_counter
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
static int worker_idle(struct worker_thread *worker)
Idle function for worker threads.
static void serializer_shutdown_group_dtor(void *vdoomed)
static void worker_shutdown(struct worker_thread *worker)
shut a worker thread down
static int worker_set_state(struct worker_thread *worker, enum worker_state state)
Change a worker's state.
listener for a threadpool
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc(void)
Create a serializer group shutdown control object.
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks...
A ast_taskprocessor structure is a singleton by name.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
Taskprocessor listener emptied callback.
void(* task_pushed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
Indicates that a task was pushed to the threadpool.
static int worker_thread_cmp(void *obj, void *arg, int flags)
An opaque threadpool structure.
static int queued_zombie_thread_dead(void *data)
Kill a zombie thread.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
static int execute_tasks(void *data)
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Destructor for thread_worker_pair.
static int queued_task_pushed(void *data)
Queued task called when tasks are pushed into the threadpool.
static int threadpool_execute(struct ast_threadpool *pool)
Execute a task in the threadpool.
static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static struct serializer * serializer_create(struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
helper used for queued task when tasks are pushed
static int queued_active_thread_idle(void *data)
Move a worker thread from the active container to the idle container.
#define ast_mutex_init(pmutex)
struct ast_threadpool * pool
#define ast_mutex_destroy(a)
static int queued_set_size(void *data)
Change the size of the threadpool.
static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
static struct ast_serializer_shutdown_group * shutdown_group
Shutdown group for options serializers.
#define ast_cond_timedwait(cond, mutex, time)
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
Taskprocessor listener callback called when a task is added.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Structure for mutex and tracking information.
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
#define ast_mutex_unlock(a)
struct ast_taskprocessor * control_tps
The control taskprocessor.
#define ao2_link(container, obj)