88 static int task(
void *data)
110 end.tv_sec = start.tv_sec + 30;
111 end.tv_nsec = start.tv_usec * 1000;
117 if (res == ETIMEDOUT) {
140 info->name =
"default_taskprocessor";
141 info->category =
"/main/taskprocessor/";
142 info->summary =
"Test of default taskproccesor";
144 "Ensures that a queued task gets executed.";
183 #define TEST_DATA_ARRAY_SIZE 10 184 #define LOW_WATER_MARK 3 185 #define HIGH_WATER_MARK 6 190 unsigned int alert_level;
191 unsigned int subsystem_alert_level;
195 info->name =
"subsystem_alert";
196 info->category =
"/main/taskprocessor/";
197 info->summary =
"Test of subsystem alerts";
199 "Ensures alerts are generated properly.";
236 if (subsystem_alert_level) {
243 if (subsystem_alert_level > 0) {
247 if (alert_level > 0) {
252 if (subsystem_alert_level == 0) {
256 if (alert_level == 0) {
278 if (!subsystem_alert_level) {
285 if (subsystem_alert_level == 0) {
289 if (alert_level == 0) {
294 if (subsystem_alert_level > 0) {
298 if (alert_level > 0) {
314 #define NUM_TASKS 20000 338 int *randdata = data;
355 struct timeval start;
364 info->name =
"default_taskprocessor_load";
365 info->category =
"/main/taskprocessor/";
366 info->summary =
"Load test of default taskproccesor";
368 "Ensure that a large number of queued tasks are executed in the proper order.";
383 ts.tv_sec = start.tv_sec + 60;
384 ts.tv_nsec = start.tv_usec * 1000;
402 if (timedwait_res == ETIMEDOUT) {
564 info->name =
"taskprocessor_listener";
565 info->category =
"/main/taskprocessor/";
566 info->summary =
"Test of taskproccesor listeners";
568 "Ensures that listener callbacks are called when expected.";
696 struct timespec end = {
697 .tv_sec = start.tv_sec + 5,
698 .tv_nsec = start.tv_usec * 1000
720 struct timespec end = {
721 .tv_sec = start.tv_sec + 5,
722 .tv_nsec = start.tv_usec * 1000
757 pthread_t shutdown_thread;
761 info->name =
"taskprocessor_shutdown";
762 info->category =
"/main/taskprocessor/";
763 info->summary =
"Test of taskproccesor shutdown sequence";
765 "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
775 if (!tps || !task1 || !task2) {
799 if (pthread_res != 0) {
814 pthread_join(shutdown_thread,
NULL);
847 info->name = __func__;
848 info->category =
"/main/taskprocessor/";
849 info->summary =
"Test of pushing local data";
851 "Ensures that local data is passed along.";
885 if (local_data != 1) {
887 "Queued task did not set local_data!\n");
917 info->name =
"serializer_pool";
918 info->category =
"/main/taskprocessor/";
919 info->summary =
"Test using a serializer pool";
921 "Ensures that a queued task gets executed.";
944 serializer_pool =
NULL;
961 serializer_pool =
NULL;
A listener for taskprocessors.
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
helper to ensure that statistics the listener is keeping are what we expect
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Asterisk main include file. File version handling, generic pbx functions.
static void shutdown_poke(struct shutdown_data *shutdown_data)
int ast_test_unregister(ast_test_cb_t *cb)
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
static int load_task(void *data)
a queued task to be used in the taskprocessor load test
static int shutdown_task_exec(void *data)
static void test_emptied(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's emptied callback.
#define AST_THREADPOOL_OPTIONS_VERSION
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
static const struct ast_taskprocessor_listener_callbacks test_callbacks
static void test_shutdown(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's shutdown callback.
#define TEST_DATA_ARRAY_SIZE
return a reference to a taskprocessor, create one if it does not exist
#define ast_cond_wait(cond, mutex)
#define ast_cond_init(cond, attr)
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
static int shutdown_has_completed(struct shutdown_data *shutdown_data)
#define ast_mutex_lock(a)
static int local_task_exe(struct ast_taskprocessor_local *local)
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
#define ast_cond_signal(cond)
Relevant data associated with taskprocessor load test.
static void * tps_shutdown_thread(void *data)
static int task(void *data)
Queued task for baseline test.
pthread_cond_t ast_cond_t
int ast_test_register(ast_test_cb_t *cb)
static int load_module(void)
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
static void * listener(void *unused)
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
#define ast_test_status_update(a, b, c...)
#define ao2_ref(o, delta)
long int ast_random(void)
static void task_data_dtor(void *obj)
static struct shutdown_data * shutdown_data_create(int dont_wait)
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
static void shutdown_data_dtor(void *data)
static struct ast_threadpool * threadpool
Thread pool for observers.
static struct task_data * task_data_create(void)
Create a task_data object.
static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
test taskprocessor listener's task_pushed callback
static int listener_test_task(void *ignore)
Queued task for taskprocessor listener test.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by sybsystem.
def ignore(key=None, val=None, section=None, pjsip=None, nmapped=None, type='endpoint')
userdata associated with baseline taskprocessor test
#define ast_cond_destroy(cond)
#define ao2_alloc(data_size, destructor_fn)
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
#define ast_calloc(num, len)
A wrapper for calloc()
#define ast_pthread_create(a, b, c, d)
static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
static void * test_listener_pvt_alloc(void)
test taskprocessor listener's alloc callback
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
static int unload_module(void)
An API for managing task processing threads that can be shared across modules.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
A ast_taskprocessor structure is a singleton by name.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
An opaque threadpool structure.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
#define ast_mutex_init(pmutex)
#define ast_mutex_destroy(a)
Private data for the test taskprocessor listener.
#define ASTERISK_GPL_KEY
The text the key() function should return.
static struct load_task_data load_task_results
Asterisk module definitions.
static void data_cleanup(void *data)
#define ast_cond_timedwait(cond, mutex, time)
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Structure for mutex and tracking information.
static int test_start(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's start callback
AST_TEST_DEFINE(default_taskprocessor)
Baseline test for default taskprocessor.
#define ast_mutex_unlock(a)