Asterisk - The Open Source Telephony Project  18.5.0
taskprocessor.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2007-2013, Digium, Inc.
5  *
6  * Dwayne M. Hubbard <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*!
20  * \file
21  * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
22  *
23  * \author Dwayne Hubbard <[email protected]>
24  */
25 
26 /*** MODULEINFO
27  <support_level>core</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 #include "asterisk/_private.h"
33 #include "asterisk/module.h"
34 #include "asterisk/time.h"
35 #include "asterisk/astobj2.h"
36 #include "asterisk/cli.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/sem.h"
39 
40 /*!
41  * \brief tps_task structure is queued to a taskprocessor
42  *
43  * tps_tasks are processed in FIFO order and freed by the taskprocessing
44  * thread after the task handler returns. The callback function that is assigned
45  * to the execute() function pointer is responsible for releasing datap resources if necessary.
46  */
47 struct tps_task {
48  /*! \brief The execute() task callback function pointer */
49  union {
50  int (*execute)(void *datap);
51  int (*execute_local)(struct ast_taskprocessor_local *local);
52  } callback;
53  /*! \brief The data pointer for the task execute() function */
54  void *datap;
55  /*! \brief AST_LIST_ENTRY overhead */
57  unsigned int wants_local:1;
58 };
59 
60 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
62  /*! \brief This is the maximum number of tasks queued at any one time */
63  unsigned long max_qsize;
64  /*! \brief This is the current number of tasks processed */
65  unsigned long _tasks_processed_count;
66 };
67 
68 /*! \brief An ast_taskprocessor structure is a singleton by name */
70  /*! \brief Taskprocessor statistics */
72  void *local_data;
73  /*! \brief Taskprocessor current queue size */
75  /*! \brief Taskprocessor low water clear alert level */
77  /*! \brief Taskprocessor high water alert trigger level */
79  /*! \brief Taskprocessor queue */
82  /*! Current thread executing the tasks */
83  pthread_t thread;
84  /*! Indicates if the taskprocessor is currently executing a task */
85  unsigned int executing:1;
86  /*! Indicates that a high water warning has been issued on this task processor */
87  unsigned int high_water_warned:1;
88  /*! Indicates that a high water alert is active on this taskprocessor */
89  unsigned int high_water_alert:1;
90  /*! Indicates if the taskprocessor is currently suspended */
91  unsigned int suspended:1;
92  /*! \brief Anything before the first '/' in the name (if there is one) */
93  char *subsystem;
94  /*! \brief Friendly name of the taskprocessor.
95  * Subsystem is appended after the name's NULL terminator.
96  */
97  char name[0];
98 };
99 
100 /*!
101  * \brief A listener for taskprocessors
102  *
103  * \since 12.0.0
104  *
105  * When a taskprocessor's state changes, the listener
106  * is notified of the change. This allows for tasks
107  * to be addressed in whatever way is appropriate for
108  * the module using the taskprocessor.
109  */
111  /*! The callbacks the taskprocessor calls into to notify of state changes */
113  /*! The taskprocessor that the listener is listening to */
115  /*! Data private to the listener */
116  void *user_data;
117 };
118 
119 /*!
120  * Keep track of which subsystems are in alert
121  * and how many of their taskprocessors are overloaded.
122  */
124  unsigned int alert_count;
125  char subsystem[0];
126 };
127 static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *) overloaded_subsystems;
128 
129 #ifdef LOW_MEMORY
130 #define TPS_MAX_BUCKETS 61
131 #else
132 /*! \brief Number of buckets in the tps_singletons container. */
133 #define TPS_MAX_BUCKETS 1567
134 #endif
135 
136 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
137 static struct ao2_container *tps_singletons;
138 
139 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
140 static ast_cond_t cli_ping_cond;
141 
142 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
143 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
144 
145 /*! \brief The astobj2 hash callback for taskprocessors */
146 static int tps_hash_cb(const void *obj, const int flags);
147 /*! \brief The astobj2 compare callback for taskprocessors */
148 static int tps_cmp_cb(void *obj, void *arg, int flags);
149 
150 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
151 static int tps_ping_handler(void *datap);
152 
153 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
154 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
155 static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
156 static char *cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
157 static char *cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
158 
160  AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
161  AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
162  AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
163  AST_CLI_DEFINE(cli_tps_reset_stats, "Reset a named task processor's stats"),
164  AST_CLI_DEFINE(cli_tps_reset_stats_all, "Reset all task processors' stats"),
165 };
166 
168  pthread_t poll_thread;
169  int dead;
170  struct ast_sem sem;
171 };
172 
174 {
175  ast_assert(pvt->dead);
176  ast_sem_destroy(&pvt->sem);
177  ast_free(pvt);
178 }
179 
181 {
182  struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
183 
185 
186  listener->user_data = NULL;
187 }
188 
189 /*!
190  * \brief Function that processes tasks in the taskprocessor
191  * \internal
192  */
193 static void *default_tps_processing_function(void *data)
194 {
195  struct ast_taskprocessor_listener *listener = data;
196  struct ast_taskprocessor *tps = listener->tps;
197  struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
198  int sem_value;
199  int res;
200 
201  while (!pvt->dead) {
202  res = ast_sem_wait(&pvt->sem);
203  if (res != 0 && errno != EINTR) {
204  ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
205  strerror(errno));
206  /* Just give up */
207  break;
208  }
210  }
211 
212  /* No posting to a dead taskprocessor! */
213  res = ast_sem_getvalue(&pvt->sem, &sem_value);
214  ast_assert(res == 0 && sem_value == 0);
215 
216  /* Free the shutdown reference (see default_listener_shutdown) */
217  ao2_t_ref(listener->tps, -1, "tps-shutdown");
218 
219  return NULL;
220 }
221 
223 {
224  struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
225 
227  return -1;
228  }
229 
230  return 0;
231 }
232 
233 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
234 {
235  struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
236 
237  if (ast_sem_post(&pvt->sem) != 0) {
238  ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
239  strerror(errno));
240  }
241 }
242 
243 static int default_listener_die(void *data)
244 {
245  struct default_taskprocessor_listener_pvt *pvt = data;
246  pvt->dead = 1;
247  return 0;
248 }
249 
251 {
252  struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
253  int res;
254 
255  /* Hold a reference during shutdown */
256  ao2_t_ref(listener->tps, +1, "tps-shutdown");
257 
258  if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
259  /* This will cause the thread to exit early without completing tasks already
260  * in the queue. This is probably the least bad option in this situation. */
262  }
263 
265 
266  if (pthread_equal(pthread_self(), pvt->poll_thread)) {
267  res = pthread_detach(pvt->poll_thread);
268  if (res != 0) {
269  ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
270  }
271  } else {
272  res = pthread_join(pvt->poll_thread, NULL);
273  if (res != 0) {
274  ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
275  }
276  }
278 }
279 
282  .task_pushed = default_task_pushed,
283  .shutdown = default_listener_shutdown,
285 };
286 
287 /*!
288  * \internal
289  * \brief Clean up resources on Asterisk shutdown
290  */
291 static void tps_shutdown(void)
292 {
293  ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
294  AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
295  AST_VECTOR_RW_FREE(&overloaded_subsystems);
296  ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
297  tps_singletons = NULL;
298 }
299 
300 /* initialize the taskprocessor container and register CLI operations */
301 int ast_tps_init(void)
302 {
305  if (!tps_singletons) {
306  ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
307  return -1;
308  }
309 
310  if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
311  ao2_ref(tps_singletons, -1);
312  ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
313  return -1;
314  }
315 
316  ast_cond_init(&cli_ping_cond, NULL);
317 
318  ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
319 
321 
322  return 0;
323 }
324 
325 /* allocate resources for the task */
326 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
327 {
328  struct tps_task *t;
329  if (!task_exe) {
330  ast_log(LOG_ERROR, "task_exe is NULL!\n");
331  return NULL;
332  }
333 
334  t = ast_calloc(1, sizeof(*t));
335  if (!t) {
336  ast_log(LOG_ERROR, "failed to allocate task!\n");
337  return NULL;
338  }
339 
340  t->callback.execute = task_exe;
341  t->datap = datap;
342 
343  return t;
344 }
345 
346 static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
347 {
348  struct tps_task *t;
349  if (!task_exe) {
350  ast_log(LOG_ERROR, "task_exe is NULL!\n");
351  return NULL;
352  }
353 
354  t = ast_calloc(1, sizeof(*t));
355  if (!t) {
356  ast_log(LOG_ERROR, "failed to allocate task!\n");
357  return NULL;
358  }
359 
360  t->callback.execute_local = task_exe;
361  t->datap = datap;
362  t->wants_local = 1;
363 
364  return t;
365 }
366 
367 /* release task resources */
368 static void *tps_task_free(struct tps_task *task)
369 {
370  ast_free(task);
371  return NULL;
372 }
373 
374 /* Taskprocessor tab completion.
375  *
376  * The caller of this function is responsible for argument
377  * position checks prior to calling.
378  */
380 {
381  int tklen;
382  struct ast_taskprocessor *p;
383  struct ao2_iterator i;
384 
385  tklen = strlen(a->word);
386  i = ao2_iterator_init(tps_singletons, 0);
387  while ((p = ao2_iterator_next(&i))) {
388  if (!strncasecmp(a->word, p->name, tklen)) {
391  break;
392  }
393  }
395  }
397 
398  return NULL;
399 }
400 
401 /* ping task handling function */
402 static int tps_ping_handler(void *datap)
403 {
404  ast_mutex_lock(&cli_ping_cond_lock);
405  ast_cond_signal(&cli_ping_cond);
406  ast_mutex_unlock(&cli_ping_cond_lock);
407  return 0;
408 }
409 
410 /* ping the specified taskprocessor and display the ping time on the CLI */
411 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
412 {
413  struct timeval begin, end, delta;
414  const char *name;
415  struct timeval when;
416  struct timespec ts;
417  struct ast_taskprocessor *tps;
418 
419  switch (cmd) {
420  case CLI_INIT:
421  e->command = "core ping taskprocessor";
422  e->usage =
423  "Usage: core ping taskprocessor <taskprocessor>\n"
424  " Displays the time required for a task to be processed\n";
425  return NULL;
426  case CLI_GENERATE:
427  if (a->pos == 3) {
429  } else {
430  return NULL;
431  }
432  }
433 
434  if (a->argc != 4)
435  return CLI_SHOWUSAGE;
436 
437  name = a->argv[3];
438  if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
439  ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
440  return CLI_SUCCESS;
441  }
442  ast_cli(a->fd, "\npinging %s ...", name);
443 
444  /*
445  * Wait up to 5 seconds for a ping reply.
446  *
447  * On a very busy system it could take a while to get a
448  * ping response from some taskprocessors.
449  */
450  begin = ast_tvnow();
451  when = ast_tvadd(begin, ast_samp2tv(5000, 1000));
452  ts.tv_sec = when.tv_sec;
453  ts.tv_nsec = when.tv_usec * 1000;
454 
455  ast_mutex_lock(&cli_ping_cond_lock);
456  if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
457  ast_mutex_unlock(&cli_ping_cond_lock);
458  ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
460  return CLI_FAILURE;
461  }
462  ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
463  ast_mutex_unlock(&cli_ping_cond_lock);
464 
465  end = ast_tvnow();
466  delta = ast_tvsub(end, begin);
467  ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
469  return CLI_SUCCESS;
470 }
471 
472 /*!
473  * \internal
474  * \brief Taskprocessor ao2 container sort function.
475  * \since 13.8.0
476  *
477  * \param obj_left pointer to the (user-defined part) of an object.
478  * \param obj_right pointer to the (user-defined part) of an object.
479  * \param flags flags from ao2_callback()
480  * OBJ_SEARCH_OBJECT - if set, 'obj_right', is an object.
481  * OBJ_SEARCH_KEY - if set, 'obj_right', is a search key item that is not an object.
482  * OBJ_SEARCH_PARTIAL_KEY - if set, 'obj_right', is a partial search key item that is not an object.
483  *
484  * \retval <0 if obj_left < obj_right
485  * \retval =0 if obj_left == obj_right
486  * \retval >0 if obj_left > obj_right
487  */
488 static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
489 {
490  const struct ast_taskprocessor *tps_left = obj_left;
491  const struct ast_taskprocessor *tps_right = obj_right;
492  const char *right_key = obj_right;
493  int cmp;
494 
495  switch (flags & OBJ_SEARCH_MASK) {
496  default:
497  case OBJ_SEARCH_OBJECT:
498  right_key = tps_right->name;
499  /* Fall through */
500  case OBJ_SEARCH_KEY:
501  cmp = strcasecmp(tps_left->name, right_key);
502  break;
504  cmp = strncasecmp(tps_left->name, right_key, strlen(right_key));
505  break;
506  }
507  return cmp;
508 }
509 
510 #define FMT_HEADERS "%-70s %10s %10s %10s %10s %10s\n"
511 #define FMT_FIELDS "%-70s %10lu %10lu %10lu %10lu %10lu\n"
512 
513 /*!
514  * \internal
515  * \brief Print taskprocessor information to CLI.
516  * \since 13.30.0
517  *
518  * \param fd the file descriptor
519  * \param tps the taskprocessor
520  */
522 {
524  tps->tps_queue_size, tps->stats.max_qsize, tps->tps_queue_low,
525  tps->tps_queue_high);
526 }
527 
528 /*!
529  * \internal
530  * \brief Prints an optionally narrowed down list of taskprocessors to the CLI.
531  * \since 13.30.0
532  *
533  * \param fd the file descriptor
534  * \param like the string we are matching on
535  *
536  * \retval number of taskprocessors on success
537  * \retval 0 otherwise
538  */
539 static int tps_report_taskprocessor_list(int fd, const char *like)
540 {
541  int tps_count = 0;
542  int word_len;
543  struct ao2_container *sorted_tps;
544  struct ast_taskprocessor *tps;
545  struct ao2_iterator iter;
546 
548  NULL);
549  if (!sorted_tps
550  || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
551  ast_debug(1, "Failed to retrieve sorted taskprocessors\n");
552  ao2_cleanup(sorted_tps);
553  return 0;
554  }
555 
556  word_len = strlen(like);
557  iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
558  while ((tps = ao2_iterator_next(&iter))) {
559  if (like) {
560  if (!strncasecmp(like, tps->name, word_len)) {
562  tps_count++;
563  }
564  } else {
566  tps_count++;
567  }
569  }
570  ao2_iterator_destroy(&iter);
571  ao2_ref(sorted_tps, -1);
572 
573  return tps_count;
574 }
575 
576 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
577 {
578  const char *like;
579 
580  switch (cmd) {
581  case CLI_INIT:
582  e->command = "core show taskprocessors [like]";
583  e->usage =
584  "Usage: core show taskprocessors [like keyword]\n"
585  " Shows a list of instantiated task processors and their statistics\n";
586  return NULL;
587  case CLI_GENERATE:
588  if (a->pos == e->args) {
590  } else {
591  return NULL;
592  }
593  }
594 
595  if (a->argc == e->args - 1) {
596  like = "";
597  } else if (a->argc == e->args + 1 && !strcasecmp(a->argv[e->args-1], "like")) {
598  like = a->argv[e->args];
599  } else {
600  return CLI_SHOWUSAGE;
601  }
602 
603  ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
604  ast_cli(a->fd, "\n%d taskprocessors\n\n", tps_report_taskprocessor_list(a->fd, like));
605 
606  return CLI_SUCCESS;
607 }
608 
609 /* hash callback for astobj2 */
610 static int tps_hash_cb(const void *obj, const int flags)
611 {
612  const struct ast_taskprocessor *tps = obj;
613  const char *name = flags & OBJ_KEY ? obj : tps->name;
614 
615  return ast_str_case_hash(name);
616 }
617 
618 /* compare callback for astobj2 */
619 static int tps_cmp_cb(void *obj, void *arg, int flags)
620 {
621  struct ast_taskprocessor *lhs = obj, *rhs = arg;
622  const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
623 
624  return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
625 }
626 
627 static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
628 {
629  return !strcmp(alert->subsystem, subsystem);
630 }
631 
632 static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
633 {
634  return strcmp(a->subsystem, b->subsystem);
635 }
636 
638 {
639  struct subsystem_alert *alert;
640  unsigned int count = 0;
641  int idx;
642 
643  AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
644  idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
645  if (idx >= 0) {
646  alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
647  count = alert->alert_count;
648  }
649  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
650 
651  return count;
652 }
653 
654 static void subsystem_alert_increment(const char *subsystem)
655 {
656  struct subsystem_alert *alert;
657  int idx;
658 
659  if (ast_strlen_zero(subsystem)) {
660  return;
661  }
662 
663  AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
664  idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
665  if (idx >= 0) {
666  alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
667  alert->alert_count++;
668  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
669  return;
670  }
671 
672  alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
673  if (!alert) {
674  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
675  return;
676  }
677  alert->alert_count = 1;
678  strcpy(alert->subsystem, subsystem); /* Safe */
679 
680  if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
681  ast_free(alert);
682  }
683  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
684 }
685 
686 static void subsystem_alert_decrement(const char *subsystem)
687 {
688  struct subsystem_alert *alert;
689  int idx;
690 
691  if (ast_strlen_zero(subsystem)) {
692  return;
693  }
694 
695  AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
696  idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
697  if (idx < 0) {
699  "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
700  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
701  return;
702  }
703  alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
704 
705  alert->alert_count--;
706  if (alert->alert_count <= 0) {
707  AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
708  ast_free(alert);
709  }
710 
711  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
712 }
713 
714 static void subsystem_copy(struct subsystem_alert *alert,
715  struct subsystem_alert_vector *vector)
716 {
717  struct subsystem_alert *alert_copy;
718  alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
719  if (!alert_copy) {
720  return;
721  }
722  alert_copy->alert_count = alert->alert_count;
723  strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
724  if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
725  ast_free(alert_copy);
726  }
727 }
728 
729 static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
730 {
731  struct subsystem_alert_vector sorted_subsystems;
732  int i;
733 
734 #define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n"
735 #define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n"
736 
737  switch (cmd) {
738  case CLI_INIT:
739  e->command = "core show taskprocessor alerted subsystems";
740  e->usage =
741  "Usage: core show taskprocessor alerted subsystems\n"
742  " Shows a list of task processor subsystems that are currently alerted\n";
743  return NULL;
744  case CLI_GENERATE:
745  return NULL;
746  }
747 
748  if (a->argc != e->args) {
749  return CLI_SHOWUSAGE;
750  }
751 
752  if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
753  return CLI_FAILURE;
754  }
755 
756  AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
757  for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
758  subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
759  }
760  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
761 
762  ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
763 
764  for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
765  struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
766  ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
767  }
768 
769  ast_cli(a->fd, "\n%zu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
770 
771  AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
772  AST_VECTOR_FREE(&sorted_subsystems);
773 
774  return CLI_SUCCESS;
775 }
776 
777 
778 /*! Count of the number of taskprocessors in high water alert. */
779 static unsigned int tps_alert_count;
780 
781 /*! Access protection for tps_alert_count */
783 
784 /*!
785  * \internal
786  * \brief Add a delta to tps_alert_count with protection.
787  * \since 13.10.0
788  *
789  * \param tps Taskprocessor updating queue water mark alert trigger.
790  * \param delta The amount to add to tps_alert_count.
791  *
792  * \return Nothing
793  */
794 static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
795 {
796  unsigned int old;
797 
799  old = tps_alert_count;
800  tps_alert_count += delta;
801  if (DEBUG_ATLEAST(3)
802  /* and tps_alert_count becomes zero or non-zero */
803  && !old != !tps_alert_count) {
804  ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
805  tps->name, tps_alert_count ? "triggered" : "cleared");
806  }
807 
808  if (tps->subsystem[0] != '\0') {
809  if (delta > 0) {
811  } else {
813  }
814  }
815 
817 }
818 
819 unsigned int ast_taskprocessor_alert_get(void)
820 {
821  unsigned int count;
822 
824  count = tps_alert_count;
826 
827  return count;
828 }
829 
830 int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
831 {
832  if (!tps || high_water < 0 || high_water < low_water) {
833  return -1;
834  }
835 
836  if (low_water < 0) {
837  /* Set low water level to 90% of high water level */
838  low_water = (high_water * 9) / 10;
839  }
840 
841  ao2_lock(tps);
842 
843  tps->tps_queue_low = low_water;
844  tps->tps_queue_high = high_water;
845 
846  if (tps->high_water_alert) {
847  if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
848  /* Update water mark alert immediately */
849  tps->high_water_alert = 0;
850  tps_alert_add(tps, -1);
851  }
852  } else {
853  if (high_water < tps->tps_queue_size) {
854  /* Update water mark alert immediately */
855  tps->high_water_alert = 1;
856  tps_alert_add(tps, +1);
857  }
858  }
859 
860  ao2_unlock(tps);
861 
862  return 0;
863 }
864 
865 /* destroy the taskprocessor */
866 static void tps_taskprocessor_dtor(void *tps)
867 {
868  struct ast_taskprocessor *t = tps;
869  struct tps_task *task;
870 
871  while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
872  tps_task_free(task);
873  }
874  t->tps_queue_size = 0;
875 
876  if (t->high_water_alert) {
877  t->high_water_alert = 0;
878  tps_alert_add(t, -1);
879  }
880 
881  ao2_cleanup(t->listener);
882  t->listener = NULL;
883 }
884 
885 /* pop the front task and return it */
887 {
888  struct tps_task *task;
889 
890  if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
891  --tps->tps_queue_size;
892  if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
893  tps->high_water_alert = 0;
894  tps_alert_add(tps, -1);
895  }
896  }
897  return task;
898 }
899 
901 {
902  return (tps) ? tps->tps_queue_size : -1;
903 }
904 
905 /* taskprocessor name accessor */
907 {
908  if (!tps) {
909  ast_log(LOG_ERROR, "no taskprocessor specified!\n");
910  return NULL;
911  }
912  return tps->name;
913 }
914 
916 {
917  listener->callbacks->shutdown(listener);
918  ao2_ref(listener->tps, -1);
919 }
920 
921 static void taskprocessor_listener_dtor(void *obj)
922 {
923  struct ast_taskprocessor_listener *listener = obj;
924 
925  if (listener->callbacks->dtor) {
926  listener->callbacks->dtor(listener);
927  }
928 }
929 
931 {
933 
934  listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
935  if (!listener) {
936  return NULL;
937  }
938  listener->callbacks = callbacks;
939  listener->user_data = user_data;
940 
941  return listener;
942 }
943 
945 {
946  ao2_ref(listener->tps, +1);
947  return listener->tps;
948 }
949 
951 {
952  return listener->user_data;
953 }
954 
955 static void *default_listener_pvt_alloc(void)
956 {
958 
959  pvt = ast_calloc(1, sizeof(*pvt));
960  if (!pvt) {
961  return NULL;
962  }
964  if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
965  ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
966  ast_free(pvt);
967  return NULL;
968  }
969  return pvt;
970 }
971 
972 /*!
973  * \internal
974  * \brief Allocate a task processor structure
975  *
976  * \param name Name of the task processor.
977  * \param listener Listener to associate with the task processor.
978  *
979  * \return The newly allocated task processor.
980  *
981  * \pre tps_singletons must be locked by the caller.
982  */
984 {
985  struct ast_taskprocessor *p;
986  char *subsystem_separator;
987  size_t subsystem_length = 0;
988  size_t name_length;
989 
990  name_length = strlen(name);
991  subsystem_separator = strchr(name, '/');
992  if (subsystem_separator) {
993  subsystem_length = subsystem_separator - name;
994  }
995 
996  p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
997  if (!p) {
998  ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
999  return NULL;
1000  }
1001 
1002  /* Set default congestion water level alert triggers. */
1005 
1006  strcpy(p->name, name); /* Safe */
1007  p->subsystem = p->name + name_length + 1;
1008  ast_copy_string(p->subsystem, name, subsystem_length + 1);
1009 
1010  ao2_ref(listener, +1);
1011  p->listener = listener;
1012 
1014 
1015  ao2_ref(p, +1);
1016  listener->tps = p;
1017 
1018  if (!(ao2_link_flags(tps_singletons, p, OBJ_NOLOCK))) {
1019  ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
1020  listener->tps = NULL;
1021  ao2_ref(p, -2);
1022  return NULL;
1023  }
1024 
1025  return p;
1026 }
1027 
1029 {
1030  if (p && p->listener->callbacks->start(p->listener)) {
1031  ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
1032  p->name);
1034 
1035  return NULL;
1036  }
1037 
1038  return p;
1039 }
1040 
1041 /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
1042  * create the taskprocessor if we were told via ast_tps_options to return a reference only
1043  * if it already exists */
1045 {
1046  struct ast_taskprocessor *p;
1049 
1050  if (ast_strlen_zero(name)) {
1051  ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
1052  return NULL;
1053  }
1054  ao2_lock(tps_singletons);
1055  p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1056  if (p || (create & TPS_REF_IF_EXISTS)) {
1057  /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
1058  ao2_unlock(tps_singletons);
1059  return p;
1060  }
1061 
1062  /* Create a new taskprocessor. Start by creating a default listener */
1064  if (!pvt) {
1065  ao2_unlock(tps_singletons);
1066  return NULL;
1067  }
1068  listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
1069  if (!listener) {
1070  ao2_unlock(tps_singletons);
1072  return NULL;
1073  }
1074 
1075  p = __allocate_taskprocessor(name, listener);
1076  ao2_unlock(tps_singletons);
1077  p = __start_taskprocessor(p);
1078  ao2_ref(listener, -1);
1079 
1080  return p;
1081 }
1082 
1084 {
1085  struct ast_taskprocessor *p;
1086 
1087  ao2_lock(tps_singletons);
1088  p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1089  if (p) {
1090  ao2_unlock(tps_singletons);
1092  return NULL;
1093  }
1094 
1095  p = __allocate_taskprocessor(name, listener);
1096  ao2_unlock(tps_singletons);
1097 
1098  return __start_taskprocessor(p);
1099 }
1100 
1102  void *local_data)
1103 {
1104  SCOPED_AO2LOCK(lock, tps);
1105  tps->local_data = local_data;
1106 }
1107 
1108 /* decrement the taskprocessor reference count and unlink from the container if necessary */
1110 {
1111  if (!tps) {
1112  return NULL;
1113  }
1114 
1115  /* To prevent another thread from finding and getting a reference to this
1116  * taskprocessor we hold the singletons lock. If we didn't do this then
1117  * they may acquire it and find that the listener has been shut down.
1118  */
1119  ao2_lock(tps_singletons);
1120 
1121  if (ao2_ref(tps, -1) > 3) {
1122  ao2_unlock(tps_singletons);
1123  return NULL;
1124  }
1125 
1126  /* If we're down to 3 references, then those must be:
1127  * 1. The reference we just got rid of
1128  * 2. The container
1129  * 3. The listener
1130  */
1131  ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
1132  ao2_unlock(tps_singletons);
1133 
1135  return NULL;
1136 }
1137 
1138 /* push the task into the taskprocessor queue */
1139 static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
1140 {
1141  int previous_size;
1142  int was_empty;
1143 
1144  if (!tps) {
1145  ast_log(LOG_ERROR, "tps is NULL!\n");
1146  return -1;
1147  }
1148 
1149  if (!t) {
1150  ast_log(LOG_ERROR, "t is NULL!\n");
1151  return -1;
1152  }
1153 
1154  ao2_lock(tps);
1155  AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
1156  previous_size = tps->tps_queue_size++;
1157 
1158  if (tps->tps_queue_high <= tps->tps_queue_size) {
1159  if (!tps->high_water_alert) {
1160  ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
1161  tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
1162  tps->high_water_warned = 1;
1163  tps->high_water_alert = 1;
1164  tps_alert_add(tps, +1);
1165  }
1166  }
1167 
1168  /* The currently executing task counts as still in queue */
1169  was_empty = tps->executing ? 0 : previous_size == 0;
1170  ao2_unlock(tps);
1171  tps->listener->callbacks->task_pushed(tps->listener, was_empty);
1172  return 0;
1173 }
1174 
/*!
 * \brief Queue a task callback and its data on a taskprocessor.
 *
 * Wraps task_exe and datap in a task via tps_task_alloc() and hands the
 * result to taskprocessor_push(), whose return value is passed back as is.
 */
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
{
	struct tps_task *queued = tps_task_alloc(task_exe, datap);

	return taskprocessor_push(tps, queued);
}
1179 
/*!
 * \brief Queue a task whose callback takes an ast_taskprocessor_local.
 *
 * Wraps task_exe and datap in a task via tps_task_alloc_local() and hands
 * the result to taskprocessor_push(), whose return value is passed back as is.
 */
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
{
	struct tps_task *queued = tps_task_alloc_local(task_exe, datap);

	return taskprocessor_push(tps, queued);
}
1184 
1186 {
1187  if (tps) {
1188  ao2_lock(tps);
1189  tps->suspended = 1;
1190  ao2_unlock(tps);
1191  return 0;
1192  }
1193  return -1;
1194 }
1195 
1197 {
1198  if (tps) {
1199  ao2_lock(tps);
1200  tps->suspended = 0;
1201  ao2_unlock(tps);
1202  return 0;
1203  }
1204  return -1;
1205 }
1206 
1208 {
1209  return tps ? tps->suspended : -1;
1210 }
1211 
1213 {
1214  struct ast_taskprocessor_local local;
1215  struct tps_task *t;
1216  long size;
1217 
1218  ao2_lock(tps);
1219  t = tps_taskprocessor_pop(tps);
1220  if (!t) {
1221  ao2_unlock(tps);
1222  return 0;
1223  }
1224 
1225  tps->thread = pthread_self();
1226  tps->executing = 1;
1227 
1228  if (t->wants_local) {
1229  local.local_data = tps->local_data;
1230  local.data = t->datap;
1231  }
1232  ao2_unlock(tps);
1233 
1234  if (t->wants_local) {
1235  t->callback.execute_local(&local);
1236  } else {
1237  t->callback.execute(t->datap);
1238  }
1239  tps_task_free(t);
1240 
1241  ao2_lock(tps);
1242  tps->thread = AST_PTHREADT_NULL;
1243  /* We need to check size in the same critical section where we reset the
1244  * executing bit. Avoids a race condition where a task is pushed right
1245  * after we pop an empty stack.
1246  */
1247  tps->executing = 0;
1248  size = ast_taskprocessor_size(tps);
1249 
1250  /* Update the stats */
1252 
1253  /* Include the task we just executed as part of the queue size. */
1254  if (size >= tps->stats.max_qsize) {
1255  tps->stats.max_qsize = size + 1;
1256  }
1257  ao2_unlock(tps);
1258 
1259  /* If we executed a task, check for the transition to empty */
1260  if (size == 0 && tps->listener->callbacks->emptied) {
1261  tps->listener->callbacks->emptied(tps->listener);
1262  }
1263  return size > 0;
1264 }
1265 
1267 {
1268  int is_task;
1269 
1270  ao2_lock(tps);
1271  is_task = pthread_equal(tps->thread, pthread_self());
1272  ao2_unlock(tps);
1273  return is_task;
1274 }
1275 
/*!
 * \brief Hand out the next taskprocessor name sequence number.
 *
 * \return The counter value from before this call's atomic increment.
 */
unsigned int ast_taskprocessor_seq_num(void)
{
	static int next_seq;
	int issued;

	issued = ast_atomic_fetchadd_int(&next_seq, 1);
	return (unsigned int) issued;
}
1282 
#define SEQ_STR_SIZE (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */

/*!
 * \brief Write "<name>-<8 hex digit sequence number>" into buf.
 *
 * \param buf Destination buffer.
 * \param size Size of buf in bytes.
 * \param name Name to place in front of the sequence number.
 *
 * Callers are expected to supply a buffer of at least
 * strlen(name) + SEQ_STR_SIZE bytes; this is asserted. If the buffer is
 * smaller anyway, the write never exceeds size bytes: the name is truncated
 * so the sequence number still fits, as ast_taskprocessor_build_name() does.
 * A NULL buf or name leaves buf untouched.
 */
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
{
	size_t name_len;

	ast_assert(buf != NULL && name != NULL);
	if (!buf || !name) {
		/* Asserts may be compiled out; do not hand NULL to strlen()/snprintf(). */
		return;
	}

	name_len = strlen(name);
	ast_assert(name_len + SEQ_STR_SIZE <= size);
	if (size < name_len + SEQ_STR_SIZE) {
		/* Truncate name part to make sequence number fit. */
		name_len = SEQ_STR_SIZE < size ? size - SEQ_STR_SIZE : 0;
	}

	/* Bound the write by the caller's buffer size, not by the computed length. */
	snprintf(buf, size, "%.*s-%08x", (int) name_len, name, ast_taskprocessor_seq_num());
}
1294 
1295 void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
1296 {
1297  va_list ap;
1298  int user_size;
1299 
1300  ast_assert(buf != NULL);
1301  ast_assert(SEQ_STR_SIZE <= size);
1302 
1303  va_start(ap, format);
1304  user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
1305  va_end(ap);
1306  if (user_size < 0) {
1307  /*
1308  * Wow! We got an output error to a memory buffer.
1309  * Assume no user part of name written.
1310  */
1311  user_size = 0;
1312  } else if (size < user_size + SEQ_STR_SIZE) {
1313  /* Truncate user part of name to make sequence number fit. */
1314  user_size = size - SEQ_STR_SIZE;
1315  }
1316 
1317  /* Append sequence number to end of user name. */
1318  snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
1319 }
1320 
1321 static void tps_reset_stats(struct ast_taskprocessor *tps)
1322 {
1323  ao2_lock(tps);
1324  tps->stats._tasks_processed_count = 0;
1325  tps->stats.max_qsize = 0;
1326  ao2_unlock(tps);
1327 }
1328 
1329 static char *cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
1330 {
1331  const char *name;
1332  struct ast_taskprocessor *tps;
1333 
1334  switch (cmd) {
1335  case CLI_INIT:
1336  e->command = "core reset taskprocessor";
1337  e->usage =
1338  "Usage: core reset taskprocessor <taskprocessor>\n"
1339  " Resets stats for the specified taskprocessor\n";
1340  return NULL;
1341  case CLI_GENERATE:
1343  }
1344 
1345  if (a->argc != 4) {
1346  return CLI_SHOWUSAGE;
1347  }
1348 
1349  name = a->argv[3];
1350  if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
1351  ast_cli(a->fd, "\nReset failed: %s not found\n\n", name);
1352  return CLI_SUCCESS;
1353  }
1354  ast_cli(a->fd, "\nResetting %s\n\n", name);
1355 
1356  tps_reset_stats(tps);
1357 
1359 
1360  return CLI_SUCCESS;
1361 }
1362 
1363 static char *cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
1364 {
1365  struct ast_taskprocessor *tps;
1366  struct ao2_iterator iter;
1367 
1368  switch (cmd) {
1369  case CLI_INIT:
1370  e->command = "core reset taskprocessors";
1371  e->usage =
1372  "Usage: core reset taskprocessors\n"
1373  " Resets stats for all taskprocessors\n";
1374  return NULL;
1375  case CLI_GENERATE:
1376  return NULL;
1377  }
1378 
1379  if (a->argc != e->args) {
1380  return CLI_SHOWUSAGE;
1381  }
1382 
1383  ast_cli(a->fd, "\nResetting stats for all taskprocessors\n\n");
1384 
1385  iter = ao2_iterator_init(tps_singletons, 0);
1386  while ((tps = ao2_iterator_next(&iter))) {
1387  tps_reset_stats(tps);
1389  }
1390  ao2_iterator_destroy(&iter);
1391 
1392  return CLI_SUCCESS;
1393 }
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:463
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
A listener for taskprocessors.
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:158
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
Definition: vector.h:721
#define ast_rwlock_rdlock(a)
Definition: lock.h:233
#define AST_CLI_DEFINE(fn, txt,...)
Definition: cli.h:197
const struct ast_taskprocessor_listener_callbacks * callbacks
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
Definition: vector.h:371
int ast_sem_destroy(struct ast_sem *sem)
Destroy a semaphore.
Asterisk main include file. File version handling, generic pbx functions.
void(* shutdown)(struct ast_taskprocessor_listener *listener)
Indicates the taskprocessor wishes to die.
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
#define AST_RWLOCK_DEFINE_STATIC(rwlock)
Definition: lock.h:541
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
static int tps_report_taskprocessor_list(int fd, const char *like)
static void tps_shutdown(void)
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static char * cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
static void subsystem_alert_increment(const char *subsystem)
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
void(* emptied)(struct ast_taskprocessor_listener *listener)
Indicates the task processor has become empty.
union tps_task::@428 callback
The execute() task callback function pointer.
Time-related functions and macros.
#define OBJ_KEY
Definition: astobj2.h:1155
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:900
static char * cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int tps_cmp_cb(void *obj, void *arg, int flags)
The astobj2 compare callback for taskprocessors.
int ast_sem_post(struct ast_sem *sem)
Increments the semaphore, unblocking a waiter if necessary.
descriptor for a cli entry.
Definition: cli.h:171
const int argc
Definition: cli.h:160
#define LOG_WARNING
Definition: logger.h:274
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:63
static void * default_tps_processing_function(void *data)
Function that processes tasks in the taskprocessor.
struct ast_taskprocessor * tps
static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
static int tps_hash_cb(const void *obj, const int flags)
The astobj2 hash callback for taskprocessors.
unsigned int high_water_alert
Definition: taskprocessor.c:89
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
unsigned int suspended
Definition: taskprocessor.c:91
Definition: cli.h:152
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
static void subsystem_copy(struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:880
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
struct ast_taskprocessor_listener * listener
Definition: taskprocessor.c:81
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ast_assert(a)
Definition: utils.h:695
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
#define ast_mutex_lock(a)
Definition: lock.h:187
Taskprocessor queue.
Definition: taskprocessor.c:80
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *)
CLI taskprocessor ping <blah> operation requires a ping condition lock.
unsigned int high_water_warned
Definition: taskprocessor.c:87
#define NULL
Definition: resample.c:96
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
#define LOG_DEBUG
Definition: logger.h:241
#define ast_rwlock_unlock(a)
Definition: lock.h:232
#define FMT_HEADERS_SUBSYSTEM
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
#define ast_cond_signal(cond)
Definition: lock.h:201
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
#define FMT_HEADERS
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
static int task(void *data)
Queued task for baseline test.
int args
This gets set in ast_cli_register()
Definition: cli.h:185
pthread_cond_t ast_cond_t
Definition: lock.h:176
#define ast_strlen_zero(foo)
Definition: strings.h:52
void(* dtor)(struct ast_taskprocessor_listener *listener)
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.
Asterisk semaphore API.
void * datap
The data pointer for the task execute() function.
Definition: taskprocessor.c:54
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:452
#define ast_log
Definition: astobj2.c:42
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
unsigned long _tasks_processed_count
This is the current number of tasks processed.
Definition: taskprocessor.c:65
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1120
static ast_rwlock_t tps_alert_lock
static void * listener(void *unused)
Definition: asterisk.c:1476
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define TPS_MAX_BUCKETS
unsigned int alert_count
int(* execute_local)(struct ast_taskprocessor_local *local)
Definition: taskprocessor.c:51
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
char * subsystem
Anything before the first '/' in the name (if there is one)
Definition: taskprocessor.c:93
int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
Am I the given taskprocessor's current task.
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:202
const int fd
Definition: cli.h:159
#define AST_PTHREADT_NULL
Definition: lock.h:66
ast_mutex_t lock
Definition: app_meetme.c:1091
int ast_sem_getvalue(struct ast_sem *sem, int *sval)
Gets the current value of the semaphore.
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static int default_listener_die(void *data)
#define ao2_lock(a)
Definition: astobj2.h:718
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
Definition: time.h:238
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
static int default_listener_start(struct ast_taskprocessor_listener *listener)
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
Definition: linkedlists.h:832
AST_LIST_HEAD_NOLOCK(contactliststruct, contact)
static void tps_reset_stats(struct ast_taskprocessor *tps)
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
int ast_sem_wait(struct ast_sem *sem)
Decrements the semaphore.
unsigned int executing
Definition: taskprocessor.c:85
const char *const * argv
Definition: cli.h:161
#define LOG_ERROR
Definition: logger.h:285
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
Definition: linkedlists.h:730
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
static void subsystem_alert_decrement(const char *subsystem)
ast_tps_options
ast_tps_options for specification of taskprocessor options
Definition: taskprocessor.h:73
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define CLI_SHOWUSAGE
Definition: cli.h:45
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
int errno
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2283
Local data parameter.
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
static unsigned int tps_alert_count
#define AST_LIST_ENTRY(type)
Declare a forward link structure inside a list entry.
Definition: linkedlists.h:409
#define CLI_FAILURE
Definition: cli.h:46
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
char * command
Definition: cli.h:186
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:890
int(* execute)(void *datap)
Definition: taskprocessor.c:50
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:204
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:559
static struct tps_task * tps_task_alloc_local(int(*task_exe)(struct ast_taskprocessor_local *local), void *datap)
static char * cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define FMT_FIELDS
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
Initialize a semaphore.
int ast_tps_init(void)
const char * word
Definition: cli.h:163
Prototypes for public functions only of internal interest.
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
#define FMT_FIELDS_SUBSYSTEM
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
struct ast_taskprocessor::tps_queue tps_queue
tps_taskprocessor_stats maintain statistics for a taskprocessor.
Definition: taskprocessor.c:61
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:47
const char * usage
Definition: cli.h:177
struct tps_task::@429 list
AST_LIST_ENTRY overhead.
#define ast_rwlock_wrlock(a)
Definition: lock.h:234
return a reference to a taskprocessor ONLY if it already exists
Definition: taskprocessor.h:77
#define CLI_SUCCESS
Definition: cli.h:44
static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocessor *tps)
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:74
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
Get the task processor suspend status.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
Definition: sem.h:81
static struct ast_cli_entry taskprocessor_clis[]
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap)
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
Standard Command Line Interface.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:401
#define SEQ_STR_SIZE
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
const int pos
Definition: cli.h:164
static int tps_ping_handler(void *datap)
CLI taskprocessor ping <blah> handler function.
static char * cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define ao2_unlink_flags(container, obj, flags)
Definition: astobj2.h:1622
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
unsigned int wants_local
Definition: taskprocessor.c:57
static void tps_taskprocessor_dtor(void *tps)
static struct test_val b
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
Definition: taskprocessor.h:98
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1358
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
Definition: extconf.c:2298
Generic container type.
static void * default_listener_pvt_alloc(void)
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)
Search option field mask.
Definition: astobj2.h:1076
static void taskprocessor_listener_dtor(void *obj)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2726
#define DEBUG_ATLEAST(level)
Definition: logger.h:441
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
#define AST_VECTOR_REMOVE(vec, idx, preserve_ordered)
Remove an element from a vector by index.
Definition: vector.h:412
Asterisk module definitions.
static snd_pcm_format_t format
Definition: chan_alsa.c:102
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
Definition: taskprocessor.c:63
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
static void * tps_task_free(struct tps_task *task)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
#define AST_MUTEX_DEFINE_STATIC(mutex)
Definition: lock.h:518
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Definition: taskprocessor.h:91
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1250
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:865
static char * cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define ast_mutex_unlock(a)
Definition: lock.h:188
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
Definition: taskprocessor.c:71
static struct test_val a
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.