Asterisk - The Open Source Telephony Project  18.5.0
Data Structures | Macros | Functions | Variables
test_taskprocessor.c File Reference

taskprocessor unit tests More...

#include "asterisk.h"
#include "asterisk/test.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/module.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer.h"
#include "asterisk/threadpool.h"
Include dependency graph for test_taskprocessor.c:

Go to the source code of this file.

Data Structures

struct  load_task_data
 Relevant data associated with taskprocessor load test. More...
 
struct  shutdown_data
 
struct  task_data
 userdata associated with baseline taskprocessor test More...
 
struct  test_listener_pvt
 Private data for the test taskprocessor listener. More...
 

Macros

#define HIGH_WATER_MARK   6
 
#define LOW_WATER_MARK   3
 
#define NUM_TASKS   20000
 
#define TEST_DATA_ARRAY_SIZE   10
 

Functions

static void __reg_module (void)
 
static void __unreg_module (void)
 
struct ast_moduleAST_MODULE_SELF_SYM (void)
 
 AST_TEST_DEFINE (default_taskprocessor)
 Baseline test for default taskprocessor. More...
 
 AST_TEST_DEFINE (subsystem_alert)
 Baseline test for subsystem alert. More...
 
 AST_TEST_DEFINE (default_taskprocessor_load)
 Load test for taskprocessor with default listener. More...
 
 AST_TEST_DEFINE (taskprocessor_listener)
 Test for a taskprocessor with custom listener. More...
 
 AST_TEST_DEFINE (taskprocessor_shutdown)
 
 AST_TEST_DEFINE (taskprocessor_push_local)
 
 AST_TEST_DEFINE (serializer_pool)
 Baseline test for a serializer pool. More...
 
static int check_stats (struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
 helper to ensure that statistics the listener is keeping are what we expect More...
 
static int listener_test_task (void *ignore)
 Queued task for taskprocessor listener test. More...
 
static int load_module (void)
 
static int load_task (void *data)
 a queued task to be used in the taskprocessor load test More...
 
static int local_task_exe (struct ast_taskprocessor_local *local)
 
static struct shutdown_datashutdown_data_create (int dont_wait)
 
static void shutdown_data_dtor (void *data)
 
static int shutdown_has_completed (struct shutdown_data *shutdown_data)
 
static void shutdown_poke (struct shutdown_data *shutdown_data)
 
static int shutdown_task_exec (void *data)
 
static int shutdown_waitfor_completion (struct shutdown_data *shutdown_data)
 
static int shutdown_waitfor_start (struct shutdown_data *shutdown_data)
 
static int task (void *data)
 Queued task for baseline test. More...
 
static struct task_datatask_data_create (void)
 Create a task_data object. More...
 
static void task_data_dtor (void *obj)
 
static int task_wait (struct task_data *task_data)
 Wait for a task to execute. More...
 
static void test_emptied (struct ast_taskprocessor_listener *listener)
 test taskprocessor listener's emptied callback. More...
 
static void * test_listener_pvt_alloc (void)
 test taskprocessor listener's alloc callback More...
 
static void test_shutdown (struct ast_taskprocessor_listener *listener)
 test taskprocessor listener's shutdown callback. More...
 
static int test_start (struct ast_taskprocessor_listener *listener)
 test taskprocessor listener's start callback More...
 
static void test_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 test taskprocessor listener's task_pushed callback More...
 
static void * tps_shutdown_thread (void *data)
 
static int unload_module (void)
 

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "taskprocessor test module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, .support_level = AST_MODULE_SUPPORT_CORE, }
 
static const struct ast_module_infoast_module_info = &__mod_info
 
static struct load_task_data load_task_results
 
static const struct ast_taskprocessor_listener_callbacks test_callbacks
 

Detailed Description

taskprocessor unit tests

Author
Mark Michelson mmich.nosp@m.elso.nosp@m.n@dig.nosp@m.ium..nosp@m.com

Definition in file test_taskprocessor.c.

Macro Definition Documentation

◆ HIGH_WATER_MARK

#define HIGH_WATER_MARK   6

Referenced by AST_TEST_DEFINE().

◆ LOW_WATER_MARK

#define LOW_WATER_MARK   3

Referenced by AST_TEST_DEFINE().

◆ NUM_TASKS

#define NUM_TASKS   20000

Definition at line 314 of file test_taskprocessor.c.

Referenced by AST_TEST_DEFINE().

◆ TEST_DATA_ARRAY_SIZE

#define TEST_DATA_ARRAY_SIZE   10

Referenced by AST_TEST_DEFINE().

Function Documentation

◆ __reg_module()

static void __reg_module ( void  )
static

Definition at line 990 of file test_taskprocessor.c.

◆ __unreg_module()

static void __unreg_module ( void  )
static

Definition at line 990 of file test_taskprocessor.c.

◆ AST_MODULE_SELF_SYM()

struct ast_module* AST_MODULE_SELF_SYM ( void  )

Definition at line 990 of file test_taskprocessor.c.

◆ AST_TEST_DEFINE() [1/7]

AST_TEST_DEFINE ( default_taskprocessor  )

Baseline test for default taskprocessor.

This test ensures that when a task is added to a taskprocessor that has been allocated with a default listener that the task gets executed as expected

Definition at line 132 of file test_taskprocessor.c.

References ao2_cleanup, ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, sip_to_pjsip::info(), NULL, RAII_VAR, task(), task_data_create(), task_wait(), TEST_EXECUTE, TEST_INIT, and TPS_REF_DEFAULT.

133 {
136  int res;
137 
138  switch (cmd) {
139  case TEST_INIT:
140  info->name = "default_taskprocessor";
141  info->category = "/main/taskprocessor/";
142  info->summary = "Test of default taskproccesor";
143  info->description =
144  "Ensures that a queued task gets executed.";
145  return AST_TEST_NOT_RUN;
146  case TEST_EXECUTE:
147  break;
148  }
149 
150  tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
151 
152  if (!tps) {
153  ast_test_status_update(test, "Unable to create test taskprocessor\n");
154  return AST_TEST_FAIL;
155  }
156 
158  if (!task_data) {
159  ast_test_status_update(test, "Unable to create task_data\n");
160  return AST_TEST_FAIL;
161  }
162 
164  ast_test_status_update(test, "Failed to queue task\n");
165  return AST_TEST_FAIL;
166  }
167 
168  res = task_wait(task_data);
169  if (res != 0) {
170  ast_test_status_update(test, "Queued task did not execute!\n");
171  return AST_TEST_FAIL;
172  }
173 
174  return AST_TEST_PASS;
175 }
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
#define NULL
Definition: resample.c:96
static int task(void *data)
Queued task for baseline test.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
static struct task_data * task_data_create(void)
Create a task_data object.
def info(msg)
userdata associated with baseline taskprocessor test
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

◆ AST_TEST_DEFINE() [2/7]

AST_TEST_DEFINE ( subsystem_alert  )

Baseline test for subsystem alert.

Definition at line 180 of file test_taskprocessor.c.

References ao2_cleanup, ast_taskprocessor_alert_get(), ast_taskprocessor_alert_set_levels(), ast_taskprocessor_get(), ast_taskprocessor_get_subsystem_alert(), ast_taskprocessor_push(), ast_taskprocessor_size(), ast_taskprocessor_suspend(), ast_taskprocessor_unreference(), ast_taskprocessor_unsuspend(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, data_cleanup(), HIGH_WATER_MARK, sip_to_pjsip::info(), LOW_WATER_MARK, NULL, RAII_VAR, task(), task_data_create(), task_wait(), TEST_DATA_ARRAY_SIZE, TEST_EXECUTE, TEST_INIT, TPS_REF_DEFAULT, and task_data::wait_time.

181 {
183 #define TEST_DATA_ARRAY_SIZE 10
184 #define LOW_WATER_MARK 3
185 #define HIGH_WATER_MARK 6
186  struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
187  int res = 0;
188  int i;
189  long queue_count;
190  unsigned int alert_level;
191  unsigned int subsystem_alert_level;
192 
193  switch (cmd) {
194  case TEST_INIT:
195  info->name = "subsystem_alert";
196  info->category = "/main/taskprocessor/";
197  info->summary = "Test of subsystem alerts";
198  info->description =
199  "Ensures alerts are generated properly.";
200  return AST_TEST_NOT_RUN;
201  case TEST_EXECUTE:
202  break;
203  }
204 
205  tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
206 
207  if (!tps) {
208  ast_test_status_update(test, "Unable to create test taskprocessor\n");
209  return AST_TEST_FAIL;
210  }
211 
214 
215  for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
217  if (!task_data[i]) {
218  ast_test_status_update(test, "Unable to create task_data\n");
219  res = -1;
220  goto data_cleanup;
221  }
222  task_data[i]->wait_time = 500;
223 
224  ast_test_status_update(test, "Pushing task %d\n", i);
225  if (ast_taskprocessor_push(tps, task, task_data[i])) {
226  ast_test_status_update(test, "Failed to queue task\n");
227  res = -1;
228  goto data_cleanup;
229  }
230 
231  queue_count = ast_taskprocessor_size(tps);
232  alert_level = ast_taskprocessor_alert_get();
233  subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
234 
235  if (queue_count == HIGH_WATER_MARK) {
236  if (subsystem_alert_level) {
237  ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
238  }
239  if (alert_level) {
240  ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
241  }
242  } else if (queue_count < HIGH_WATER_MARK) {
243  if (subsystem_alert_level > 0) {
244  ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
245  res = -1;
246  }
247  if (alert_level > 0) {
248  ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
249  res = -1;
250  }
251  } else {
252  if (subsystem_alert_level == 0) {
253  ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
254  res = -1;
255  }
256  if (alert_level == 0) {
257  ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
258  res = -1;
259  }
260  }
261  }
262 
264 
265  for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
266  ast_test_status_update(test, "Waiting on task %d\n", i);
267  if (task_wait(task_data[i])) {
268  ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
269  res = -1;
270  goto data_cleanup;
271  }
272 
273  queue_count = ast_taskprocessor_size(tps);
274  alert_level = ast_taskprocessor_alert_get();
275  subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
276 
277  if (queue_count == LOW_WATER_MARK) {
278  if (!subsystem_alert_level) {
279  ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
280  }
281  if (!alert_level) {
282  ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
283  }
284  } else if (queue_count > LOW_WATER_MARK) {
285  if (subsystem_alert_level == 0) {
286  ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
287  res = -1;
288  }
289  if (alert_level == 0) {
290  ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
291  res = -1;
292  }
293  } else {
294  if (subsystem_alert_level > 0) {
295  ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
296  res = -1;
297  }
298  if (alert_level > 0) {
299  ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
300  res = -1;
301  }
302  }
303 
304  }
305 
307  for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
309  }
310 
311  return res ? AST_TEST_FAIL : AST_TEST_PASS;
312 }
unsigned long wait_time
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
#define TEST_DATA_ARRAY_SIZE
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
#define NULL
Definition: resample.c:96
static int task(void *data)
Queued task for baseline test.
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
#define HIGH_WATER_MARK
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
static struct task_data * task_data_create(void)
Create a task_data object.
def info(msg)
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by sybsystem.
userdata associated with baseline taskprocessor test
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
#define LOW_WATER_MARK
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
static void data_cleanup(void *data)

◆ AST_TEST_DEFINE() [3/7]

AST_TEST_DEFINE ( default_taskprocessor_load  )

Load test for taskprocessor with default listener.

This test queues a large number of tasks, each with random data associated. The test ensures that all of the tasks are run and that the tasks are executed in the same order that they were queued

Definition at line 352 of file test_taskprocessor.c.

References ast_cond_destroy, ast_cond_init, ast_cond_timedwait, ast_mutex_destroy, ast_mutex_init, ast_mutex_lock, ast_mutex_unlock, ast_random(), ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, ast_tvnow(), load_task_data::cond, sip_to_pjsip::info(), load_task(), load_task_results, load_task_data::lock, NULL, NUM_TASKS, load_task_data::task_rand, load_task_data::tasks_completed, TEST_EXECUTE, TEST_INIT, and TPS_REF_DEFAULT.

353 {
354  struct ast_taskprocessor *tps;
355  struct timeval start;
356  struct timespec ts;
358  int timedwait_res;
359  int i;
360  int rand_data[NUM_TASKS];
361 
362  switch (cmd) {
363  case TEST_INIT:
364  info->name = "default_taskprocessor_load";
365  info->category = "/main/taskprocessor/";
366  info->summary = "Load test of default taskproccesor";
367  info->description =
368  "Ensure that a large number of queued tasks are executed in the proper order.";
369  return AST_TEST_NOT_RUN;
370  case TEST_EXECUTE:
371  break;
372  }
373 
374  tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
375 
376  if (!tps) {
377  ast_test_status_update(test, "Unable to create test taskprocessor\n");
378  return AST_TEST_FAIL;
379  }
380 
381  start = ast_tvnow();
382 
383  ts.tv_sec = start.tv_sec + 60;
384  ts.tv_nsec = start.tv_usec * 1000;
385 
389 
390  for (i = 0; i < NUM_TASKS; ++i) {
391  rand_data[i] = ast_random();
392  if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
393  ast_test_status_update(test, "Failed to queue task\n");
394  res = AST_TEST_FAIL;
395  goto test_end;
396  }
397  }
398 
400  while (load_task_results.tasks_completed < NUM_TASKS) {
402  if (timedwait_res == ETIMEDOUT) {
403  break;
404  }
405  }
407 
408  if (load_task_results.tasks_completed != NUM_TASKS) {
409  ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
411  res = AST_TEST_FAIL;
412  goto test_end;
413  }
414 
415  for (i = 0; i < NUM_TASKS; ++i) {
416  if (rand_data[i] != load_task_results.task_rand[i]) {
417  ast_test_status_update(test, "Queued tasks did not execute in order\n");
418  res = AST_TEST_FAIL;
419  goto test_end;
420  }
421  }
422 
423 test_end:
427  return res;
428 }
static int load_task(void *data)
a queued task to be used in the taskprocessor load test
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
#define ast_cond_init(cond, attr)
Definition: lock.h:199
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ast_mutex_lock(a)
Definition: lock.h:187
#define NULL
Definition: resample.c:96
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
long int ast_random(void)
Definition: main/utils.c:2064
int task_rand[NUM_TASKS]
def info(msg)
#define ast_cond_destroy(cond)
Definition: lock.h:200
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
#define NUM_TASKS
#define ast_mutex_init(pmutex)
Definition: lock.h:184
#define ast_mutex_destroy(a)
Definition: lock.h:186
static struct load_task_data load_task_results
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
ast_test_result_state
Definition: test.h:200
#define ast_mutex_unlock(a)
Definition: lock.h:188

◆ AST_TEST_DEFINE() [4/7]

AST_TEST_DEFINE ( taskprocessor_listener  )

Test for a taskprocessor with custom listener.

This test pushes tasks to a taskprocessor with a custom listener, executes the taskss, and destroys the taskprocessor.

The test ensures that the listener's callbacks are called when expected and that the data being passed in is accurate.

Definition at line 555 of file test_taskprocessor.c.

References ao2_cleanup, ast_free, ast_taskprocessor_create_with_listener(), ast_taskprocessor_execute(), ast_taskprocessor_listener_alloc(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, check_stats(), sip_to_pjsip::info(), listener(), listener_test_task(), NULL, test_listener_pvt::shutdown, TEST_EXECUTE, TEST_INIT, and test_listener_pvt_alloc().

556 {
557  struct ast_taskprocessor *tps = NULL;
559  struct test_listener_pvt *pvt = NULL;
561 
562  switch (cmd) {
563  case TEST_INIT:
564  info->name = "taskprocessor_listener";
565  info->category = "/main/taskprocessor/";
566  info->summary = "Test of taskproccesor listeners";
567  info->description =
568  "Ensures that listener callbacks are called when expected.";
569  return AST_TEST_NOT_RUN;
570  case TEST_EXECUTE:
571  break;
572  }
573 
574  pvt = test_listener_pvt_alloc();
575  if (!pvt) {
576  ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
577  return AST_TEST_FAIL;
578  }
579 
581  if (!listener) {
582  ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
583  res = AST_TEST_FAIL;
584  goto test_exit;
585  }
586 
587  tps = ast_taskprocessor_create_with_listener("test_listener", listener);
588  if (!tps) {
589  ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
590  res = AST_TEST_FAIL;
591  goto test_exit;
592  }
593 
595  ast_test_status_update(test, "Failed to queue task\n");
596  res = AST_TEST_FAIL;
597  goto test_exit;
598  }
599 
600  if (check_stats(test, pvt, 1, 0, 1) < 0) {
601  res = AST_TEST_FAIL;
602  goto test_exit;
603  }
604 
606  ast_test_status_update(test, "Failed to queue task\n");
607  res = AST_TEST_FAIL;
608  goto test_exit;
609  }
610 
611  if (check_stats(test, pvt, 2, 0, 1) < 0) {
612  res = AST_TEST_FAIL;
613  goto test_exit;
614  }
615 
617 
618  if (check_stats(test, pvt, 2, 0, 1) < 0) {
619  res = AST_TEST_FAIL;
620  goto test_exit;
621  }
622 
624 
625  if (check_stats(test, pvt, 2, 1, 1) < 0) {
626  res = AST_TEST_FAIL;
627  goto test_exit;
628  }
629 
631 
632  if (!pvt->shutdown) {
633  res = AST_TEST_FAIL;
634  goto test_exit;
635  }
636 
637 test_exit:
638  ao2_cleanup(listener);
639  /* This is safe even if tps is NULL */
641  ast_free(pvt);
642  return res;
643 }
A listener for taskprocessors.
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
helper to ensure that statistics the listener is keeping are what we expect
static const struct ast_taskprocessor_listener_callbacks test_callbacks
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
#define NULL
Definition: resample.c:96
static void * listener(void *unused)
Definition: asterisk.c:1476
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
static int listener_test_task(void *ignore)
Queued task for taskprocessor listener test.
def info(msg)
#define ast_free(a)
Definition: astmm.h:182
static void * test_listener_pvt_alloc(void)
test taskprocessor listener&#39;s alloc callback
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
Private data for the test taskprocessor listener.
ast_test_result_state
Definition: test.h:200

◆ AST_TEST_DEFINE() [5/7]

AST_TEST_DEFINE ( taskprocessor_shutdown  )

Definition at line 749 of file test_taskprocessor.c.

References ao2_cleanup, ast_pthread_create, ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, sip_to_pjsip::info(), NULL, RAII_VAR, shutdown_data_create(), shutdown_has_completed(), shutdown_poke(), shutdown_task_exec(), shutdown_waitfor_completion(), shutdown_waitfor_start(), TEST_EXECUTE, TEST_INIT, TPS_REF_DEFAULT, and tps_shutdown_thread().

750 {
752  RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
753  RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
754  int push_res;
755  int wait_res;
756  int pthread_res;
757  pthread_t shutdown_thread;
758 
759  switch (cmd) {
760  case TEST_INIT:
761  info->name = "taskprocessor_shutdown";
762  info->category = "/main/taskprocessor/";
763  info->summary = "Test of taskproccesor shutdown sequence";
764  info->description =
765  "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
766  return AST_TEST_NOT_RUN;
767  case TEST_EXECUTE:
768  break;
769  }
770 
771  tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
772  task1 = shutdown_data_create(0); /* task1 waits to be poked */
773  task2 = shutdown_data_create(1); /* task2 waits for nothing */
774 
775  if (!tps || !task1 || !task2) {
776  ast_test_status_update(test, "Allocation error\n");
777  return AST_TEST_FAIL;
778  }
779 
780  push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
781  if (push_res != 0) {
782  ast_test_status_update(test, "Could not push task1\n");
783  return AST_TEST_FAIL;
784  }
785 
786  push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
787  if (push_res != 0) {
788  ast_test_status_update(test, "Could not push task2\n");
789  return AST_TEST_FAIL;
790  }
791 
792  wait_res = shutdown_waitfor_start(task1);
793  if (!wait_res) {
794  ast_test_status_update(test, "Task1 didn't start\n");
795  return AST_TEST_FAIL;
796  }
797 
798  pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
799  if (pthread_res != 0) {
800  ast_test_status_update(test, "Failed to create shutdown thread\n");
801  return AST_TEST_FAIL;
802  }
803  tps = NULL;
804 
805  /* Wakeup task1; it should complete */
806  shutdown_poke(task1);
807  wait_res = shutdown_waitfor_completion(task1);
808  if (!wait_res) {
809  ast_test_status_update(test, "Task1 didn't complete\n");
810  return AST_TEST_FAIL;
811  }
812 
813  /* Wait for shutdown to complete */
814  pthread_join(shutdown_thread, NULL);
815 
816  /* Should have also completed task2 */
817  wait_res = shutdown_has_completed(task2);
818  if (!wait_res) {
819  ast_test_status_update(test, "Task2 didn't finish\n");
820  return AST_TEST_FAIL;
821  }
822 
823  return AST_TEST_PASS;
824 }
static void shutdown_poke(struct shutdown_data *shutdown_data)
static int shutdown_task_exec(void *data)
static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
static int shutdown_has_completed(struct shutdown_data *shutdown_data)
#define NULL
Definition: resample.c:96
static void * tps_shutdown_thread(void *data)
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
static struct shutdown_data * shutdown_data_create(int dont_wait)
def info(msg)
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:559
static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

◆ AST_TEST_DEFINE() [6/7]

AST_TEST_DEFINE ( taskprocessor_push_local  )

Definition at line 837 of file test_taskprocessor.c.

References ao2_cleanup, ast_taskprocessor_get(), ast_taskprocessor_push_local(), ast_taskprocessor_set_local(), ast_taskprocessor_unreference(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, sip_to_pjsip::info(), local_task_exe(), NULL, RAII_VAR, task_data_create(), task_wait(), TEST_EXECUTE, TEST_INIT, and TPS_REF_DEFAULT.

838 {
839  RAII_VAR(struct ast_taskprocessor *, tps, NULL,
842  int local_data;
843  int res;
844 
845  switch (cmd) {
846  case TEST_INIT:
847  info->name = __func__;
848  info->category = "/main/taskprocessor/";
849  info->summary = "Test of pushing local data";
850  info->description =
851  "Ensures that local data is passed along.";
852  return AST_TEST_NOT_RUN;
853  case TEST_EXECUTE:
854  break;
855  }
856 
857 
858  tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
859  if (!tps) {
860  ast_test_status_update(test, "Unable to create test taskprocessor\n");
861  return AST_TEST_FAIL;
862  }
863 
864 
866  if (!task_data) {
867  ast_test_status_update(test, "Unable to create task_data\n");
868  return AST_TEST_FAIL;
869  }
870 
871  local_data = 0;
872  ast_taskprocessor_set_local(tps, &local_data);
873 
875  ast_test_status_update(test, "Failed to queue task\n");
876  return AST_TEST_FAIL;
877  }
878 
879  res = task_wait(task_data);
880  if (res != 0) {
881  ast_test_status_update(test, "Queued task did not execute!\n");
882  return AST_TEST_FAIL;
883  }
884 
885  if (local_data != 1) {
887  "Queued task did not set local_data!\n");
888  return AST_TEST_FAIL;
889  }
890 
891  return AST_TEST_PASS;
892 }
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
static int local_task_exe(struct ast_taskprocessor_local *local)
#define NULL
Definition: resample.c:96
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
static struct task_data * task_data_create(void)
Create a task_data object.
def info(msg)
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
userdata associated with baseline taskprocessor test
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

◆ AST_TEST_DEFINE() [7/7]

AST_TEST_DEFINE ( serializer_pool  )

Baseline test for a serializer pool.

This test ensures that when a task is added to a taskprocessor that has been allocated with a default listener that the task gets executed as expected

Definition at line 901 of file test_taskprocessor.c.

References ao2_cleanup, ast_serializer_pool_create(), ast_serializer_pool_destroy(), ast_serializer_pool_get(), ast_serializer_pool_name(), ast_serializer_pool_set_alerts(), ast_taskprocessor_push(), AST_TEST_FAIL, AST_TEST_NOT_RUN, AST_TEST_PASS, ast_test_status_update, ast_threadpool_create(), AST_THREADPOOL_OPTIONS_VERSION, ast_threadpool_shutdown(), sip_to_pjsip::info(), NULL, RAII_VAR, task(), task_data_create(), task_wait(), TEST_EXECUTE, TEST_INIT, threadpool, ast_threadpool_options::version, and task_data::wait_time.

902 {
904  RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy);
908  .idle_timeout = 0,
909  .auto_increment = 0,
910  .initial_size = 1,
911  .max_size = 0,
912  };
913  /* struct ast_taskprocessor *tps; */
914 
915  switch (cmd) {
916  case TEST_INIT:
917  info->name = "serializer_pool";
918  info->category = "/main/taskprocessor/";
919  info->summary = "Test using a serializer pool";
920  info->description =
921  "Ensures that a queued task gets executed.";
922  return AST_TEST_NOT_RUN;
923  case TEST_EXECUTE:
924  break;
925  }
926 
927  ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
928  ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
929  "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
930  ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
931  ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
932  ast_test_validate(test, task_data = task_data_create());
933 
934  task_data->wait_time = 4000; /* task takes 4 seconds */
935  ast_test_validate(test, !ast_taskprocessor_push(
936  ast_serializer_pool_get(serializer_pool), task, task_data));
937 
938  if (!ast_serializer_pool_destroy(serializer_pool)) {
939  ast_test_status_update(test, "Unexpected pool destruction!\n");
940  /*
941  * The pool should have timed out, so if it destruction reports success
942  * we need to fail.
943  */
944  serializer_pool = NULL;
945  return AST_TEST_FAIL;
946  }
947 
948  ast_test_validate(test, !task_wait(task_data));
949 
950  /* The first attempt should have failed. Second try should destroy successfully */
951  if (ast_serializer_pool_destroy(serializer_pool)) {
952  ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
953  /*
954  * If this fails we'll try again on return to hopefully avoid a memory leak.
955  * If it again times out a third time, well not much we can do.
956  */
957  return AST_TEST_FAIL;
958  }
959 
960  /* Test passed, so set pool to NULL to avoid "re-running" destroy */
961  serializer_pool = NULL;
962 
963  return AST_TEST_PASS;
964 }
unsigned long wait_time
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition: serializer.c:39
#define NULL
Definition: resample.c:96
static int task(void *data)
Queued task for baseline test.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
static struct ast_threadpool * threadpool
Thread pool for observers.
Definition: sorcery.c:86
static struct task_data * task_data_create(void)
Create a task_data object.
def info(msg)
userdata associated with baseline taskprocessor test
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:915
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:965
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
Definition: serializer.c:156
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
Definition: serializer.c:76
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
Definition: serializer.c:122
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
An opaque threadpool structure.
Definition: threadpool.c:36
static struct test_options options
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
Definition: serializer.c:127

◆ check_stats()

static int check_stats ( struct ast_test test,
const struct test_listener_pvt pvt,
int  num_pushed,
int  num_emptied,
int  num_was_empty 
)
static

helper to ensure that statistics the listener is keeping are what we expect

Parameters
testThe currently-running test
pvtThe private data for the taskprocessor listener
num_pushedThe expected current number of tasks pushed to the processor
num_emptiedThe expected current number of times the taskprocessor has become empty
num_was_emptyThe expected current number of times that tasks were pushed to an empty taskprocessor
Return values
-1Stats were not as expected
0Stats were as expected

Definition at line 523 of file test_taskprocessor.c.

References ast_test_status_update, test_listener_pvt::num_emptied, test_listener_pvt::num_pushed, and test_listener_pvt::num_was_empty.

Referenced by AST_TEST_DEFINE().

524 {
525  if (pvt->num_pushed != num_pushed) {
526  ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
527  num_pushed, pvt->num_pushed);
528  return -1;
529  }
530 
531  if (pvt->num_emptied != num_emptied) {
532  ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
533  num_emptied, pvt->num_emptied);
534  return -1;
535  }
536 
537  if (pvt->num_was_empty != num_was_empty) {
538  ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
539  num_was_empty, pvt->num_emptied);
540  return -1;
541  }
542 
543  return 0;
544 }
#define ast_test_status_update(a, b, c...)
Definition: test.h:129

◆ listener_test_task()

static int listener_test_task ( void *  ignore)
static

Queued task for taskprocessor listener test.

Does nothing.

Definition at line 507 of file test_taskprocessor.c.

Referenced by AST_TEST_DEFINE().

508 {
509  return 0;
510 }

◆ load_module()

static int load_module ( void  )
static

Definition at line 978 of file test_taskprocessor.c.

References AST_MODULE_LOAD_SUCCESS, and ast_test_register().

979 {
980  ast_test_register(default_taskprocessor);
981  ast_test_register(default_taskprocessor_load);
983  ast_test_register(taskprocessor_listener);
984  ast_test_register(taskprocessor_shutdown);
985  ast_test_register(taskprocessor_push_local);
986  ast_test_register(serializer_pool);
988 }
int ast_test_register(ast_test_cb_t *cb)
Definition: test.c:194

◆ load_task()

static int load_task ( void *  data)
static

a queued task to be used in the taskprocessor load test

The task increments the number of tasks executed and puts the passed-in data into the next slot in the array of random data.

Definition at line 336 of file test_taskprocessor.c.

References ast_cond_signal, load_task_data::cond, load_task_results, task_data::lock, load_task_data::lock, SCOPED_MUTEX, load_task_data::task_rand, and load_task_data::tasks_completed.

Referenced by AST_TEST_DEFINE().

337 {
338  int *randdata = data;
342  return 0;
343 }
#define ast_cond_signal(cond)
Definition: lock.h:201
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
ast_mutex_t lock
Definition: app_meetme.c:1091
int task_rand[NUM_TASKS]
static struct load_task_data load_task_results

◆ local_task_exe()

static int local_task_exe ( struct ast_taskprocessor_local local)
static

Definition at line 826 of file test_taskprocessor.c.

References ast_taskprocessor_local::data, ast_taskprocessor::local_data, ast_taskprocessor_local::local_data, and task().

Referenced by AST_TEST_DEFINE().

827 {
828  int *local_data = local->local_data;
829  struct task_data *task_data = local->data;
830 
831  *local_data = 1;
832  task(task_data);
833 
834  return 0;
835 }
static int task(void *data)
Queued task for baseline test.
userdata associated with baseline taskprocessor test

◆ shutdown_data_create()

static struct shutdown_data* shutdown_data_create ( int  dont_wait)
static

Definition at line 662 of file test_taskprocessor.c.

References ao2_alloc, ao2_cleanup, ao2_ref, ast_cond_init, ast_mutex_init, shutdown_data::in, shutdown_data::lock, NULL, shutdown_data::out, RAII_VAR, shutdown_data_dtor(), and shutdown_data::task_stop_waiting.

Referenced by AST_TEST_DEFINE().

663 {
665 
667  if (!shutdown_data) {
668  return NULL;
669  }
670 
674  shutdown_data->task_stop_waiting = dont_wait;
675  ao2_ref(shutdown_data, +1);
676  return shutdown_data;
677 }
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define NULL
Definition: resample.c:96
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static void shutdown_data_dtor(void *data)
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
#define ast_mutex_init(pmutex)
Definition: lock.h:184

◆ shutdown_data_dtor()

static void shutdown_data_dtor ( void *  data)
static

Definition at line 654 of file test_taskprocessor.c.

References ast_cond_destroy, ast_mutex_destroy, shutdown_data::in, shutdown_data::lock, and shutdown_data::out.

Referenced by shutdown_data_create().

655 {
656  struct shutdown_data *shutdown_data = data;
657  ast_mutex_destroy(&shutdown_data->lock);
658  ast_cond_destroy(&shutdown_data->in);
659  ast_cond_destroy(&shutdown_data->out);
660 }
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define ast_mutex_destroy(a)
Definition: lock.h:186

◆ shutdown_has_completed()

static int shutdown_has_completed ( struct shutdown_data shutdown_data)
static

Definition at line 711 of file test_taskprocessor.c.

References task_data::lock, shutdown_data::lock, SCOPED_MUTEX, and shutdown_data::task_complete.

Referenced by AST_TEST_DEFINE().

712 {
713  SCOPED_MUTEX(lock, &shutdown_data->lock);
714  return shutdown_data->task_complete;
715 }
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
ast_mutex_t lock
Definition: app_meetme.c:1091

◆ shutdown_poke()

static void shutdown_poke ( struct shutdown_data shutdown_data)
static

Definition at line 735 of file test_taskprocessor.c.

References ast_cond_signal, shutdown_data::in, task_data::lock, shutdown_data::lock, SCOPED_MUTEX, and shutdown_data::task_stop_waiting.

Referenced by AST_TEST_DEFINE().

736 {
737  SCOPED_MUTEX(lock, &shutdown_data->lock);
738  shutdown_data->task_stop_waiting = 1;
739  ast_cond_signal(&shutdown_data->in);
740 }
#define ast_cond_signal(cond)
Definition: lock.h:201
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
ast_mutex_t lock
Definition: app_meetme.c:1091

◆ shutdown_task_exec()

static int shutdown_task_exec ( void *  data)
static

Definition at line 679 of file test_taskprocessor.c.

References ast_cond_signal, ast_cond_wait, shutdown_data::in, task_data::lock, shutdown_data::lock, shutdown_data::out, SCOPED_MUTEX, shutdown_data::task_complete, shutdown_data::task_started, and shutdown_data::task_stop_waiting.

Referenced by AST_TEST_DEFINE().

680 {
681  struct shutdown_data *shutdown_data = data;
682  SCOPED_MUTEX(lock, &shutdown_data->lock);
683  shutdown_data->task_started = 1;
684  ast_cond_signal(&shutdown_data->out);
685  while (!shutdown_data->task_stop_waiting) {
686  ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
687  }
688  shutdown_data->task_complete = 1;
689  ast_cond_signal(&shutdown_data->out);
690  return 0;
691 }
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
#define ast_cond_signal(cond)
Definition: lock.h:201
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
ast_mutex_t lock
Definition: app_meetme.c:1091

◆ shutdown_waitfor_completion()

static int shutdown_waitfor_completion ( struct shutdown_data shutdown_data)
static

Definition at line 693 of file test_taskprocessor.c.

References ast_cond_timedwait, ast_tvnow(), task_data::lock, shutdown_data::lock, shutdown_data::out, SCOPED_MUTEX, and shutdown_data::task_complete.

Referenced by AST_TEST_DEFINE().

694 {
695  struct timeval start = ast_tvnow();
696  struct timespec end = {
697  .tv_sec = start.tv_sec + 5,
698  .tv_nsec = start.tv_usec * 1000
699  };
700  SCOPED_MUTEX(lock, &shutdown_data->lock);
701 
702  while (!shutdown_data->task_complete) {
703  if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
704  break;
705  }
706  }
707 
708  return shutdown_data->task_complete;
709 }
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
char * end
Definition: eagi_proxy.c:73
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
ast_mutex_t lock
Definition: app_meetme.c:1091
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204

◆ shutdown_waitfor_start()

static int shutdown_waitfor_start ( struct shutdown_data shutdown_data)
static

Definition at line 717 of file test_taskprocessor.c.

References ast_cond_timedwait, ast_tvnow(), task_data::lock, shutdown_data::lock, shutdown_data::out, SCOPED_MUTEX, and shutdown_data::task_started.

Referenced by AST_TEST_DEFINE().

718 {
719  struct timeval start = ast_tvnow();
720  struct timespec end = {
721  .tv_sec = start.tv_sec + 5,
722  .tv_nsec = start.tv_usec * 1000
723  };
724  SCOPED_MUTEX(lock, &shutdown_data->lock);
725 
726  while (!shutdown_data->task_started) {
727  if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
728  break;
729  }
730  }
731 
732  return shutdown_data->task_started;
733 }
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
char * end
Definition: eagi_proxy.c:73
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
ast_mutex_t lock
Definition: app_meetme.c:1091
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204

◆ task()

static int task ( void *  data)
static

Queued task for baseline test.

The task simply sets a boolean to indicate the task has been run and then signals a condition saying it's complete

Definition at line 88 of file test_taskprocessor.c.

References ast_cond_signal, task_data::cond, task_data::lock, SCOPED_MUTEX, task_data::task_complete, and task_data::wait_time.

Referenced by AST_TEST_DEFINE(), ast_threadpool_push(), local_task_exe(), tps_taskprocessor_dtor(), and tps_taskprocessor_pop().

89 {
90  struct task_data *task_data = data;
91 
92  SCOPED_MUTEX(lock, &task_data->lock);
93  if (task_data->wait_time > 0) {
94  usleep(task_data->wait_time * 1000);
95  }
96  task_data->task_complete = 1;
97  ast_cond_signal(&task_data->cond);
98  return 0;
99 }
unsigned long wait_time
ast_mutex_t lock
#define ast_cond_signal(cond)
Definition: lock.h:201
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
ast_mutex_t lock
Definition: app_meetme.c:1091
ast_cond_t cond
userdata associated with baseline taskprocessor test

◆ task_data_create()

static struct task_data* task_data_create ( void  )
static

Create a task_data object.

Definition at line 64 of file test_taskprocessor.c.

References ao2_alloc, ast_cond_init, ast_mutex_init, task_data::cond, task_data::lock, NULL, task_data::task_complete, task_data_dtor(), and task_data::wait_time.

Referenced by AST_TEST_DEFINE().

65 {
66  struct task_data *task_data =
67  ao2_alloc(sizeof(*task_data), task_data_dtor);
68 
69  if (!task_data) {
70  return NULL;
71  }
72 
73  ast_cond_init(&task_data->cond, NULL);
74  ast_mutex_init(&task_data->lock);
75  task_data->task_complete = 0;
76  task_data->wait_time = 0;
77 
78  return task_data;
79 }
unsigned long wait_time
ast_mutex_t lock
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define NULL
Definition: resample.c:96
ast_cond_t cond
static void task_data_dtor(void *obj)
userdata associated with baseline taskprocessor test
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
#define ast_mutex_init(pmutex)
Definition: lock.h:184

◆ task_data_dtor()

static void task_data_dtor ( void *  obj)
static

Definition at line 55 of file test_taskprocessor.c.

References ast_cond_destroy, ast_mutex_destroy, task_data::cond, and task_data::lock.

Referenced by task_data_create().

56 {
57  struct task_data *task_data = obj;
58 
59  ast_mutex_destroy(&task_data->lock);
60  ast_cond_destroy(&task_data->cond);
61 }
ast_mutex_t lock
ast_cond_t cond
userdata associated with baseline taskprocessor test
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define ast_mutex_destroy(a)
Definition: lock.h:186

◆ task_wait()

static int task_wait ( struct task_data task_data)
static

Wait for a task to execute.

Definition at line 104 of file test_taskprocessor.c.

References ast_cond_timedwait, ast_tvnow(), task_data::cond, task_data::lock, SCOPED_MUTEX, and task_data::task_complete.

Referenced by AST_TEST_DEFINE().

105 {
106  struct timeval start = ast_tvnow();
107  struct timespec end;
108  SCOPED_MUTEX(lock, &task_data->lock);
109 
110  end.tv_sec = start.tv_sec + 30;
111  end.tv_nsec = start.tv_usec * 1000;
112 
113  while (!task_data->task_complete) {
114  int res;
115  res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
116  &end);
117  if (res == ETIMEDOUT) {
118  return -1;
119  }
120  }
121 
122  return 0;
123 }
ast_mutex_t lock
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
char * end
Definition: eagi_proxy.c:73
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
ast_mutex_t lock
Definition: app_meetme.c:1091
ast_cond_t cond
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204

◆ test_emptied()

static void test_emptied ( struct ast_taskprocessor_listener listener)
static

test taskprocessor listener's emptied callback.

Definition at line 480 of file test_taskprocessor.c.

References ast_taskprocessor_listener_get_user_data(), and test_listener_pvt::num_emptied.

481 {
483  ++pvt->num_emptied;
484 }
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
Private data for the test taskprocessor listener.

◆ test_listener_pvt_alloc()

static void* test_listener_pvt_alloc ( void  )
static

test taskprocessor listener's alloc callback

Definition at line 447 of file test_taskprocessor.c.

References ast_calloc.

Referenced by AST_TEST_DEFINE().

448 {
449  struct test_listener_pvt *pvt;
450 
451  pvt = ast_calloc(1, sizeof(*pvt));
452  return pvt;
453 }
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:204
Private data for the test taskprocessor listener.

◆ test_shutdown()

static void test_shutdown ( struct ast_taskprocessor_listener listener)
static

test taskprocessor listener's shutdown callback.

Definition at line 489 of file test_taskprocessor.c.

References ast_taskprocessor_listener_get_user_data(), and test_listener_pvt::shutdown.

490 {
492  pvt->shutdown = 1;
493 }
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
Private data for the test taskprocessor listener.

◆ test_start()

static int test_start ( struct ast_taskprocessor_listener listener)
static

test taskprocessor listener's start callback

Definition at line 458 of file test_taskprocessor.c.

459 {
460  return 0;
461 }

◆ test_task_pushed()

static void test_task_pushed ( struct ast_taskprocessor_listener listener,
int  was_empty 
)
static

test taskprocessor listener's task_pushed callback

Adjusts private data's stats as indicated by the parameters.

Definition at line 468 of file test_taskprocessor.c.

References ast_taskprocessor_listener_get_user_data(), test_listener_pvt::num_pushed, and test_listener_pvt::num_was_empty.

469 {
471  ++pvt->num_pushed;
472  if (was_empty) {
473  ++pvt->num_was_empty;
474  }
475 }
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
Private data for the test taskprocessor listener.

◆ tps_shutdown_thread()

static void* tps_shutdown_thread ( void *  data)
static

Definition at line 742 of file test_taskprocessor.c.

References ast_taskprocessor_unreference(), and NULL.

Referenced by AST_TEST_DEFINE().

743 {
744  struct ast_taskprocessor *tps = data;
746  return NULL;
747 }
#define NULL
Definition: resample.c:96
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

◆ unload_module()

static int unload_module ( void  )
static

Definition at line 966 of file test_taskprocessor.c.

References ast_test_unregister().

967 {
968  ast_test_unregister(default_taskprocessor);
969  ast_test_unregister(default_taskprocessor_load);
971  ast_test_unregister(taskprocessor_listener);
972  ast_test_unregister(taskprocessor_shutdown);
973  ast_test_unregister(taskprocessor_push_local);
974  ast_test_unregister(serializer_pool);
975  return 0;
976 }
int ast_test_unregister(ast_test_cb_t *cb)
Definition: test.c:218

Variable Documentation

◆ __mod_info

struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "taskprocessor test module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, .support_level = AST_MODULE_SUPPORT_CORE, }
static

Definition at line 990 of file test_taskprocessor.c.

◆ ast_module_info

const struct ast_module_info* ast_module_info = &__mod_info
static

Definition at line 990 of file test_taskprocessor.c.

◆ load_task_results

struct load_task_data load_task_results
static

Referenced by AST_TEST_DEFINE(), and load_task().

◆ test_callbacks

const struct ast_taskprocessor_listener_callbacks test_callbacks
static

Definition at line 495 of file test_taskprocessor.c.