Asterisk - The Open Source Telephony Project  18.5.0
test_taskprocessor.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*!
20  * \file
21  * \brief taskprocessor unit tests
22  *
23  * \author Mark Michelson <[email protected]>
24  *
25  */
26 
27 /*** MODULEINFO
28  <depend>TEST_FRAMEWORK</depend>
29  <support_level>core</support_level>
30  ***/
31 
32 #include "asterisk.h"
33 
34 #include "asterisk/test.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/module.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/serializer.h"
39 #include "asterisk/threadpool.h"
40 
41 /*!
42  * \brief userdata associated with baseline taskprocessor test
43  */
44 struct task_data {
45  /* Condition used to signal to queuing thread that task was executed */
47  /* Lock protecting the condition */
49  /*! Boolean indicating that the task was run */
51  /*! Milliseconds to wait before returning */
52  unsigned long wait_time;
53 };
54 
55 static void task_data_dtor(void *obj)
56 {
57  struct task_data *task_data = obj;
58 
59  ast_mutex_destroy(&task_data->lock);
60  ast_cond_destroy(&task_data->cond);
61 }
62 
63 /*! \brief Create a task_data object */
64 static struct task_data *task_data_create(void)
65 {
66  struct task_data *task_data =
67  ao2_alloc(sizeof(*task_data), task_data_dtor);
68 
69  if (!task_data) {
70  return NULL;
71  }
72 
73  ast_cond_init(&task_data->cond, NULL);
74  ast_mutex_init(&task_data->lock);
75  task_data->task_complete = 0;
76  task_data->wait_time = 0;
77 
78  return task_data;
79 }
80 
81 /*!
82  * \brief Queued task for baseline test.
83  *
84  * The task simply sets a boolean to indicate the
85  * task has been run and then signals a condition
86  * saying it's complete
87  */
88 static int task(void *data)
89 {
90  struct task_data *task_data = data;
91 
92  SCOPED_MUTEX(lock, &task_data->lock);
93  if (task_data->wait_time > 0) {
94  usleep(task_data->wait_time * 1000);
95  }
96  task_data->task_complete = 1;
97  ast_cond_signal(&task_data->cond);
98  return 0;
99 }
100 
101 /*!
102  * \brief Wait for a task to execute.
103  */
104 static int task_wait(struct task_data *task_data)
105 {
106  struct timeval start = ast_tvnow();
107  struct timespec end;
108  SCOPED_MUTEX(lock, &task_data->lock);
109 
110  end.tv_sec = start.tv_sec + 30;
111  end.tv_nsec = start.tv_usec * 1000;
112 
113  while (!task_data->task_complete) {
114  int res;
115  res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
116  &end);
117  if (res == ETIMEDOUT) {
118  return -1;
119  }
120  }
121 
122  return 0;
123 }
124 
125 /*!
126  * \brief Baseline test for default taskprocessor
127  *
128  * This test ensures that when a task is added to a taskprocessor that
129  * has been allocated with a default listener that the task gets executed
130  * as expected
131  */
132 AST_TEST_DEFINE(default_taskprocessor)
133 {
136  int res;
137 
138  switch (cmd) {
139  case TEST_INIT:
140  info->name = "default_taskprocessor";
141  info->category = "/main/taskprocessor/";
142  info->summary = "Test of default taskproccesor";
143  info->description =
144  "Ensures that a queued task gets executed.";
145  return AST_TEST_NOT_RUN;
146  case TEST_EXECUTE:
147  break;
148  }
149 
150  tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
151 
152  if (!tps) {
153  ast_test_status_update(test, "Unable to create test taskprocessor\n");
154  return AST_TEST_FAIL;
155  }
156 
158  if (!task_data) {
159  ast_test_status_update(test, "Unable to create task_data\n");
160  return AST_TEST_FAIL;
161  }
162 
164  ast_test_status_update(test, "Failed to queue task\n");
165  return AST_TEST_FAIL;
166  }
167 
168  res = task_wait(task_data);
169  if (res != 0) {
170  ast_test_status_update(test, "Queued task did not execute!\n");
171  return AST_TEST_FAIL;
172  }
173 
174  return AST_TEST_PASS;
175 }
176 
177 /*!
178  * \brief Baseline test for subsystem alert
179  */
181 {
183 #define TEST_DATA_ARRAY_SIZE 10
184 #define LOW_WATER_MARK 3
185 #define HIGH_WATER_MARK 6
186  struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
187  int res = 0;
188  int i;
189  long queue_count;
190  unsigned int alert_level;
191  unsigned int subsystem_alert_level;
192 
193  switch (cmd) {
194  case TEST_INIT:
195  info->name = "subsystem_alert";
196  info->category = "/main/taskprocessor/";
197  info->summary = "Test of subsystem alerts";
198  info->description =
199  "Ensures alerts are generated properly.";
200  return AST_TEST_NOT_RUN;
201  case TEST_EXECUTE:
202  break;
203  }
204 
205  tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
206 
207  if (!tps) {
208  ast_test_status_update(test, "Unable to create test taskprocessor\n");
209  return AST_TEST_FAIL;
210  }
211 
214 
215  for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
217  if (!task_data[i]) {
218  ast_test_status_update(test, "Unable to create task_data\n");
219  res = -1;
220  goto data_cleanup;
221  }
222  task_data[i]->wait_time = 500;
223 
224  ast_test_status_update(test, "Pushing task %d\n", i);
225  if (ast_taskprocessor_push(tps, task, task_data[i])) {
226  ast_test_status_update(test, "Failed to queue task\n");
227  res = -1;
228  goto data_cleanup;
229  }
230 
231  queue_count = ast_taskprocessor_size(tps);
232  alert_level = ast_taskprocessor_alert_get();
233  subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
234 
235  if (queue_count == HIGH_WATER_MARK) {
236  if (subsystem_alert_level) {
237  ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
238  }
239  if (alert_level) {
240  ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
241  }
242  } else if (queue_count < HIGH_WATER_MARK) {
243  if (subsystem_alert_level > 0) {
244  ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
245  res = -1;
246  }
247  if (alert_level > 0) {
248  ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
249  res = -1;
250  }
251  } else {
252  if (subsystem_alert_level == 0) {
253  ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
254  res = -1;
255  }
256  if (alert_level == 0) {
257  ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
258  res = -1;
259  }
260  }
261  }
262 
264 
265  for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
266  ast_test_status_update(test, "Waiting on task %d\n", i);
267  if (task_wait(task_data[i])) {
268  ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
269  res = -1;
270  goto data_cleanup;
271  }
272 
273  queue_count = ast_taskprocessor_size(tps);
274  alert_level = ast_taskprocessor_alert_get();
275  subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
276 
277  if (queue_count == LOW_WATER_MARK) {
278  if (!subsystem_alert_level) {
279  ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
280  }
281  if (!alert_level) {
282  ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
283  }
284  } else if (queue_count > LOW_WATER_MARK) {
285  if (subsystem_alert_level == 0) {
286  ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
287  res = -1;
288  }
289  if (alert_level == 0) {
290  ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
291  res = -1;
292  }
293  } else {
294  if (subsystem_alert_level > 0) {
295  ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
296  res = -1;
297  }
298  if (alert_level > 0) {
299  ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
300  res = -1;
301  }
302  }
303 
304  }
305 
307  for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
309  }
310 
311  return res ? AST_TEST_FAIL : AST_TEST_PASS;
312 }
313 
314 #define NUM_TASKS 20000
315 
316 /*!
317  * \brief Relevant data associated with taskprocessor load test
318  */
319 static struct load_task_data {
320  /*! Condition used to indicate a task has completed executing */
322  /*! Lock used to protect the condition */
324  /*! Counter of the number of completed tasks */
326  /*! Storage for task-specific data */
327  int task_rand[NUM_TASKS];
329 
330 /*!
331  * \brief a queued task to be used in the taskprocessor load test
332  *
333  * The task increments the number of tasks executed and puts the passed-in
334  * data into the next slot in the array of random data.
335  */
336 static int load_task(void *data)
337 {
338  int *randdata = data;
342  return 0;
343 }
344 
345 /*!
346  * \brief Load test for taskprocessor with default listener
347  *
348  * This test queues a large number of tasks, each with random data associated.
349  * The test ensures that all of the tasks are run and that the tasks are executed
350  * in the same order that they were queued
351  */
352 AST_TEST_DEFINE(default_taskprocessor_load)
353 {
354  struct ast_taskprocessor *tps;
355  struct timeval start;
356  struct timespec ts;
358  int timedwait_res;
359  int i;
360  int rand_data[NUM_TASKS];
361 
362  switch (cmd) {
363  case TEST_INIT:
364  info->name = "default_taskprocessor_load";
365  info->category = "/main/taskprocessor/";
366  info->summary = "Load test of default taskproccesor";
367  info->description =
368  "Ensure that a large number of queued tasks are executed in the proper order.";
369  return AST_TEST_NOT_RUN;
370  case TEST_EXECUTE:
371  break;
372  }
373 
374  tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
375 
376  if (!tps) {
377  ast_test_status_update(test, "Unable to create test taskprocessor\n");
378  return AST_TEST_FAIL;
379  }
380 
381  start = ast_tvnow();
382 
383  ts.tv_sec = start.tv_sec + 60;
384  ts.tv_nsec = start.tv_usec * 1000;
385 
389 
390  for (i = 0; i < NUM_TASKS; ++i) {
391  rand_data[i] = ast_random();
392  if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
393  ast_test_status_update(test, "Failed to queue task\n");
394  res = AST_TEST_FAIL;
395  goto test_end;
396  }
397  }
398 
400  while (load_task_results.tasks_completed < NUM_TASKS) {
402  if (timedwait_res == ETIMEDOUT) {
403  break;
404  }
405  }
407 
408  if (load_task_results.tasks_completed != NUM_TASKS) {
409  ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
411  res = AST_TEST_FAIL;
412  goto test_end;
413  }
414 
415  for (i = 0; i < NUM_TASKS; ++i) {
416  if (rand_data[i] != load_task_results.task_rand[i]) {
417  ast_test_status_update(test, "Queued tasks did not execute in order\n");
418  res = AST_TEST_FAIL;
419  goto test_end;
420  }
421  }
422 
423 test_end:
427  return res;
428 }
429 
430 /*!
431  * \brief Private data for the test taskprocessor listener
432  */
434  /* Counter of number of tasks pushed to the queue */
436  /* Counter of number of times the queue was emptied */
438  /* Counter of number of times that a pushed task occurred on an empty queue */
440  /* Boolean indicating whether the shutdown callback was called */
441  int shutdown;
442 };
443 
444 /*!
445  * \brief test taskprocessor listener's alloc callback
446  */
447 static void *test_listener_pvt_alloc(void)
448 {
449  struct test_listener_pvt *pvt;
450 
451  pvt = ast_calloc(1, sizeof(*pvt));
452  return pvt;
453 }
454 
455 /*!
456  * \brief test taskprocessor listener's start callback
457  */
459 {
460  return 0;
461 }
462 
463 /*!
464  * \brief test taskprocessor listener's task_pushed callback
465  *
466  * Adjusts private data's stats as indicated by the parameters.
467  */
468 static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
469 {
471  ++pvt->num_pushed;
472  if (was_empty) {
473  ++pvt->num_was_empty;
474  }
475 }
476 
477 /*!
478  * \brief test taskprocessor listener's emptied callback.
479  */
481 {
483  ++pvt->num_emptied;
484 }
485 
486 /*!
487  * \brief test taskprocessor listener's shutdown callback.
488  */
490 {
492  pvt->shutdown = 1;
493 }
494 
496  .start = test_start,
497  .task_pushed = test_task_pushed,
498  .emptied = test_emptied,
499  .shutdown = test_shutdown,
500 };
501 
/*!
 * \brief Queued task for taskprocessor listener test.
 *
 * Intentionally a no-op: the argument is never looked at.
 *
 * \param ignore Unused
 * \return Always 0
 */
static int listener_test_task(void *ignore)
{
	(void) ignore;

	return 0;
}
511 
512 /*!
513  * \brief helper to ensure that statistics the listener is keeping are what we expect
514  *
515  * \param test The currently-running test
516  * \param pvt The private data for the taskprocessor listener
517  * \param num_pushed The expected current number of tasks pushed to the processor
518  * \param num_emptied The expected current number of times the taskprocessor has become empty
519  * \param num_was_empty The expected current number of times that tasks were pushed to an empty taskprocessor
520  * \retval -1 Stats were not as expected
521  * \retval 0 Stats were as expected
522  */
523 static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
524 {
525  if (pvt->num_pushed != num_pushed) {
526  ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
527  num_pushed, pvt->num_pushed);
528  return -1;
529  }
530 
531  if (pvt->num_emptied != num_emptied) {
532  ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
533  num_emptied, pvt->num_emptied);
534  return -1;
535  }
536 
537  if (pvt->num_was_empty != num_was_empty) {
538  ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
539  num_was_empty, pvt->num_emptied);
540  return -1;
541  }
542 
543  return 0;
544 }
545 
546 /*!
547  * \brief Test for a taskprocessor with custom listener.
548  *
549  * This test pushes tasks to a taskprocessor with a custom listener, executes the tasks,
550  * and destroys the taskprocessor.
551  *
552  * The test ensures that the listener's callbacks are called when expected and that the data
553  * being passed in is accurate.
554  */
555 AST_TEST_DEFINE(taskprocessor_listener)
556 {
557  struct ast_taskprocessor *tps = NULL;
559  struct test_listener_pvt *pvt = NULL;
561 
562  switch (cmd) {
563  case TEST_INIT:
564  info->name = "taskprocessor_listener";
565  info->category = "/main/taskprocessor/";
566  info->summary = "Test of taskproccesor listeners";
567  info->description =
568  "Ensures that listener callbacks are called when expected.";
569  return AST_TEST_NOT_RUN;
570  case TEST_EXECUTE:
571  break;
572  }
573 
574  pvt = test_listener_pvt_alloc();
575  if (!pvt) {
576  ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
577  return AST_TEST_FAIL;
578  }
579 
580  listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
581  if (!listener) {
582  ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
583  res = AST_TEST_FAIL;
584  goto test_exit;
585  }
586 
587  tps = ast_taskprocessor_create_with_listener("test_listener", listener);
588  if (!tps) {
589  ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
590  res = AST_TEST_FAIL;
591  goto test_exit;
592  }
593 
595  ast_test_status_update(test, "Failed to queue task\n");
596  res = AST_TEST_FAIL;
597  goto test_exit;
598  }
599 
600  if (check_stats(test, pvt, 1, 0, 1) < 0) {
601  res = AST_TEST_FAIL;
602  goto test_exit;
603  }
604 
606  ast_test_status_update(test, "Failed to queue task\n");
607  res = AST_TEST_FAIL;
608  goto test_exit;
609  }
610 
611  if (check_stats(test, pvt, 2, 0, 1) < 0) {
612  res = AST_TEST_FAIL;
613  goto test_exit;
614  }
615 
617 
618  if (check_stats(test, pvt, 2, 0, 1) < 0) {
619  res = AST_TEST_FAIL;
620  goto test_exit;
621  }
622 
624 
625  if (check_stats(test, pvt, 2, 1, 1) < 0) {
626  res = AST_TEST_FAIL;
627  goto test_exit;
628  }
629 
631 
632  if (!pvt->shutdown) {
633  res = AST_TEST_FAIL;
634  goto test_exit;
635  }
636 
637 test_exit:
638  ao2_cleanup(listener);
639  /* This is safe even if tps is NULL */
641  ast_free(pvt);
642  return res;
643 }
644 
652 };
653 
654 static void shutdown_data_dtor(void *data)
655 {
656  struct shutdown_data *shutdown_data = data;
657  ast_mutex_destroy(&shutdown_data->lock);
658  ast_cond_destroy(&shutdown_data->in);
659  ast_cond_destroy(&shutdown_data->out);
660 }
661 
662 static struct shutdown_data *shutdown_data_create(int dont_wait)
663 {
665 
667  if (!shutdown_data) {
668  return NULL;
669  }
670 
674  shutdown_data->task_stop_waiting = dont_wait;
675  ao2_ref(shutdown_data, +1);
676  return shutdown_data;
677 }
678 
679 static int shutdown_task_exec(void *data)
680 {
681  struct shutdown_data *shutdown_data = data;
682  SCOPED_MUTEX(lock, &shutdown_data->lock);
683  shutdown_data->task_started = 1;
684  ast_cond_signal(&shutdown_data->out);
685  while (!shutdown_data->task_stop_waiting) {
686  ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
687  }
688  shutdown_data->task_complete = 1;
689  ast_cond_signal(&shutdown_data->out);
690  return 0;
691 }
692 
694 {
695  struct timeval start = ast_tvnow();
696  struct timespec end = {
697  .tv_sec = start.tv_sec + 5,
698  .tv_nsec = start.tv_usec * 1000
699  };
700  SCOPED_MUTEX(lock, &shutdown_data->lock);
701 
702  while (!shutdown_data->task_complete) {
703  if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
704  break;
705  }
706  }
707 
708  return shutdown_data->task_complete;
709 }
710 
712 {
713  SCOPED_MUTEX(lock, &shutdown_data->lock);
714  return shutdown_data->task_complete;
715 }
716 
718 {
719  struct timeval start = ast_tvnow();
720  struct timespec end = {
721  .tv_sec = start.tv_sec + 5,
722  .tv_nsec = start.tv_usec * 1000
723  };
724  SCOPED_MUTEX(lock, &shutdown_data->lock);
725 
726  while (!shutdown_data->task_started) {
727  if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
728  break;
729  }
730  }
731 
732  return shutdown_data->task_started;
733 }
734 
736 {
737  SCOPED_MUTEX(lock, &shutdown_data->lock);
738  shutdown_data->task_stop_waiting = 1;
739  ast_cond_signal(&shutdown_data->in);
740 }
741 
742 static void *tps_shutdown_thread(void *data)
743 {
744  struct ast_taskprocessor *tps = data;
746  return NULL;
747 }
748 
749 AST_TEST_DEFINE(taskprocessor_shutdown)
750 {
752  RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
753  RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
754  int push_res;
755  int wait_res;
756  int pthread_res;
757  pthread_t shutdown_thread;
758 
759  switch (cmd) {
760  case TEST_INIT:
761  info->name = "taskprocessor_shutdown";
762  info->category = "/main/taskprocessor/";
763  info->summary = "Test of taskproccesor shutdown sequence";
764  info->description =
765  "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
766  return AST_TEST_NOT_RUN;
767  case TEST_EXECUTE:
768  break;
769  }
770 
771  tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
772  task1 = shutdown_data_create(0); /* task1 waits to be poked */
773  task2 = shutdown_data_create(1); /* task2 waits for nothing */
774 
775  if (!tps || !task1 || !task2) {
776  ast_test_status_update(test, "Allocation error\n");
777  return AST_TEST_FAIL;
778  }
779 
780  push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
781  if (push_res != 0) {
782  ast_test_status_update(test, "Could not push task1\n");
783  return AST_TEST_FAIL;
784  }
785 
786  push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
787  if (push_res != 0) {
788  ast_test_status_update(test, "Could not push task2\n");
789  return AST_TEST_FAIL;
790  }
791 
792  wait_res = shutdown_waitfor_start(task1);
793  if (!wait_res) {
794  ast_test_status_update(test, "Task1 didn't start\n");
795  return AST_TEST_FAIL;
796  }
797 
798  pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
799  if (pthread_res != 0) {
800  ast_test_status_update(test, "Failed to create shutdown thread\n");
801  return AST_TEST_FAIL;
802  }
803  tps = NULL;
804 
805  /* Wakeup task1; it should complete */
806  shutdown_poke(task1);
807  wait_res = shutdown_waitfor_completion(task1);
808  if (!wait_res) {
809  ast_test_status_update(test, "Task1 didn't complete\n");
810  return AST_TEST_FAIL;
811  }
812 
813  /* Wait for shutdown to complete */
814  pthread_join(shutdown_thread, NULL);
815 
816  /* Should have also completed task2 */
817  wait_res = shutdown_has_completed(task2);
818  if (!wait_res) {
819  ast_test_status_update(test, "Task2 didn't finish\n");
820  return AST_TEST_FAIL;
821  }
822 
823  return AST_TEST_PASS;
824 }
825 
826 static int local_task_exe(struct ast_taskprocessor_local *local)
827 {
828  int *local_data = local->local_data;
829  struct task_data *task_data = local->data;
830 
831  *local_data = 1;
832  task(task_data);
833 
834  return 0;
835 }
836 
837 AST_TEST_DEFINE(taskprocessor_push_local)
838 {
839  RAII_VAR(struct ast_taskprocessor *, tps, NULL,
842  int local_data;
843  int res;
844 
845  switch (cmd) {
846  case TEST_INIT:
847  info->name = __func__;
848  info->category = "/main/taskprocessor/";
849  info->summary = "Test of pushing local data";
850  info->description =
851  "Ensures that local data is passed along.";
852  return AST_TEST_NOT_RUN;
853  case TEST_EXECUTE:
854  break;
855  }
856 
857 
858  tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
859  if (!tps) {
860  ast_test_status_update(test, "Unable to create test taskprocessor\n");
861  return AST_TEST_FAIL;
862  }
863 
864 
866  if (!task_data) {
867  ast_test_status_update(test, "Unable to create task_data\n");
868  return AST_TEST_FAIL;
869  }
870 
871  local_data = 0;
872  ast_taskprocessor_set_local(tps, &local_data);
873 
875  ast_test_status_update(test, "Failed to queue task\n");
876  return AST_TEST_FAIL;
877  }
878 
879  res = task_wait(task_data);
880  if (res != 0) {
881  ast_test_status_update(test, "Queued task did not execute!\n");
882  return AST_TEST_FAIL;
883  }
884 
885  if (local_data != 1) {
887  "Queued task did not set local_data!\n");
888  return AST_TEST_FAIL;
889  }
890 
891  return AST_TEST_PASS;
892 }
893 
894 /*!
895  * \brief Baseline test for a serializer pool
896  *
897  * This test ensures that a task pushed to a serializer obtained from a
898  * serializer pool gets executed, and that destroying the pool times out
899  * while that task is still running but succeeds once it has completed
900  */
901 AST_TEST_DEFINE(serializer_pool)
902 {
904  RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy);
906  struct ast_threadpool_options options = {
908  .idle_timeout = 0,
909  .auto_increment = 0,
910  .initial_size = 1,
911  .max_size = 0,
912  };
913  /* struct ast_taskprocessor *tps; */
914 
915  switch (cmd) {
916  case TEST_INIT:
917  info->name = "serializer_pool";
918  info->category = "/main/taskprocessor/";
919  info->summary = "Test using a serializer pool";
920  info->description =
921  "Ensures that a queued task gets executed.";
922  return AST_TEST_NOT_RUN;
923  case TEST_EXECUTE:
924  break;
925  }
926 
927  ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
928  ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
929  "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
930  ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
931  ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
932  ast_test_validate(test, task_data = task_data_create());
933 
934  task_data->wait_time = 4000; /* task takes 4 seconds */
935  ast_test_validate(test, !ast_taskprocessor_push(
936  ast_serializer_pool_get(serializer_pool), task, task_data));
937 
938  if (!ast_serializer_pool_destroy(serializer_pool)) {
939  ast_test_status_update(test, "Unexpected pool destruction!\n");
940  /*
941  * The pool should have timed out, so if it destruction reports success
942  * we need to fail.
943  */
944  serializer_pool = NULL;
945  return AST_TEST_FAIL;
946  }
947 
948  ast_test_validate(test, !task_wait(task_data));
949 
950  /* The first attempt should have failed. Second try should destroy successfully */
951  if (ast_serializer_pool_destroy(serializer_pool)) {
952  ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
953  /*
954  * If this fails we'll try again on return to hopefully avoid a memory leak.
955  * If it again times out a third time, well not much we can do.
956  */
957  return AST_TEST_FAIL;
958  }
959 
960  /* Test passed, so set pool to NULL to avoid "re-running" destroy */
961  serializer_pool = NULL;
962 
963  return AST_TEST_PASS;
964 }
965 
966 static int unload_module(void)
967 {
968  ast_test_unregister(default_taskprocessor);
969  ast_test_unregister(default_taskprocessor_load);
971  ast_test_unregister(taskprocessor_listener);
972  ast_test_unregister(taskprocessor_shutdown);
973  ast_test_unregister(taskprocessor_push_local);
974  ast_test_unregister(serializer_pool);
975  return 0;
976 }
977 
978 static int load_module(void)
979 {
980  ast_test_register(default_taskprocessor);
981  ast_test_register(default_taskprocessor_load);
983  ast_test_register(taskprocessor_listener);
984  ast_test_register(taskprocessor_shutdown);
985  ast_test_register(taskprocessor_push_local);
986  ast_test_register(serializer_pool);
988 }
989 
990 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskprocessor test module");
unsigned long wait_time
ast_mutex_t lock
A listener for taskprocessors.
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
helper to ensure that statistics the listener is keeping are what we expect
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Definition: module.h:567
Asterisk main include file. File version handling, generic pbx functions.
static void shutdown_poke(struct shutdown_data *shutdown_data)
int ast_test_unregister(ast_test_cb_t *cb)
Definition: test.c:218
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
static int load_task(void *data)
a queued task to be used in the taskprocessor load test
static int shutdown_task_exec(void *data)
static void test_emptied(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's emptied callback.
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition: serializer.c:39
static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
static const struct ast_taskprocessor_listener_callbacks test_callbacks
static void test_shutdown(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's shutdown callback.
#define TEST_DATA_ARRAY_SIZE
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
Test Framework API.
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
#define ast_cond_init(cond, attr)
Definition: lock.h:199
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
static int shutdown_has_completed(struct shutdown_data *shutdown_data)
#define ast_mutex_lock(a)
Definition: lock.h:187
static int local_task_exe(struct ast_taskprocessor_local *local)
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
#define NULL
Definition: resample.c:96
#define ast_cond_signal(cond)
Definition: lock.h:201
Relevant data associated with taskprocessor load test.
static void * tps_shutdown_thread(void *data)
static int task(void *data)
Queued task for baseline test.
pthread_cond_t ast_cond_t
Definition: lock.h:176
int ast_test_register(ast_test_cb_t *cb)
Definition: test.c:194
static int load_module(void)
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
static void * listener(void *unused)
Definition: asterisk.c:1476
#define HIGH_WATER_MARK
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ast_test_status_update(a, b, c...)
Definition: test.h:129
ast_cond_t cond
#define ao2_ref(o, delta)
Definition: astobj2.h:464
long int ast_random(void)
Definition: main/utils.c:2064
static void task_data_dtor(void *obj)
static struct shutdown_data * shutdown_data_create(int dont_wait)
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
static void shutdown_data_dtor(void *data)
static struct ast_threadpool * threadpool
Thread pool for observers.
Definition: sorcery.c:86
static struct task_data * task_data_create(void)
Create a task_data object.
int task_rand[NUM_TASKS]
static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
test taskprocessor listener's task_pushed callback
static int listener_test_task(void *ignore)
Queued task for taskprocessor listener test.
def info(msg)
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
Local data parameter.
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
def ignore(key=None, val=None, section=None, pjsip=None, nmapped=None, type='endpoint')
Definition: sip_to_pjsip.py:48
userdata associated with baseline taskprocessor test
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
#define ast_free(a)
Definition: astmm.h:182
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:204
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:559
static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:915
#define LOW_WATER_MARK
static void * test_listener_pvt_alloc(void)
test taskprocessor listener's alloc callback
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
static int unload_module(void)
An API for managing task processing threads that can be shared across modules.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:965
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
Set taskprocessor alert levels for the serializers in the pool.
Definition: serializer.c:156
struct ast_serializer_pool * ast_serializer_pool_create(const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
Create a serializer pool.
Definition: serializer.c:76
const char * ast_serializer_pool_name(const struct ast_serializer_pool *pool)
Retrieve the base name of the serializer pool.
Definition: serializer.c:122
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
Definition: test.c:65
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
An opaque threadpool structure.
Definition: threadpool.c:36
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
#define NUM_TASKS
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
#define ast_mutex_init(pmutex)
Definition: lock.h:184
#define ast_mutex_destroy(a)
Definition: lock.h:186
Private data for the test taskprocessor listener.
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
static struct load_task_data load_task_results
Asterisk module definitions.
static void data_cleanup(void *data)
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
struct ast_taskprocessor * ast_serializer_pool_get(struct ast_serializer_pool *pool)
Retrieve a serializer from the pool.
Definition: serializer.c:127
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Definition: taskprocessor.h:91
Structure for mutex and tracking information.
Definition: lock.h:135
static int test_start(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's start callback
ast_test_result_state
Definition: test.h:200
AST_TEST_DEFINE(default_taskprocessor)
Baseline test for default taskprocessor.
#define ast_mutex_unlock(a)
Definition: lock.h:188