154 struct timespec end = {
155 .tv_sec = start.tv_sec + 5,
156 .tv_nsec = start.tv_usec * 1000
181 struct timespec end = {
182 .tv_sec = start.tv_sec + 5,
183 .tv_nsec = start.tv_usec * 1000
197 struct timespec end = {
198 .tv_sec = start.tv_sec + 5,
199 .tv_nsec = start.tv_usec * 1000
220 struct timespec end = {
221 .tv_sec = start.tv_sec + 5,
222 .tv_nsec = start.tv_usec * 1000
256 task_pushed ?
"" :
"not ", tld->
task_pushed ?
"" :
" not");
261 was_empty ?
"" :
"not ", tld->
was_empty ?
"" :
" not");
281 was_empty ?
"an" :
"no", tld->
task_pushed ?
"one" :
"none");
306 info->category =
"/main/threadpool/";
307 info->summary =
"Test task";
309 "Basic threadpool test";
366 info->name =
"initial_threads";
367 info->category =
"/main/threadpool/";
368 info->summary =
"Test threadpool initialization state";
370 "Ensure that a threadpool created with a specific size contains the\n" 371 "proper number of idle threads.";
418 info->name =
"thread_creation";
419 info->category =
"/main/threadpool/";
420 info->summary =
"Test threadpool thread creation";
422 "Ensure that threads can be added to a threadpool";
473 info->name =
"thread_destruction";
474 info->category =
"/main/threadpool/";
475 info->summary =
"Test threadpool thread destruction";
477 "Ensure that threads are properly destroyed in a threadpool";
537 info->name =
"thread_timeout";
538 info->category =
"/main/threadpool/";
539 info->summary =
"Test threadpool thread timeout";
541 "Ensure that a thread with a two second timeout dies as expected.";
605 info->name =
"thread_timeout_thrash";
606 info->category =
"/main/threadpool/";
607 info->summary =
"Thrash threadpool thread timeout";
609 "Repeatedly queue a task when a threadpool thread should timeout.";
632 for (iteration = 0; iteration < 30; ++iteration) {
635 struct timespec end = {
637 .tv_nsec = start.tv_usec * 1000
696 info->name =
"one_task_one_thread";
697 info->category =
"/main/threadpool/";
698 info->summary =
"Test a single task with a single thread";
700 "Push a task into an empty threadpool, then add a thread to the pool.";
780 info->name =
"one_thread_one_task";
781 info->category =
"/main/threadpool/";
782 info->summary =
"Test a single thread with a single task";
784 "Add a thread to the pool and then push a task to it.";
867 info->name =
"one_thread_multiple_tasks";
868 info->category =
"/main/threadpool/";
869 info->summary =
"Test a single thread with multiple tasks";
871 "Add a thread to the pool and then push three tasks to it.";
895 if (!std1 || !std2 || !std3) {
958 struct timeval start;
967 end.tv_sec = start.tv_sec + 5;
968 end.tv_nsec = start.tv_usec * 1000;
1002 .auto_increment = 3,
1009 info->name =
"auto_increment";
1010 info->category =
"/main/threadpool/";
1011 info->summary =
"Test that the threadpool grows as tasks are added";
1013 "Create an empty threadpool and push a task to it. Once the task is\n" 1014 "pushed, the threadpool should add three threads and be able to\n" 1015 "handle the task. The threads should then go idle";
1040 if (!std1 || !std2 || !std3 || !std4) {
1127 .auto_increment = 3,
1134 info->name =
"max_size";
1135 info->category =
"/main/threadpool/";
1136 info->summary =
"Test that the threadpool does not exceed its maximum size restriction";
1138 "Create an empty threadpool and push a task to it. Once the task is\n" 1139 "pushed, the threadpool should attempt to grow by three threads, but the\n" 1140 "pool's restrictions should only allow two threads to be added.";
1200 .auto_increment = 0,
1207 info->name =
"reactivation";
1208 info->category =
"/main/threadpool/";
1209 info->summary =
"Test that a threadpool reactivates when work is added";
1211 "Push a task into a threadpool. Make sure the task executes and the\n" 1212 "thread goes idle. Then push a second task and ensure that the thread\n" 1213 "awakens and executes the second task.";
1236 if (!std1 || !std2) {
1360 struct timespec end = {
1361 .tv_sec = start.tv_sec + 5,
1362 .tv_nsec = start.tv_usec * 1000
1378 struct timespec end = {
1379 .tv_sec = start.tv_sec + 1,
1380 .tv_nsec = start.tv_usec * 1000
1396 struct timespec end = {
1397 .tv_sec = start.tv_sec + 5,
1398 .tv_nsec = start.tv_usec * 1000
1426 .auto_increment = 0,
1433 info->name =
"task_distribution";
1434 info->category =
"/main/threadpool/";
1435 info->summary =
"Test that tasks are evenly distributed to threads";
1437 "Push two tasks into a threadpool. Ensure that each is handled by\n" 1438 "a separate thread";
1461 if (!ctd1 || !ctd2) {
1525 .auto_increment = 0,
1532 info->name =
"more_destruction";
1533 info->category =
"/main/threadpool/";
1534 info->summary =
"Test that threads are destroyed as expected";
1536 "Push two tasks into a threadpool. Set the threadpool size to 4\n" 1537 "Ensure that there are 2 active and 2 idle threads. Then shrink the\n" 1538 "threadpool down to 1 thread. Ensure that the thread leftover is active\n" 1539 "and ensure that both tasks complete.";
1562 if (!ctd1 || !ctd2) {
1643 .auto_increment = 0,
1650 info->name =
"threadpool_serializer";
1651 info->category =
"/main/threadpool/";
1652 info->summary =
"Test that serializers";
1654 "Ensures that tasks enqueued to a serialize execute in sequence.";
1669 if (!uut || !data1 || !data2 || !data3) {
1756 .auto_increment = 0,
1763 info->name =
"threadpool_serializer_dupe";
1764 info->category =
"/main/threadpool/";
1765 info->summary =
"Test that serializers are uniquely named";
1767 "Creating two serializers with the same name should\n" 1787 if (there_can_be_only_one) {
void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size)
Set the number of threads for the thread pool.
#define AST_MODULE_INFO_STANDARD(keystr, desc)
static void complex_task_data_free(struct complex_task_data *ctd)
Asterisk locking-related definitions:
Asterisk main include file. File version handling, generic pbx functions.
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
int idle_timeout
Time limit in seconds for idle threads.
static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
static int unload_module(void)
static enum ast_test_result_state listener_check(struct ast_test *test, struct ast_threadpool_listener *listener, int task_pushed, int was_empty, int num_tasks, int num_active, int num_idle, int empty_notice)
int ast_test_unregister(ast_test_cb_t *cb)
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within an ast_threadpool.
#define AST_THREADPOOL_OPTIONS_VERSION
static void simple_task_data_free(struct simple_task_data *std)
static int simple_task(void *data)
static pj_pool_t * pool
Global memory pool for configuration and timers.
static struct test_listener_data * test_alloc(void)
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
#define ast_mutex_lock(a)
static int has_complex_started(struct complex_task_data *ctd)
#define ast_cond_signal(cond)
pthread_cond_t ast_cond_t
static int load_module(void)
int ast_test_register(ast_test_cb_t *cb)
static void * listener(void *unused)
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
#define ast_test_status_update(a, b, c...)
static void test_shutdown(struct ast_threadpool_listener *listener)
static void poke_worker(struct complex_task_data *ctd)
void * ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
Get the threadpool listener's user data.
static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
static void test_state_changed(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
static int wait_for_complex_start(struct complex_task_data *ctd)
static struct simple_task_data * simple_task_data_alloc(void)
AST_TEST_DEFINE(threadpool_push)
static const struct ast_threadpool_listener_callbacks test_callbacks
#define ast_cond_destroy(cond)
struct ast_threadpool_listener * ast_threadpool_listener_alloc(const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
Allocate a threadpool listener.
static enum ast_test_result_state wait_until_thread_state_task_pushed(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle, int num_tasks)
#define ast_calloc(num, len)
A wrapper for calloc()
static void test_emptied(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
int ast_threadpool_push(struct ast_threadpool *pool, int(*task)(void *data), void *data) attribute_warn_unused_result
Push a task to the threadpool.
An API for managing task processing threads that can be shared across modules.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Support for logging to various files, console and syslog. Configuration in file logger.conf.
listener for a threadpool
An ast_taskprocessor structure is a singleton by name.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
An opaque threadpool structure.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
#define ast_mutex_init(pmutex)
static void test_task_pushed(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
#define ast_mutex_destroy(a)
static int complex_task(void *data)
#define ASTERISK_GPL_KEY
The text the key() function should return.
Asterisk module definitions.
static struct complex_task_data * complex_task_data_alloc(void)
#define ast_cond_timedwait(cond, mutex, time)
Structure for mutex and tracking information.
#define ast_mutex_unlock(a)