Asterisk - The Open Source Telephony Project  18.5.0
Data Structures | Macros | Functions | Variables
taskprocessor.c File Reference

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules. More...

#include "asterisk.h"
#include "asterisk/_private.h"
#include "asterisk/module.h"
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/sem.h"
Include dependency graph for taskprocessor.c:

Go to the source code of this file.

Data Structures

struct  ast_taskprocessor
 An ast_taskprocessor structure is a singleton by name. More...
 
struct  ast_taskprocessor_listener
 A listener for taskprocessors. More...
 
struct  default_taskprocessor_listener_pvt
 
struct  subsystem_alert
 
struct  ast_taskprocessor::tps_queue
 Taskprocessor queue. More...
 
struct  tps_task
 tps_task structure is queued to a taskprocessor More...
 
struct  tps_taskprocessor_stats
 tps_taskprocessor_stats maintain statistics for a taskprocessor. More...
 

Macros

#define FMT_FIELDS   "%-70s %10lu %10lu %10lu %10lu %10lu\n"
 
#define FMT_FIELDS_SUBSYSTEM   "%-32s %12u\n"
 
#define FMT_HEADERS   "%-70s %10s %10s %10s %10s %10s\n"
 
#define FMT_HEADERS_SUBSYSTEM   "%-32s %12s\n"
 
#define SEQ_STR_SIZE   (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */
 
#define TPS_MAX_BUCKETS   1567
 

Functions

static struct ast_taskprocessor * __allocate_taskprocessor (const char *name, struct ast_taskprocessor_listener *listener)
 
static struct ast_taskprocessor * __start_taskprocessor (struct ast_taskprocessor *p)
 
unsigned int ast_taskprocessor_alert_get (void)
 Get the current taskprocessor high water alert count. More...
 
int ast_taskprocessor_alert_set_levels (struct ast_taskprocessor *tps, long low_water, long high_water)
 Set the high and low alert water marks of the given taskprocessor queue. More...
 
void ast_taskprocessor_build_name (char *buf, unsigned int size, const char *format,...)
 Build a taskprocessor name with a sequence number on the end. More...
 
struct ast_taskprocessor * ast_taskprocessor_create_with_listener (const char *name, struct ast_taskprocessor_listener *listener)
 Create a taskprocessor with a custom listener. More...
 
int ast_taskprocessor_execute (struct ast_taskprocessor *tps)
 Pop a task off the taskprocessor and execute it. More...
 
struct ast_taskprocessor * ast_taskprocessor_get (const char *name, enum ast_tps_options create)
 Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary. More...
 
unsigned int ast_taskprocessor_get_subsystem_alert (const char *subsystem)
 Get the current taskprocessor high water alert count by subsystem. More...
 
int ast_taskprocessor_is_suspended (struct ast_taskprocessor *tps)
 Get the task processor suspend status. More...
 
int ast_taskprocessor_is_task (struct ast_taskprocessor *tps)
 Am I the given taskprocessor's current task. More...
 
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc (const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 Allocate a taskprocessor listener. More...
 
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps (const struct ast_taskprocessor_listener *listener)
 Get a reference to the listener's taskprocessor. More...
 
void * ast_taskprocessor_listener_get_user_data (const struct ast_taskprocessor_listener *listener)
 Get the user data from the listener. More...
 
const char * ast_taskprocessor_name (struct ast_taskprocessor *tps)
 Return the name of the taskprocessor singleton. More...
 
void ast_taskprocessor_name_append (char *buf, unsigned int size, const char *name)
 Append the next sequence number to the given string, and copy into the buffer. More...
 
int ast_taskprocessor_push (struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
 Push a task into the specified taskprocessor queue and signal the taskprocessor thread. More...
 
int ast_taskprocessor_push_local (struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
 
unsigned int ast_taskprocessor_seq_num (void)
 Get the next sequence number to create a human friendly taskprocessor name. More...
 
void ast_taskprocessor_set_local (struct ast_taskprocessor *tps, void *local_data)
 Sets the local data associated with a taskprocessor. More...
 
long ast_taskprocessor_size (struct ast_taskprocessor *tps)
 Return the current size of the taskprocessor queue. More...
 
int ast_taskprocessor_suspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is suspended. More...
 
void * ast_taskprocessor_unreference (struct ast_taskprocessor *tps)
 Unreference the specified taskprocessor and its reference count will decrement. More...
 
int ast_taskprocessor_unsuspend (struct ast_taskprocessor *tps)
 Indicate the taskprocessor is unsuspended. More...
 
int ast_tps_init (void)
 
static AST_VECTOR_RW (subsystem_alert_vector, struct subsystem_alert *)
 CLI taskprocessor ping <blah> operation requires a ping condition lock. More...
 
static char * cli_subsystem_alert_report (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_ping (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_report (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_reset_stats (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * cli_tps_reset_stats_all (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static int default_listener_die (void *data)
 
static void * default_listener_pvt_alloc (void)
 
static void default_listener_pvt_destroy (struct default_taskprocessor_listener_pvt *pvt)
 
static void default_listener_pvt_dtor (struct ast_taskprocessor_listener *listener)
 
static void default_listener_shutdown (struct ast_taskprocessor_listener *listener)
 
static int default_listener_start (struct ast_taskprocessor_listener *listener)
 
static void default_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 
static void * default_tps_processing_function (void *data)
 Function that processes tasks in the taskprocessor. More...
 
static void listener_shutdown (struct ast_taskprocessor_listener *listener)
 
static void subsystem_alert_decrement (const char *subsystem)
 
static void subsystem_alert_increment (const char *subsystem)
 
static int subsystem_cmp (struct subsystem_alert *a, struct subsystem_alert *b)
 
static void subsystem_copy (struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
 
static int subsystem_match (struct subsystem_alert *alert, const char *subsystem)
 
static void taskprocessor_listener_dtor (void *obj)
 
static int taskprocessor_push (struct ast_taskprocessor *tps, struct tps_task *t)
 
static void tps_alert_add (struct ast_taskprocessor *tps, int delta)
 
static int tps_cmp_cb (void *obj, void *arg, int flags)
 The astobj2 compare callback for taskprocessors. More...
 
static int tps_hash_cb (const void *obj, const int flags)
 The astobj2 hash callback for taskprocessors. More...
 
static int tps_ping_handler (void *datap)
 CLI taskprocessor ping <blah> handler function. More...
 
static int tps_report_taskprocessor_list (int fd, const char *like)
 
static void tps_report_taskprocessor_list_helper (int fd, struct ast_taskprocessor *tps)
 
static void tps_reset_stats (struct ast_taskprocessor *tps)
 
static void tps_shutdown (void)
 
static int tps_sort_cb (const void *obj_left, const void *obj_right, int flags)
 
static struct tps_task * tps_task_alloc (int(*task_exe)(void *datap), void *datap)
 
static struct tps_task * tps_task_alloc_local (int(*task_exe)(struct ast_taskprocessor_local *local), void *datap)
 
static void * tps_task_free (struct tps_task *task)
 
static void tps_taskprocessor_dtor (void *tps)
 
static struct tps_task * tps_taskprocessor_pop (struct ast_taskprocessor *tps)
 
static char * tps_taskprocessor_tab_complete (struct ast_cli_args *a)
 

Variables

static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
 
static struct ast_cli_entry taskprocessor_clis []
 
static unsigned int tps_alert_count
 
static ast_rwlock_t tps_alert_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
 

Detailed Description

Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.

Author
Dwayne Hubbard dhubb.nosp@m.ard@.nosp@m.digiu.nosp@m.m.co.nosp@m.m

Definition in file taskprocessor.c.

Macro Definition Documentation

◆ FMT_FIELDS

#define FMT_FIELDS   "%-70s %10lu %10lu %10lu %10lu %10lu\n"

Definition at line 511 of file taskprocessor.c.

Referenced by tps_report_taskprocessor_list_helper().

◆ FMT_FIELDS_SUBSYSTEM

#define FMT_FIELDS_SUBSYSTEM   "%-32s %12u\n"

◆ FMT_HEADERS

#define FMT_HEADERS   "%-70s %10s %10s %10s %10s %10s\n"

Definition at line 510 of file taskprocessor.c.

Referenced by cli_tps_report().

◆ FMT_HEADERS_SUBSYSTEM

#define FMT_HEADERS_SUBSYSTEM   "%-32s %12s\n"

◆ SEQ_STR_SIZE

#define SEQ_STR_SIZE   (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */

◆ TPS_MAX_BUCKETS

#define TPS_MAX_BUCKETS   1567

Referenced by ast_tps_init().

Function Documentation

◆ __allocate_taskprocessor()

static struct ast_taskprocessor* __allocate_taskprocessor ( const char *  name,
struct ast_taskprocessor_listener * listener 
)
static

Definition at line 983 of file taskprocessor.c.

References ao2_alloc, ao2_link_flags, ao2_ref, ast_copy_string(), ast_log, AST_PTHREADT_NULL, AST_TASKPROCESSOR_HIGH_WATER_LEVEL, ast_taskprocessor::listener, listener(), LOG_ERROR, LOG_WARNING, name, ast_taskprocessor::name, NULL, OBJ_NOLOCK, ast_taskprocessor::subsystem, ast_taskprocessor::thread, ast_taskprocessor_listener::tps, ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and tps_taskprocessor_dtor().

Referenced by ast_taskprocessor_create_with_listener(), and ast_taskprocessor_get().

984 {
985  struct ast_taskprocessor *p;
986  char *subsystem_separator;
987  size_t subsystem_length = 0;
988  size_t name_length;
989 
990  name_length = strlen(name);
991  subsystem_separator = strchr(name, '/');
992  if (subsystem_separator) {
993  subsystem_length = subsystem_separator - name;
994  }
995 
996  p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
997  if (!p) {
998  ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
999  return NULL;
1000  }
1001 
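1002  /* Set default congestion water level alert triggers. */
1003  p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
1004  p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;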
1005 
1006  strcpy(p->name, name); /* Safe */
1007  p->subsystem = p->name + name_length + 1;
1008  ast_copy_string(p->subsystem, name, subsystem_length + 1);
1009 
1010  ao2_ref(listener, +1);
1011  p->listener = listener;
1012 
1013  p->thread = AST_PTHREADT_NULL;
1014 
1015  ao2_ref(p, +1);
1016  listener->tps = p;
1017 
1018  if (!(ao2_link_flags(tps_singletons, p, OBJ_NOLOCK))) {
1019  ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
1020  listener->tps = NULL;
1021  ao2_ref(p, -2);
1022  return NULL;
1023  }
1024 
1025  return p;
1026 }
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
#define LOG_WARNING
Definition: logger.h:274
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:63
struct ast_taskprocessor * tps
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
struct ast_taskprocessor_listener * listener
Definition: taskprocessor.c:81
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
#define NULL
Definition: resample.c:96
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
#define ast_log
Definition: astobj2.c:42
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
static void * listener(void *unused)
Definition: asterisk.c:1476
char * subsystem
Anything before the first '/' in the name (if there is one)
Definition: taskprocessor.c:93
#define AST_PTHREADT_NULL
Definition: lock.h:66
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define LOG_ERROR
Definition: logger.h:285
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
static const char name[]
Definition: cdr_mysql.c:74
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:401
static void tps_taskprocessor_dtor(void *tps)

◆ __start_taskprocessor()

static struct ast_taskprocessor* __start_taskprocessor ( struct ast_taskprocessor * p)
static

Definition at line 1028 of file taskprocessor.c.

References ast_log, ast_taskprocessor_unreference(), ast_taskprocessor_listener::callbacks, ast_taskprocessor::listener, LOG_ERROR, ast_taskprocessor::name, NULL, and ast_taskprocessor_listener_callbacks::start.

Referenced by ast_taskprocessor_create_with_listener(), and ast_taskprocessor_get().

1029 {
1030  if (p && p->listener->callbacks->start(p->listener)) {
1031  ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
1032  p->name);
1033  ast_taskprocessor_unreference(p);
1034 
1035  return NULL;
1036  }
1037 
1038  return p;
1039 }
const struct ast_taskprocessor_listener_callbacks * callbacks
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
struct ast_taskprocessor_listener * listener
Definition: taskprocessor.c:81
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
#define LOG_ERROR
Definition: logger.h:285
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Definition: taskprocessor.h:91

◆ ast_taskprocessor_alert_get()

unsigned int ast_taskprocessor_alert_get ( void  )

Get the current taskprocessor high water alert count.

Since
13.10.0
Return values
0: if no taskprocessors are in high water alert.
non-zero: if some task processors are in high water alert.

Definition at line 819 of file taskprocessor.c.

References ast_rwlock_rdlock, ast_rwlock_unlock, tps_alert_count, and tps_alert_lock.

Referenced by AST_TEST_DEFINE(), and distributor().

820 {
821  unsigned int count;
822 
823  ast_rwlock_rdlock(&tps_alert_lock);
824  count = tps_alert_count;
825  ast_rwlock_unlock(&tps_alert_lock);
826 
827  return count;
828 }
#define ast_rwlock_rdlock(a)
Definition: lock.h:233
#define ast_rwlock_unlock(a)
Definition: lock.h:232
static ast_rwlock_t tps_alert_lock
static unsigned int tps_alert_count

◆ ast_taskprocessor_alert_set_levels()

int ast_taskprocessor_alert_set_levels ( struct ast_taskprocessor * tps,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the given taskprocessor queue.

Since
13.10.0
Parameters
tps: Taskprocessor to update queue water marks.
low_water: New queue low water mark. (-1 to set as 90% of high_water)
high_water: New queue high water mark.
Return values
0: on success.
-1: on error (water marks not changed).

Definition at line 830 of file taskprocessor.c.

References ao2_lock, ao2_unlock, ast_taskprocessor::high_water_alert, tps_alert_add(), ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and ast_taskprocessor::tps_queue_size.

Referenced by actual_load_config(), ast_res_pjsip_init_options_handling(), ast_serializer_pool_set_alerts(), ast_sorcery_object_set_congestion_levels(), AST_TEST_DEFINE(), and stasis_subscription_set_congestion_limits().

831 {
832  if (!tps || high_water < 0 || high_water < low_water) {
833  return -1;
834  }
835 
836  if (low_water < 0) {
837  /* Set low water level to 90% of high water level */
838  low_water = (high_water * 9) / 10;
839  }
840 
841  ao2_lock(tps);
842 
843  tps->tps_queue_low = low_water;
844  tps->tps_queue_high = high_water;
845 
846  if (tps->high_water_alert) {
847  if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
848  /* Update water mark alert immediately */
849  tps->high_water_alert = 0;
850  tps_alert_add(tps, -1);
851  }
852  } else {
853  if (high_water < tps->tps_queue_size) {
854  /* Update water mark alert immediately */
855  tps->high_water_alert = 1;
856  tps_alert_add(tps, +1);
857  }
858  }
859 
860  ao2_unlock(tps);
861 
862  return 0;
863 }
unsigned int high_water_alert
Definition: taskprocessor.c:89
#define ao2_unlock(a)
Definition: astobj2.h:730
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
#define ao2_lock(a)
Definition: astobj2.h:718
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:74

◆ ast_taskprocessor_build_name()

void ast_taskprocessor_build_name ( char *  buf,
unsigned int  size,
const char *  format,
  ... 
)

Build a taskprocessor name with a sequence number on the end.

Since
13.8.0
Parameters
buf: Where to put the built taskprocessor name.
size: How large is buf including null terminator.
format: printf format to create the non-sequenced part of the name.
Note
The user supplied part of the taskprocessor name is truncated to allow the full sequence number to be appended within the supplied buffer size.
Returns
Nothing

Definition at line 1295 of file taskprocessor.c.

References ast_assert, ast_taskprocessor_seq_num(), NULL, and SEQ_STR_SIZE.

Referenced by alloc_playback_chan(), allocate_subscription_tree(), ast_sip_session_alloc(), create_websocket_serializer(), distributor_pool_setup(), internal_stasis_subscribe(), refer_progress_alloc(), sip_options_aor_alloc(), sip_outbound_publisher_alloc(), sip_outbound_registration_state_alloc(), and sorcery_object_type_alloc().

1296 {
1297  va_list ap;
1298  int user_size;
1299 
1300  ast_assert(buf != NULL);
1301  ast_assert(SEQ_STR_SIZE <= size);
1302 
1303  va_start(ap, format);
1304  user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
1305  va_end(ap);
1306  if (user_size < 0) {
1307  /*
1308  * Wow! We got an output error to a memory buffer.
1309  * Assume no user part of name written.
1310  */
1311  user_size = 0;
1312  } else if (size < user_size + SEQ_STR_SIZE) {
1313  /* Truncate user part of name to make sequence number fit. */
1314  user_size = size - SEQ_STR_SIZE;
1315  }
1316 
1317  /* Append sequence number to end of user name. */
1318  snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
1319 }
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
#define SEQ_STR_SIZE
static snd_pcm_format_t format
Definition: chan_alsa.c:102

◆ ast_taskprocessor_create_with_listener()

struct ast_taskprocessor* ast_taskprocessor_create_with_listener ( const char *  name,
struct ast_taskprocessor_listener * listener 
)

Create a taskprocessor with a custom listener.

Since
12.0.0

Note that when a taskprocessor is created in this way, it does not create any threads to execute the tasks. This job is left up to the listener. The listener's start() callback will be called during this function.

Parameters
name: The name of the taskprocessor to create
listener: The listener for operations on this taskprocessor
Return values
NULL: Failure
non-NULL: Success

Definition at line 1083 of file taskprocessor.c.

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_unlock, ast_taskprocessor_unreference(), NULL, OBJ_KEY, and OBJ_NOLOCK.

Referenced by AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

1084 {
1085  struct ast_taskprocessor *p;
1086 
1087  ao2_lock(tps_singletons);
1088  p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1089  if (p) {
1090  ao2_unlock(tps_singletons);
1091  ast_taskprocessor_unreference(p);
1092  return NULL;
1093  }
1094 
1095  p = __allocate_taskprocessor(name, listener);
1096  ao2_unlock(tps_singletons);
1097 
1098  return __start_taskprocessor(p);
1099 }
#define OBJ_KEY
Definition: astobj2.h:1155
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
static const char name[]
Definition: cdr_mysql.c:74
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69

◆ ast_taskprocessor_execute()

int ast_taskprocessor_execute ( struct ast_taskprocessor * tps)

Pop a task off the taskprocessor and execute it.

Since
12.0.0
Parameters
tps: The taskprocessor from which to execute.
Return values
0: There is no further work to be done.
1: Tasks still remain in the taskprocessor queue.

Definition at line 1212 of file taskprocessor.c.

References tps_taskprocessor_stats::_tasks_processed_count, ao2_lock, ao2_unlock, AST_PTHREADT_NULL, ast_taskprocessor_size(), tps_task::callback, ast_taskprocessor_listener::callbacks, ast_taskprocessor_local::data, tps_task::datap, ast_taskprocessor_listener_callbacks::emptied, tps_task::execute, tps_task::execute_local, ast_taskprocessor::executing, ast_taskprocessor::listener, ast_taskprocessor::local_data, ast_taskprocessor_local::local_data, tps_taskprocessor_stats::max_qsize, ast_taskprocessor::stats, ast_taskprocessor::thread, tps_task_free(), tps_taskprocessor_pop(), and tps_task::wants_local.

Referenced by AST_TEST_DEFINE(), default_tps_processing_function(), execute_tasks(), and threadpool_execute().

1213 {
1214  struct ast_taskprocessor_local local;
1215  struct tps_task *t;
1216  long size;
1217 
1218  ao2_lock(tps);
1219  t = tps_taskprocessor_pop(tps);
1220  if (!t) {
1221  ao2_unlock(tps);
1222  return 0;
1223  }
1224 
1225  tps->thread = pthread_self();
1226  tps->executing = 1;
1227 
1228  if (t->wants_local) {
1229  local.local_data = tps->local_data;
1230  local.data = t->datap;
1231  }
1232  ao2_unlock(tps);
1233 
1234  if (t->wants_local) {
1235  t->callback.execute_local(&local);
1236  } else {
1237  t->callback.execute(t->datap);
1238  }
1239  tps_task_free(t);
1240 
1241  ao2_lock(tps);
1242  tps->thread = AST_PTHREADT_NULL;
1243  /* We need to check size in the same critical section where we reset the
1244  * executing bit. Avoids a race condition where a task is pushed right
1245  * after we pop an empty stack.
1246  */
1247  tps->executing = 0;
1248  size = ast_taskprocessor_size(tps);
1249 
1250  /* Update the stats */
1251  tps->stats._tasks_processed_count++;
1252 
1253  /* Include the task we just executed as part of the queue size. */
1254  if (size >= tps->stats.max_qsize) {
1255  tps->stats.max_qsize = size + 1;
1256  }
1257  ao2_unlock(tps);
1258 
1259  /* If we executed a task, check for the transition to empty */
1260  if (size == 0 && tps->listener->callbacks->emptied) {
1261  tps->listener->callbacks->emptied(tps->listener);
1262  }
1263  return size > 0;
1264 }
const struct ast_taskprocessor_listener_callbacks * callbacks
void(* emptied)(struct ast_taskprocessor_listener *listener)
Indicates the task processor has become empty.
union tps_task::@428 callback
The execute() task callback function pointer.
struct ast_taskprocessor_listener * listener
Definition: taskprocessor.c:81
#define ao2_unlock(a)
Definition: astobj2.h:730
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
void * datap
The data pointer for the task execute() function.
Definition: taskprocessor.c:54
unsigned long _tasks_processed_count
This is the current number of tasks processed.
Definition: taskprocessor.c:65
int(* execute_local)(struct ast_taskprocessor_local *local)
Definition: taskprocessor.c:51
#define AST_PTHREADT_NULL
Definition: lock.h:66
#define ao2_lock(a)
Definition: astobj2.h:718
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
unsigned int executing
Definition: taskprocessor.c:85
Local data parameter.
int(* execute)(void *datap)
Definition: taskprocessor.c:50
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:47
unsigned int wants_local
Definition: taskprocessor.c:57
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
Definition: taskprocessor.c:63
static void * tps_task_free(struct tps_task *task)
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
Definition: taskprocessor.c:71

◆ ast_taskprocessor_get()

struct ast_taskprocessor* ast_taskprocessor_get ( const char *  name,
enum ast_tps_options  create 
)

Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary.

The default behavior of instantiating a taskprocessor if one does not already exist can be disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get().

Parameters
name: The name of the taskprocessor
create: Use 0 by default, or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does not already exist.
Returns
A pointer to a reference counted taskprocessor under normal conditions, or NULL if the TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist.
Since
1.6.1

Definition at line 1044 of file taskprocessor.c.

References __allocate_taskprocessor(), __start_taskprocessor(), ao2_find, ao2_lock, ao2_ref, ao2_unlock, ast_log, ast_strlen_zero, ast_taskprocessor_listener_alloc(), default_listener_pvt_alloc(), default_listener_pvt_destroy(), listener(), LOG_ERROR, NULL, OBJ_KEY, OBJ_NOLOCK, and TPS_REF_IF_EXISTS.

Referenced by alloc_playback_chan(), ast_dns_system_resolver_init(), ast_msg_init(), AST_TEST_DEFINE(), cli_tps_ping(), cli_tps_reset_stats(), find_request_serializer(), internal_stasis_subscribe(), load_module(), load_objects(), and threadpool_alloc().

1045 {
1046  struct ast_taskprocessor *p;
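1047  struct ast_taskprocessor_listener *listener;
1048  struct default_taskprocessor_listener_pvt *pvt;
1049 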
1050  if (ast_strlen_zero(name)) {
1051  ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
1052  return NULL;
1053  }
1054  ao2_lock(tps_singletons);
1055  p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1056  if (p || (create & TPS_REF_IF_EXISTS)) {
1057  /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
1058  ao2_unlock(tps_singletons);
1059  return p;
1060  }
1061 
1062  /* Create a new taskprocessor. Start by creating a default listener */
1063  pvt = default_listener_pvt_alloc();
1064  if (!pvt) {
1065  ao2_unlock(tps_singletons);
1066  return NULL;
1067  }
1068  listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
1069  if (!listener) {
1070  ao2_unlock(tps_singletons);
1071  default_listener_pvt_destroy(pvt);
1072  return NULL;
1073  }
1074 
1075  p = __allocate_taskprocessor(name, listener);
1076  ao2_unlock(tps_singletons);
1077  p = __start_taskprocessor(p);
1078  ao2_ref(listener, -1);
1079 
1080  return p;
1081 }
A listener for taskprocessors.
#define OBJ_KEY
Definition: astobj2.h:1155
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
#define ao2_unlock(a)
Definition: astobj2.h:730
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
#define NULL
Definition: resample.c:96
#define ast_strlen_zero(foo)
Definition: strings.h:52
#define ast_log
Definition: astobj2.c:42
static void * listener(void *unused)
Definition: asterisk.c:1476
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
#define LOG_ERROR
Definition: logger.h:285
static const char name[]
Definition: cdr_mysql.c:74
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
return a reference to a taskprocessor ONLY if it already exists
Definition: taskprocessor.h:77
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
static void * default_listener_pvt_alloc(void)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks

◆ ast_taskprocessor_get_subsystem_alert()

unsigned int ast_taskprocessor_get_subsystem_alert ( const char *  subsystem)

Get the current taskprocessor high water alert count by subsystem.

Since
13.26.0
16.3.0
Parameters
subsystem: The subsystem name
Return values
0: if no taskprocessors are in high water alert.
non-zero: if some task processors are in high water alert.

Definition at line 637 of file taskprocessor.c.

References subsystem_alert::alert_count, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, and subsystem_match().

Referenced by AST_TEST_DEFINE(), and distributor().

638 {
639  struct subsystem_alert *alert;
640  unsigned int count = 0;
641  int idx;
642 
643  AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
644  idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
645  if (idx >= 0) {
646  alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
647  count = alert->alert_count;
648  }
649  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
650 
651  return count;
652 }
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
Definition: vector.h:721
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:900
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:880
unsigned int alert_count
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682

◆ ast_taskprocessor_is_suspended()

int ast_taskprocessor_is_suspended ( struct ast_taskprocessor * tps)

Get the task processor suspend status.

Since
13.12.0
Parameters
tps: Task processor.
Return values
non-zero: if the task processor is suspended

Definition at line 1207 of file taskprocessor.c.

References ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend().

1208 {
1209  return tps ? tps->suspended : -1;
1210 }
unsigned int suspended
Definition: taskprocessor.c:91

◆ ast_taskprocessor_is_task()

int ast_taskprocessor_is_task ( struct ast_taskprocessor * tps)

Am I the given taskprocessor's current task.

Since
12.7.0
Parameters
tps: Taskprocessor to check.
Return values
non-zero: if current thread is the taskprocessor thread.

Definition at line 1266 of file taskprocessor.c.

References ao2_lock, ao2_unlock, and ast_taskprocessor::thread.

Referenced by ast_sip_push_task_wait_serializer(), ast_sip_session_suspend(), and handle_new_invite_request().

1267 {
1268  int is_task;
1269 
1270  ao2_lock(tps);
1271  is_task = pthread_equal(tps->thread, pthread_self());
1272  ao2_unlock(tps);
1273  return is_task;
1274 }
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718

◆ ast_taskprocessor_listener_alloc()

struct ast_taskprocessor_listener* ast_taskprocessor_listener_alloc ( const struct ast_taskprocessor_listener_callbacks * callbacks,
void *  user_data 
)

Allocate a taskprocessor listener.

Since
12.0.0

This will result in the listener being allocated with the specified callbacks.

Parameters
callbacks: The callbacks to assign to the listener
user_data: The user data for the listener
Return values
NULL: Failure
non-NULL: The newly allocated taskprocessor listener

Definition at line 930 of file taskprocessor.c.

References ao2_alloc, ast_taskprocessor_listener::callbacks, listener(), NULL, taskprocessor_listener_dtor(), and ast_taskprocessor_listener::user_data.

Referenced by ast_taskprocessor_get(), AST_TEST_DEFINE(), ast_threadpool_create(), and ast_threadpool_serializer_group().

931 {
933 
934  listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
935  if (!listener) {
936  return NULL;
937  }
938  listener->callbacks = callbacks;
939  listener->user_data = user_data;
940 
941  return listener;
942 }
A listener for taskprocessors.
const struct ast_taskprocessor_listener_callbacks * callbacks
#define NULL
Definition: resample.c:96
static void * listener(void *unused)
Definition: asterisk.c:1476
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
static void taskprocessor_listener_dtor(void *obj)

◆ ast_taskprocessor_listener_get_tps()

struct ast_taskprocessor* ast_taskprocessor_listener_get_tps ( const struct ast_taskprocessor_listener *  listener)

Get a reference to the listener's taskprocessor.

This will return the taskprocessor with its reference count increased. Release the reference to this object by using ast_taskprocessor_unreference().

Parameters
listener: The listener that has the taskprocessor
Returns
The taskprocessor

Definition at line 944 of file taskprocessor.c.

References ao2_ref, and ast_taskprocessor_listener::tps.

Referenced by serializer_task_pushed().

945 {
946  ao2_ref(listener->tps, +1);
947  return listener->tps;
948 }
struct ast_taskprocessor * tps
#define ao2_ref(o, delta)
Definition: astobj2.h:464

◆ ast_taskprocessor_listener_get_user_data()

void* ast_taskprocessor_listener_get_user_data ( const struct ast_taskprocessor_listener *  listener)

Get the user data from the listener.

Parameters
listener: The taskprocessor listener
Returns
The listener's user data

Definition at line 950 of file taskprocessor.c.

References ast_taskprocessor_listener::user_data.

Referenced by serializer_shutdown(), serializer_task_pushed(), test_emptied(), test_shutdown(), test_task_pushed(), threadpool_tps_emptied(), threadpool_tps_shutdown(), and threadpool_tps_task_pushed().

951 {
952  return listener->user_data;
953 }

◆ ast_taskprocessor_name()

const char* ast_taskprocessor_name ( struct ast_taskprocessor *  tps)

Return the name of the taskprocessor singleton.

Since
1.6.1

Definition at line 906 of file taskprocessor.c.

References ast_log, LOG_ERROR, ast_taskprocessor::name, and NULL.

Referenced by ast_serializer_pool_set_alerts(), ast_sip_get_distributor_serializer(), distributor(), grow(), record_serializer(), shrink(), sip_options_apply_aor_configuration(), and sip_options_contact_add_task().

907 {
908  if (!tps) {
909  ast_log(LOG_ERROR, "no taskprocessor specified!\n");
910  return NULL;
911  }
912  return tps->name;
913 }
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
#define LOG_ERROR
Definition: logger.h:285

◆ ast_taskprocessor_name_append()

void ast_taskprocessor_name_append ( char *  buf,
unsigned int  size,
const char *  name 
)

Append the next sequence number to the given string, and copy into the buffer.

Parameters
buf: Where to copy the appended taskprocessor name.
size: How large is buf including null terminator.
name: A name to append the sequence number to.

Definition at line 1285 of file taskprocessor.c.

References ast_assert, ast_taskprocessor_seq_num(), NULL, and SEQ_STR_SIZE.

Referenced by ast_serializer_pool_create().

1286 {
1287  int final_size = strlen(name) + SEQ_STR_SIZE;
1288 
1289  ast_assert(buf != NULL && name != NULL);
1290  ast_assert(final_size <= size);
1291 
1292  snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());
1293 }
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
char buf[BUFSIZE]
Definition: eagi_proxy.c:66
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
static const char name[]
Definition: cdr_mysql.c:74
#define SEQ_STR_SIZE

◆ ast_taskprocessor_push()

int ast_taskprocessor_push ( struct ast_taskprocessor *  tps,
int(*)(void *datap)  task_exe,
void *  datap 
)

Push a task into the specified taskprocessor queue and signal the taskprocessor thread.

Parameters
tps: The taskprocessor structure
task_exe: The task handling function to push into the taskprocessor queue
datap: The data to be used by the task handling function
Return values
0: success
-1: failure
Since
1.6.1

Definition at line 1175 of file taskprocessor.c.

References taskprocessor_push(), and tps_task_alloc().

Referenced by ast_cc_agent_status_response(), ast_cc_monitor_failed(), ast_cc_monitor_party_b_free(), ast_cc_monitor_status_request(), ast_cc_monitor_stop_ringing(), ast_msg_queue(), ast_sip_push_task(), ast_sorcery_create(), ast_sorcery_delete(), ast_sorcery_update(), AST_TEST_DEFINE(), ast_threadpool_push(), ast_threadpool_set_size(), async_delete_name_rec(), async_play_sound_helper(), cc_request_state_change(), cli_tps_ping(), default_listener_shutdown(), destroy_conference_bridge(), dns_system_resolver_resolve(), generic_monitor_devstate_cb(), handle_cc_status(), hepv3_send_packet(), iax2_transmit(), mwi_handle_subscribe(), mwi_handle_unsubscribe(), play_sound_helper(), sorcery_object_load(), stasis_unsubscribe(), threadpool_active_thread_idle(), threadpool_idle_thread_dead(), threadpool_tps_emptied(), threadpool_tps_task_pushed(), and threadpool_zombie_thread_dead().

1176 {
1177  return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
1178 }
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap)

◆ ast_taskprocessor_push_local()

int ast_taskprocessor_push_local ( struct ast_taskprocessor *  tps,
int(*)(struct ast_taskprocessor_local *datap)  task_exe,
void *  datap 
)

Definition at line 1180 of file taskprocessor.c.

References taskprocessor_push(), and tps_task_alloc_local().

1181 {
1182  return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
1183 }
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
static struct tps_task * tps_task_alloc_local(int(*task_exe)(struct ast_taskprocessor_local *local), void *datap)

◆ ast_taskprocessor_seq_num()

unsigned int ast_taskprocessor_seq_num ( void  )

Get the next sequence number to create a human friendly taskprocessor name.

Since
13.8.0
Returns
Sequence number for use in creating human friendly taskprocessor names.

Definition at line 1276 of file taskprocessor.c.

References ast_atomic_fetchadd_int().

Referenced by ast_taskprocessor_build_name(), and ast_taskprocessor_name_append().

1277 {
1278  static int seq_num;
1279 
1280  return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
1281 }
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755

◆ ast_taskprocessor_set_local()

void ast_taskprocessor_set_local ( struct ast_taskprocessor *  tps,
void *  local_data 
)

Sets the local data associated with a taskprocessor.

Since
12.0.0

See ast_taskprocessor_push_local().

Parameters
tps: Task processor.
local_data: Local data to associate with tps.

Definition at line 1101 of file taskprocessor.c.

References ast_taskprocessor::local_data, lock, and SCOPED_AO2LOCK.

Referenced by AST_TEST_DEFINE(), and internal_stasis_subscribe().

1103 {
1104  SCOPED_AO2LOCK(lock, tps);
1105  tps->local_data = local_data;
1106 }
ast_mutex_t lock
Definition: app_meetme.c:1091
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602

◆ ast_taskprocessor_size()

long ast_taskprocessor_size ( struct ast_taskprocessor *  tps)

Return the current size of the taskprocessor queue.

Since
13.7.0

Definition at line 900 of file taskprocessor.c.

References ast_taskprocessor::tps_queue_size.

Referenced by ast_serializer_pool_get(), ast_taskprocessor_execute(), AST_TEST_DEFINE(), and ast_threadpool_queue_size().

901 {
902  return (tps) ? tps->tps_queue_size : -1;
903 }
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:74

◆ ast_taskprocessor_suspend()

int ast_taskprocessor_suspend ( struct ast_taskprocessor *  tps)

Indicate the taskprocessor is suspended.

Since
13.12.0
Parameters
tps: Task processor.
Return values
0: success
-1: failure

Definition at line 1185 of file taskprocessor.c.

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_suspend(), and AST_TEST_DEFINE().

1186 {
1187  if (tps) {
1188  ao2_lock(tps);
1189  tps->suspended = 1;
1190  ao2_unlock(tps);
1191  return 0;
1192  }
1193  return -1;
1194 }
unsigned int suspended
Definition: taskprocessor.c:91
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718

◆ ast_taskprocessor_unreference()

void* ast_taskprocessor_unreference ( struct ast_taskprocessor *  tps)

Unreference the specified taskprocessor and its reference count will decrement.

Taskprocessors use astobj2 and will unlink from the taskprocessor singleton container and destroy themselves when the taskprocessor reference count reaches zero.

Parameters
tps: taskprocessor to unreference
Returns
NULL
Since
1.6.1

Definition at line 1109 of file taskprocessor.c.

References ao2_lock, ao2_ref, ao2_unlink_flags, ao2_unlock, ast_taskprocessor::listener, listener_shutdown(), NULL, and OBJ_NOLOCK.

Referenced by __start_taskprocessor(), __unload_module(), ast_msg_shutdown(), ast_res_pjsip_cleanup_options_handling(), ast_serializer_pool_destroy(), ast_taskprocessor_create_with_listener(), AST_TEST_DEFINE(), ast_threadpool_shutdown(), cli_tps_ping(), cli_tps_reset_stats(), cli_tps_reset_stats_all(), destroy_conference_bridge(), distributor(), distributor_pool_shutdown(), dns_system_resolver_destroy(), execute_tasks(), exten_state_subscription_destructor(), refer_progress_destroy(), scheduler(), schtd_dtor(), serializer_task_pushed(), session_destructor(), sip_options_aor_dtor(), sip_outbound_publisher_destroy(), sip_outbound_registration_client_state_destroy(), sip_resolve_destroy(), sorcery_object_type_destructor(), subscription_dtor(), subscription_persistence_recreate(), subscription_tree_destructor(), tps_report_taskprocessor_list(), tps_shutdown_thread(), tps_taskprocessor_tab_complete(), unload_module(), and websocket_cb().

1110 {
1111  if (!tps) {
1112  return NULL;
1113  }
1114 
1115  /* To prevent another thread from finding and getting a reference to this
1116  * taskprocessor we hold the singletons lock. If we didn't do this then
1117  * they may acquire it and find that the listener has been shut down.
1118  */
1119  ao2_lock(tps_singletons);
1120 
1121  if (ao2_ref(tps, -1) > 3) {
1122  ao2_unlock(tps_singletons);
1123  return NULL;
1124  }
1125 
1126  /* If we're down to 3 references, then those must be:
1127  * 1. The reference we just got rid of
1128  * 2. The container
1129  * 3. The listener
1130  */
1131  ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
1132  ao2_unlock(tps_singletons);
1133 
1135  return NULL;
1136 }
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
struct ast_taskprocessor_listener * listener
Definition: taskprocessor.c:81
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
#define ao2_unlink_flags(container, obj, flags)
Definition: astobj2.h:1622

◆ ast_taskprocessor_unsuspend()

int ast_taskprocessor_unsuspend ( struct ast_taskprocessor *  tps)

Indicate the taskprocessor is unsuspended.

Since
13.12.0
Parameters
tpsTask processor.
Return values
0: success
-1: failure

Definition at line 1196 of file taskprocessor.c.

References ao2_lock, ao2_unlock, and ast_taskprocessor::suspended.

Referenced by ast_sip_session_unsuspend(), and AST_TEST_DEFINE().

1197 {
1198  if (tps) {
1199  ao2_lock(tps);
1200  tps->suspended = 0;
1201  ao2_unlock(tps);
1202  return 0;
1203  }
1204  return -1;
1205 }
unsigned int suspended
Definition: taskprocessor.c:91
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718

◆ ast_tps_init()

int ast_tps_init ( void  )

Provided by taskprocessor.c

Definition at line 301 of file taskprocessor.c.

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ao2_ref, ARRAY_LEN, ast_cli_register_multiple, ast_cond_init, ast_log, ast_register_cleanup(), AST_VECTOR_RW_INIT, LOG_ERROR, NULL, tps_cmp_cb(), tps_hash_cb(), TPS_MAX_BUCKETS, and tps_shutdown().

Referenced by asterisk_daemon().

302 {
305  if (!tps_singletons) {
306  ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
307  return -1;
308  }
309 
310  if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
311  ao2_ref(tps_singletons, -1);
312  ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
313  return -1;
314  }
315 
316  ast_cond_init(&cli_ping_cond, NULL);
317 
319 
321 
322  return 0;
323 }
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:158
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
static void tps_shutdown(void)
static int tps_cmp_cb(void *obj, void *arg, int flags)
The astobj2 compare callback for taskprocessors.
static int tps_hash_cb(const void *obj, const int flags)
The astobj2 hash callback for taskprocessors.
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
#define TPS_MAX_BUCKETS
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define LOG_ERROR
Definition: logger.h:285
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
static struct ast_cli_entry taskprocessor_clis[]

◆ AST_VECTOR_RW()

static AST_VECTOR_RW ( subsystem_alert_vector  ,
struct subsystem_alert  
)
static

CLI taskprocessor ping <blah> operation requires a ping condition lock.

Definition at line 127 of file taskprocessor.c.

159  {

◆ cli_subsystem_alert_report()

static char * cli_subsystem_alert_report ( struct ast_cli_entry *  e,
int  cmd,
struct ast_cli_args *  a 
)
static

Definition at line 729 of file taskprocessor.c.

References subsystem_alert::alert_count, ast_cli_args::argc, ast_cli_entry::args, ast_cli(), ast_free, AST_VECTOR_CALLBACK_VOID, AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_INIT, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, AST_VECTOR_SIZE, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, FMT_FIELDS_SUBSYSTEM, FMT_HEADERS_SUBSYSTEM, NULL, subsystem_alert::subsystem, subsystem_copy(), and ast_cli_entry::usage.

730 {
731  struct subsystem_alert_vector sorted_subsystems;
732  int i;
733 
734 #define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n"
735 #define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n"
736 
737  switch (cmd) {
738  case CLI_INIT:
739  e->command = "core show taskprocessor alerted subsystems";
740  e->usage =
741  "Usage: core show taskprocessor alerted subsystems\n"
742  " Shows a list of task processor subsystems that are currently alerted\n";
743  return NULL;
744  case CLI_GENERATE:
745  return NULL;
746  }
747 
748  if (a->argc != e->args) {
749  return CLI_SHOWUSAGE;
750  }
751 
752  if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
753  return CLI_FAILURE;
754  }
755 
756  AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
757  for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
758  subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
759  }
760  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
761 
762  ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
763 
764  for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
765  struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
766  ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
767  }
768 
769  ast_cli(a->fd, "\n%zu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
770 
771  AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
772  AST_VECTOR_FREE(&sorted_subsystems);
773 
774  return CLI_SUCCESS;
775 }
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:900
const int argc
Definition: cli.h:160
Definition: cli.h:152
static void subsystem_copy(struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:880
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
#define FMT_HEADERS_SUBSYSTEM
int args
This gets set in ast_cli_register()
Definition: cli.h:185
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
unsigned int alert_count
const int fd
Definition: cli.h:159
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_FAILURE
Definition: cli.h:46
#define ast_free(a)
Definition: astmm.h:182
char * command
Definition: cli.h:186
#define FMT_FIELDS_SUBSYSTEM
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:865

◆ cli_tps_ping()

static char * cli_tps_ping ( struct ast_cli_entry *  e,
int  cmd,
struct ast_cli_args *  a 
)
static

Definition at line 411 of file taskprocessor.c.

References ast_cli_args::argc, ast_cli_args::argv, ast_cli(), ast_cond_timedwait, ast_mutex_lock, ast_mutex_unlock, ast_samp2tv(), ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), ast_tvadd(), ast_tvnow(), ast_tvsub(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, name, NULL, ast_cli_args::pos, tps_ping_handler(), TPS_REF_IF_EXISTS, tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

412 {
413  struct timeval begin, end, delta;
414  const char *name;
415  struct timeval when;
416  struct timespec ts;
417  struct ast_taskprocessor *tps;
418 
419  switch (cmd) {
420  case CLI_INIT:
421  e->command = "core ping taskprocessor";
422  e->usage =
423  "Usage: core ping taskprocessor <taskprocessor>\n"
424  " Displays the time required for a task to be processed\n";
425  return NULL;
426  case CLI_GENERATE:
427  if (a->pos == 3) {
429  } else {
430  return NULL;
431  }
432  }
433 
434  if (a->argc != 4)
435  return CLI_SHOWUSAGE;
436 
437  name = a->argv[3];
438  if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
439  ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
440  return CLI_SUCCESS;
441  }
442  ast_cli(a->fd, "\npinging %s ...", name);
443 
444  /*
445  * Wait up to 5 seconds for a ping reply.
446  *
447  * On a very busy system it could take awhile to get a
448  * ping response from some taskprocessors.
449  */
450  begin = ast_tvnow();
451  when = ast_tvadd(begin, ast_samp2tv(5000, 1000));
452  ts.tv_sec = when.tv_sec;
453  ts.tv_nsec = when.tv_usec * 1000;
454 
455  ast_mutex_lock(&cli_ping_cond_lock);
456  if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
457  ast_mutex_unlock(&cli_ping_cond_lock);
458  ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
460  return CLI_FAILURE;
461  }
462  ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
463  ast_mutex_unlock(&cli_ping_cond_lock);
464 
465  end = ast_tvnow();
466  delta = ast_tvsub(end, begin);
467  ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
469  return CLI_SUCCESS;
470 }
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
const int argc
Definition: cli.h:160
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
Definition: cli.h:152
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ast_mutex_lock(a)
Definition: lock.h:187
#define NULL
Definition: resample.c:96
char * end
Definition: eagi_proxy.c:73
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
const int fd
Definition: cli.h:159
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
Definition: time.h:238
const char *const * argv
Definition: cli.h:161
#define CLI_SHOWUSAGE
Definition: cli.h:45
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2283
#define CLI_FAILURE
Definition: cli.h:46
static const char name[]
Definition: cdr_mysql.c:74
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
return a reference to a taskprocessor ONLY if it already exists
Definition: taskprocessor.h:77
#define CLI_SUCCESS
Definition: cli.h:44
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
const int pos
Definition: cli.h:164
static int tps_ping_handler(void *datap)
CLI taskprocessor ping <blah> handler function.
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
Definition: extconf.c:2298
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
#define ast_mutex_unlock(a)
Definition: lock.h:188

◆ cli_tps_report()

static char * cli_tps_report ( struct ast_cli_entry *  e,
int  cmd,
struct ast_cli_args *  a 
)
static

Definition at line 576 of file taskprocessor.c.

References ast_cli_args::argc, ast_cli_entry::args, ast_cli_args::argv, ast_cli(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, FMT_HEADERS, NULL, ast_cli_args::pos, tps_report_taskprocessor_list(), tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

577 {
578  const char *like;
579 
580  switch (cmd) {
581  case CLI_INIT:
582  e->command = "core show taskprocessors [like]";
583  e->usage =
584  "Usage: core show taskprocessors [like keyword]\n"
585  " Shows a list of instantiated task processors and their statistics\n";
586  return NULL;
587  case CLI_GENERATE:
588  if (a->pos == e->args) {
590  } else {
591  return NULL;
592  }
593  }
594 
595  if (a->argc == e->args - 1) {
596  like = "";
597  } else if (a->argc == e->args + 1 && !strcasecmp(a->argv[e->args-1], "like")) {
598  like = a->argv[e->args];
599  } else {
600  return CLI_SHOWUSAGE;
601  }
602 
603  ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
604  ast_cli(a->fd, "\n%d taskprocessors\n\n", tps_report_taskprocessor_list(a->fd, like));
605 
606  return CLI_SUCCESS;
607 }
static int tps_report_taskprocessor_list(int fd, const char *like)
const int argc
Definition: cli.h:160
Definition: cli.h:152
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
#define FMT_HEADERS
int args
This gets set in ast_cli_register()
Definition: cli.h:185
const int fd
Definition: cli.h:159
const char *const * argv
Definition: cli.h:161
#define CLI_SHOWUSAGE
Definition: cli.h:45
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
const int pos
Definition: cli.h:164
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)

◆ cli_tps_reset_stats()

static char * cli_tps_reset_stats ( struct ast_cli_entry *  e,
int  cmd,
struct ast_cli_args *  a 
)
static

Definition at line 1329 of file taskprocessor.c.

References ast_cli_args::argc, ast_cli_args::argv, ast_cli(), ast_taskprocessor_get(), ast_taskprocessor_unreference(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, name, NULL, TPS_REF_IF_EXISTS, tps_reset_stats(), tps_taskprocessor_tab_complete(), and ast_cli_entry::usage.

1330 {
1331  const char *name;
1332  struct ast_taskprocessor *tps;
1333 
1334  switch (cmd) {
1335  case CLI_INIT:
1336  e->command = "core reset taskprocessor";
1337  e->usage =
1338  "Usage: core reset taskprocessor <taskprocessor>\n"
1339  " Resets stats for the specified taskprocessor\n";
1340  return NULL;
1341  case CLI_GENERATE:
1343  }
1344 
1345  if (a->argc != 4) {
1346  return CLI_SHOWUSAGE;
1347  }
1348 
1349  name = a->argv[3];
1350  if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
1351  ast_cli(a->fd, "\nReset failed: %s not found\n\n", name);
1352  return CLI_SUCCESS;
1353  }
1354  ast_cli(a->fd, "\nResetting %s\n\n", name);
1355 
1356  tps_reset_stats(tps);
1357 
1359 
1360  return CLI_SUCCESS;
1361 }
const int argc
Definition: cli.h:160
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
Definition: cli.h:152
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
const int fd
Definition: cli.h:159
static void tps_reset_stats(struct ast_taskprocessor *tps)
const char *const * argv
Definition: cli.h:161
#define CLI_SHOWUSAGE
Definition: cli.h:45
static const char name[]
Definition: cdr_mysql.c:74
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
return a reference to a taskprocessor ONLY if it already exists
Definition: taskprocessor.h:77
#define CLI_SUCCESS
Definition: cli.h:44
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...

◆ cli_tps_reset_stats_all()

static char * cli_tps_reset_stats_all ( struct ast_cli_entry *  e,
int  cmd,
struct ast_cli_args *  a 
)
static

Definition at line 1363 of file taskprocessor.c.

References ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ast_cli_args::argc, ast_cli_entry::args, ast_cli(), ast_taskprocessor_unreference(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, NULL, tps_reset_stats(), and ast_cli_entry::usage.

1364 {
1365  struct ast_taskprocessor *tps;
1366  struct ao2_iterator iter;
1367 
1368  switch (cmd) {
1369  case CLI_INIT:
1370  e->command = "core reset taskprocessors";
1371  e->usage =
1372  "Usage: core reset taskprocessors\n"
1373  " Resets stats for all taskprocessors\n";
1374  return NULL;
1375  case CLI_GENERATE:
1376  return NULL;
1377  }
1378 
1379  if (a->argc != e->args) {
1380  return CLI_SHOWUSAGE;
1381  }
1382 
1383  ast_cli(a->fd, "\nResetting stats for all taskprocessors\n\n");
1384 
1385  iter = ao2_iterator_init(tps_singletons, 0);
1386  while ((tps = ao2_iterator_next(&iter))) {
1387  tps_reset_stats(tps);
1389  }
1390  ao2_iterator_destroy(&iter);
1391 
1392  return CLI_SUCCESS;
1393 }
const int argc
Definition: cli.h:160
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
Definition: cli.h:152
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
int args
This gets set in ast_cli_register()
Definition: cli.h:185
const int fd
Definition: cli.h:159
static void tps_reset_stats(struct ast_taskprocessor *tps)
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.

◆ default_listener_die()

static int default_listener_die ( void *  data)
static

Definition at line 243 of file taskprocessor.c.

References default_taskprocessor_listener_pvt::dead.

Referenced by default_listener_shutdown().

244 {
245  struct default_taskprocessor_listener_pvt *pvt = data;
246  pvt->dead = 1;
247  return 0;
248 }

◆ default_listener_pvt_alloc()

static void* default_listener_pvt_alloc ( void  )
static

Definition at line 955 of file taskprocessor.c.

References ast_calloc, ast_free, ast_log, AST_PTHREADT_NULL, ast_sem_init(), errno, LOG_ERROR, NULL, default_taskprocessor_listener_pvt::poll_thread, and default_taskprocessor_listener_pvt::sem.

Referenced by ast_taskprocessor_get().

956 {
958 
959  pvt = ast_calloc(1, sizeof(*pvt));
960  if (!pvt) {
961  return NULL;
962  }
964  if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
965  ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
966  ast_free(pvt);
967  return NULL;
968  }
969  return pvt;
970 }
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
#define AST_PTHREADT_NULL
Definition: lock.h:66
#define LOG_ERROR
Definition: logger.h:285
int errno
#define ast_free(a)
Definition: astmm.h:182
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:204
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
Initialize a semaphore.

◆ default_listener_pvt_destroy()

static void default_listener_pvt_destroy ( struct default_taskprocessor_listener_pvt *  pvt)
static

Definition at line 173 of file taskprocessor.c.

References ast_assert, ast_free, ast_sem_destroy(), default_taskprocessor_listener_pvt::dead, and default_taskprocessor_listener_pvt::sem.

Referenced by ast_taskprocessor_get(), and default_listener_pvt_dtor().

174 {
175  ast_assert(pvt->dead);
176  ast_sem_destroy(&pvt->sem);
177  ast_free(pvt);
178 }
int ast_sem_destroy(struct ast_sem *sem)
Destroy a semaphore.
#define ast_assert(a)
Definition: utils.h:695
#define ast_free(a)
Definition: astmm.h:182

◆ default_listener_pvt_dtor()

static void default_listener_pvt_dtor ( struct ast_taskprocessor_listener listener)
static

Definition at line 180 of file taskprocessor.c.

References default_listener_pvt_destroy(), NULL, and ast_taskprocessor_listener::user_data.

181 {
182  struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
183 
185 
186  listener->user_data = NULL;
187 }
#define NULL
Definition: resample.c:96
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)

◆ default_listener_shutdown()

static void default_listener_shutdown ( struct ast_taskprocessor_listener listener)
static

Definition at line 250 of file taskprocessor.c.

References ao2_t_ref, ast_assert, ast_log, AST_PTHREADT_NULL, ast_taskprocessor_push(), default_listener_die(), errno, LOG_ERROR, NULL, default_taskprocessor_listener_pvt::poll_thread, ast_taskprocessor_listener::tps, and ast_taskprocessor_listener::user_data.

251 {
252  struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
253  int res;
254 
255  /* Hold a reference during shutdown */
256  ao2_t_ref(listener->tps, +1, "tps-shutdown");
257 
258  if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
259  /* This will cause the thread to exit early without completing tasks already
260  * in the queue. This is probably the least bad option in this situation. */
262  }
263 
265 
266  if (pthread_equal(pthread_self(), pvt->poll_thread)) {
267  res = pthread_detach(pvt->poll_thread);
268  if (res != 0) {
269  ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
270  }
271  } else {
272  res = pthread_join(pvt->poll_thread, NULL);
273  if (res != 0) {
274  ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
275  }
276  }
278 }
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:463
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
struct ast_taskprocessor * tps
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
#define AST_PTHREADT_NULL
Definition: lock.h:66
static int default_listener_die(void *data)
#define LOG_ERROR
Definition: logger.h:285
int errno

◆ default_listener_start()

static int default_listener_start ( struct ast_taskprocessor_listener listener)
static

Definition at line 222 of file taskprocessor.c.

References ast_pthread_create, default_tps_processing_function(), NULL, default_taskprocessor_listener_pvt::poll_thread, and ast_taskprocessor_listener::user_data.

223 {
224  struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
225 
227  return -1;
228  }
229 
230  return 0;
231 }
static void * default_tps_processing_function(void *data)
Function that processes tasks in the taskprocessor.
#define NULL
Definition: resample.c:96
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:559

◆ default_task_pushed()

static void default_task_pushed ( struct ast_taskprocessor_listener listener,
int  was_empty 
)
static

Definition at line 233 of file taskprocessor.c.

References ast_log, ast_sem_post(), errno, LOG_ERROR, default_taskprocessor_listener_pvt::sem, and ast_taskprocessor_listener::user_data.

234 {
235  struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
236 
237  if (ast_sem_post(&pvt->sem) != 0) {
238  ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
239  strerror(errno));
240  }
241 }
int ast_sem_post(struct ast_sem *sem)
Increments the semaphore, unblocking a waiter if necessary.
#define ast_log
Definition: astobj2.c:42
#define LOG_ERROR
Definition: logger.h:285
int errno

◆ default_tps_processing_function()

static void* default_tps_processing_function ( void *  data)
static

Function that processes tasks in the taskprocessor.

Definition at line 193 of file taskprocessor.c.

References ao2_t_ref, ast_assert, ast_log, ast_sem_getvalue(), ast_sem_wait(), ast_taskprocessor_execute(), default_taskprocessor_listener_pvt::dead, errno, listener(), LOG_ERROR, NULL, default_taskprocessor_listener_pvt::sem, ast_taskprocessor_listener::tps, and ast_taskprocessor_listener::user_data.

Referenced by default_listener_start().

194 {
195  struct ast_taskprocessor_listener *listener = data;
196  struct ast_taskprocessor *tps = listener->tps;
197  struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
198  int sem_value;
199  int res;
200 
201  while (!pvt->dead) {
202  res = ast_sem_wait(&pvt->sem);
203  if (res != 0 && errno != EINTR) {
204  ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
205  strerror(errno));
206  /* Just give up */
207  break;
208  }
210  }
211 
212  /* No posting to a dead taskprocessor! */
213  res = ast_sem_getvalue(&pvt->sem, &sem_value);
214  ast_assert(res == 0 && sem_value == 0);
215 
216  /* Free the shutdown reference (see default_listener_shutdown) */
217  ao2_t_ref(listener->tps, -1, "tps-shutdown");
218 
219  return NULL;
220 }
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:463
A listener for taskprocessors.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
struct ast_taskprocessor * tps
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
static void * listener(void *unused)
Definition: asterisk.c:1476
int ast_sem_getvalue(struct ast_sem *sem, int *sval)
Gets the current value of the semaphore.
int ast_sem_wait(struct ast_sem *sem)
Decrements the semaphore.
#define LOG_ERROR
Definition: logger.h:285
int errno
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69

◆ listener_shutdown()

static void listener_shutdown ( struct ast_taskprocessor_listener listener)
static

Definition at line 915 of file taskprocessor.c.

References ao2_ref, ast_taskprocessor_listener::callbacks, ast_taskprocessor_listener_callbacks::shutdown, and ast_taskprocessor_listener::tps.

Referenced by ast_taskprocessor_unreference().

916 {
917  listener->callbacks->shutdown(listener);
918  ao2_ref(listener->tps, -1);
919 }
const struct ast_taskprocessor_listener_callbacks * callbacks
void(* shutdown)(struct ast_taskprocessor_listener *listener)
Indicates the taskprocessor wishes to die.
struct ast_taskprocessor * tps
#define ao2_ref(o, delta)
Definition: astobj2.h:464

◆ subsystem_alert_decrement()

static void subsystem_alert_decrement ( const char *  subsystem)
static

Definition at line 686 of file taskprocessor.c.

References subsystem_alert::alert_count, ast_free, ast_log, ast_strlen_zero, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_REMOVE, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, LOG_ERROR, and subsystem_match().

Referenced by tps_alert_add().

687 {
688  struct subsystem_alert *alert;
689  int idx;
690 
691  if (ast_strlen_zero(subsystem)) {
692  return;
693  }
694 
695  AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
696  idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
697  if (idx < 0) {
699  "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
700  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
701  return;
702  }
703  alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
704 
705  alert->alert_count--;
706  if (alert->alert_count <= 0) {
707  AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
708  ast_free(alert);
709  }
710 
711  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
712 }
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
Definition: vector.h:721
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:900
#define ast_strlen_zero(foo)
Definition: strings.h:52
#define ast_log
Definition: astobj2.c:42
unsigned int alert_count
#define LOG_ERROR
Definition: logger.h:285
#define ast_free(a)
Definition: astmm.h:182
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:890
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
#define AST_VECTOR_REMOVE(vec, idx, preserve_ordered)
Remove an element from a vector by index.
Definition: vector.h:412

◆ subsystem_alert_increment()

static void subsystem_alert_increment ( const char *  subsystem)
static

Definition at line 654 of file taskprocessor.c.

References subsystem_alert::alert_count, ast_free, ast_malloc, ast_strlen_zero, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_GET_INDEX, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, subsystem_alert::subsystem, and subsystem_match().

Referenced by tps_alert_add().

655 {
656  struct subsystem_alert *alert;
657  int idx;
658 
659  if (ast_strlen_zero(subsystem)) {
660  return;
661  }
662 
663  AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
664  idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
665  if (idx >= 0) {
666  alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
667  alert->alert_count++;
668  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
669  return;
670  }
671 
672  alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
673  if (!alert) {
674  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
675  return;
676  }
677  alert->alert_count = 1;
678  strcpy(alert->subsystem, subsystem); /* Safe */
679 
680  if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
681  ast_free(alert);
682  }
683  AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
684 }
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
Definition: vector.h:721
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:900
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define ast_strlen_zero(foo)
Definition: strings.h:52
unsigned int alert_count
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
#define ast_free(a)
Definition: astmm.h:182
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:890
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682

◆ subsystem_cmp()

static int subsystem_cmp ( struct subsystem_alert a,
struct subsystem_alert b 
)
static

Definition at line 632 of file taskprocessor.c.

References subsystem_alert::subsystem.

Referenced by subsystem_copy().

633 {
634  return strcmp(a->subsystem, b->subsystem);
635 }

◆ subsystem_copy()

static void subsystem_copy ( struct subsystem_alert alert,
struct subsystem_alert_vector *  vector 
)
static

Definition at line 714 of file taskprocessor.c.

References subsystem_alert::alert_count, ast_free, ast_malloc, AST_VECTOR_ADD_SORTED, subsystem_alert::subsystem, and subsystem_cmp().

Referenced by cli_subsystem_alert_report().

716 {
717  struct subsystem_alert *alert_copy;
718  alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
719  if (!alert_copy) {
720  return;
721  }
722  alert_copy->alert_count = alert->alert_count;
723  strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
724  if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
725  ast_free(alert_copy);
726  }
727 }
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
Definition: vector.h:371
unsigned int alert_count
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
#define ast_free(a)
Definition: astmm.h:182

◆ subsystem_match()

static int subsystem_match ( struct subsystem_alert alert,
const char *  subsystem 
)
static

Definition at line 627 of file taskprocessor.c.

References subsystem_alert::subsystem.

Referenced by ast_taskprocessor_get_subsystem_alert(), subsystem_alert_decrement(), and subsystem_alert_increment().

628 {
629  return !strcmp(alert->subsystem, subsystem);
630 }

◆ taskprocessor_listener_dtor()

static void taskprocessor_listener_dtor ( void *  obj)
static

Definition at line 921 of file taskprocessor.c.

References ast_taskprocessor_listener::callbacks, ast_taskprocessor_listener_callbacks::dtor, and listener().

Referenced by ast_taskprocessor_listener_alloc().

922 {
923  struct ast_taskprocessor_listener *listener = obj;
924 
925  if (listener->callbacks->dtor) {
926  listener->callbacks->dtor(listener);
927  }
928 }
A listener for taskprocessors.
const struct ast_taskprocessor_listener_callbacks * callbacks
void(* dtor)(struct ast_taskprocessor_listener *listener)
static void * listener(void *unused)
Definition: asterisk.c:1476

◆ taskprocessor_push()

static int taskprocessor_push ( struct ast_taskprocessor tps,
struct tps_task t 
)
static

Definition at line 1139 of file taskprocessor.c.

References ao2_lock, ao2_unlock, AST_LIST_INSERT_TAIL, ast_log, ast_taskprocessor_listener::callbacks, ast_taskprocessor::executing, ast_taskprocessor::high_water_alert, ast_taskprocessor::high_water_warned, tps_task::list, ast_taskprocessor::listener, LOG_ERROR, LOG_WARNING, ast_taskprocessor::name, ast_taskprocessor_listener_callbacks::task_pushed, tps_alert_add(), ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_high, and ast_taskprocessor::tps_queue_size.

Referenced by ast_taskprocessor_push(), and ast_taskprocessor_push_local().

1140 {
1141  int previous_size;
1142  int was_empty;
1143 
1144  if (!tps) {
1145  ast_log(LOG_ERROR, "tps is NULL!\n");
1146  return -1;
1147  }
1148 
1149  if (!t) {
1150  ast_log(LOG_ERROR, "t is NULL!\n");
1151  return -1;
1152  }
1153 
1154  ao2_lock(tps);
1155  AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
1156  previous_size = tps->tps_queue_size++;
1157 
1158  if (tps->tps_queue_high <= tps->tps_queue_size) {
1159  if (!tps->high_water_alert) {
1160  ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
1161  tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
1162  tps->high_water_warned = 1;
1163  tps->high_water_alert = 1;
1164  tps_alert_add(tps, +1);
1165  }
1166  }
1167 
1168  /* The currently executing task counts as still in queue */
1169  was_empty = tps->executing ? 0 : previous_size == 0;
1170  ao2_unlock(tps);
1171  tps->listener->callbacks->task_pushed(tps->listener, was_empty);
1172  return 0;
1173 }
const struct ast_taskprocessor_listener_callbacks * callbacks
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
#define LOG_WARNING
Definition: logger.h:274
unsigned int high_water_alert
Definition: taskprocessor.c:89
struct ast_taskprocessor_listener * listener
Definition: taskprocessor.c:81
#define ao2_unlock(a)
Definition: astobj2.h:730
unsigned int high_water_warned
Definition: taskprocessor.c:87
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
#define ast_log
Definition: astobj2.c:42
#define ao2_lock(a)
Definition: astobj2.h:718
unsigned int executing
Definition: taskprocessor.c:85
#define LOG_ERROR
Definition: logger.h:285
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
Definition: linkedlists.h:730
struct ast_taskprocessor::tps_queue tps_queue
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:74
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
Definition: taskprocessor.h:98

◆ tps_alert_add()

static void tps_alert_add ( struct ast_taskprocessor tps,
int  delta 
)
static

Definition at line 794 of file taskprocessor.c.

References ast_log, ast_rwlock_unlock, ast_rwlock_wrlock, DEBUG_ATLEAST, LOG_DEBUG, ast_taskprocessor::name, ast_taskprocessor::subsystem, subsystem_alert_decrement(), subsystem_alert_increment(), tps_alert_count, and tps_alert_lock.

Referenced by ast_taskprocessor_alert_set_levels(), taskprocessor_push(), tps_taskprocessor_dtor(), and tps_taskprocessor_pop().

795 {
796  unsigned int old;
797 
799  old = tps_alert_count;
800  tps_alert_count += delta;
801  if (DEBUG_ATLEAST(3)
802  /* and tps_alert_count becomes zero or non-zero */
803  && !old != !tps_alert_count) {
804  ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
805  tps->name, tps_alert_count ? "triggered" : "cleared");
806  }
807 
808  if (tps->subsystem[0] != '\0') {
809  if (delta > 0) {
811  } else {
813  }
814  }
815 
817 }
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
static void subsystem_alert_increment(const char *subsystem)
#define LOG_DEBUG
Definition: logger.h:241
#define ast_rwlock_unlock(a)
Definition: lock.h:232
#define ast_log
Definition: astobj2.c:42
static ast_rwlock_t tps_alert_lock
char * subsystem
Anything before the first '/' in the name (if there is one)
Definition: taskprocessor.c:93
static void subsystem_alert_decrement(const char *subsystem)
static unsigned int tps_alert_count
#define ast_rwlock_wrlock(a)
Definition: lock.h:234
#define DEBUG_ATLEAST(level)
Definition: logger.h:441

◆ tps_cmp_cb()

static int tps_cmp_cb ( void *  obj,
void *  arg,
int  flags 
)
static

The astobj2 compare callback for taskprocessors.

Definition at line 619 of file taskprocessor.c.

References CMP_MATCH, CMP_STOP, ast_taskprocessor::name, and OBJ_KEY.

Referenced by ast_tps_init().

620 {
621  struct ast_taskprocessor *lhs = obj, *rhs = arg;
622  const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
623 
624  return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
625 }
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
#define OBJ_KEY
Definition: astobj2.h:1155
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69

◆ tps_hash_cb()

static int tps_hash_cb ( const void *  obj,
const int  flags 
)
static

The astobj2 hash callback for taskprocessors.

Definition at line 610 of file taskprocessor.c.

References ast_str_case_hash(), name, ast_taskprocessor::name, and OBJ_KEY.

Referenced by ast_tps_init().

611 {
612  const struct ast_taskprocessor *tps = obj;
613  const char *name = flags & OBJ_KEY ? obj : tps->name;
614 
615  return ast_str_case_hash(name);
616 }
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
#define OBJ_KEY
Definition: astobj2.h:1155
static const char name[]
Definition: cdr_mysql.c:74
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1250

◆ tps_ping_handler()

static int tps_ping_handler ( void *  datap)
static

CLI 'taskprocessor ping <blah>' handler function.

Definition at line 402 of file taskprocessor.c.

References ast_cond_signal, ast_mutex_lock, and ast_mutex_unlock.

Referenced by cli_tps_ping().

403 {
404  ast_mutex_lock(&cli_ping_cond_lock);
405  ast_cond_signal(&cli_ping_cond);
406  ast_mutex_unlock(&cli_ping_cond_lock);
407  return 0;
408 }
#define ast_mutex_lock(a)
Definition: lock.h:187
#define ast_cond_signal(cond)
Definition: lock.h:201
#define ast_mutex_unlock(a)
Definition: lock.h:188

◆ tps_report_taskprocessor_list()

static int tps_report_taskprocessor_list ( int  fd,
const char *  like 
)
static

Definition at line 539 of file taskprocessor.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_rbtree, ao2_container_dup(), ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_ref, ast_debug, ast_taskprocessor_unreference(), ast_taskprocessor::name, NULL, tps_report_taskprocessor_list_helper(), and tps_sort_cb().

Referenced by cli_tps_report().

540 {
541  int tps_count = 0;
542  int word_len;
543  struct ao2_container *sorted_tps;
544  struct ast_taskprocessor *tps;
545  struct ao2_iterator iter;
546 
548  NULL);
549  if (!sorted_tps
550  || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
551  ast_debug(1, "Failed to retrieve sorted taskprocessors\n");
552  ao2_cleanup(sorted_tps);
553  return 0;
554  }
555 
556  word_len = strlen(like);
557  iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
558  while ((tps = ao2_iterator_next(&iter))) {
559  if (like) {
560  if (!strncasecmp(like, tps->name, word_len)) {
562  tps_count++;
563  }
564  } else {
566  tps_count++;
567  }
569  }
570  ao2_iterator_destroy(&iter);
571  ao2_ref(sorted_tps, -1);
572 
573  return tps_count;
574 }
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define NULL
Definition: resample.c:96
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:452
#define ao2_ref(o, delta)
Definition: astobj2.h:464
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocessor *tps)
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Definition: astobj2.h:1841
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1358
Generic container type.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.

◆ tps_report_taskprocessor_list_helper()

static void tps_report_taskprocessor_list_helper ( int  fd,
struct ast_taskprocessor tps 
)
static

Definition at line 521 of file taskprocessor.c.

References tps_taskprocessor_stats::_tasks_processed_count, ast_cli(), FMT_FIELDS, tps_taskprocessor_stats::max_qsize, ast_taskprocessor::name, ast_taskprocessor::stats, ast_taskprocessor::tps_queue_high, ast_taskprocessor::tps_queue_low, and ast_taskprocessor::tps_queue_size.

Referenced by tps_report_taskprocessor_list().

522 {
524  tps->tps_queue_size, tps->stats.max_qsize, tps->tps_queue_low,
525  tps->tps_queue_high);
526 }
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
unsigned long _tasks_processed_count
This is the current number of tasks processed.
Definition: taskprocessor.c:65
#define FMT_FIELDS
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:74
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
Definition: taskprocessor.c:63
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
Definition: taskprocessor.c:71

◆ tps_reset_stats()

static void tps_reset_stats ( struct ast_taskprocessor tps)
static

Definition at line 1321 of file taskprocessor.c.

References tps_taskprocessor_stats::_tasks_processed_count, ao2_lock, ao2_unlock, tps_taskprocessor_stats::max_qsize, and ast_taskprocessor::stats.

Referenced by cli_tps_reset_stats(), and cli_tps_reset_stats_all().

1322 {
1323  ao2_lock(tps);
1324  tps->stats._tasks_processed_count = 0;
1325  tps->stats.max_qsize = 0;
1326  ao2_unlock(tps);
1327 }
#define ao2_unlock(a)
Definition: astobj2.h:730
unsigned long _tasks_processed_count
This is the current number of tasks processed.
Definition: taskprocessor.c:65
#define ao2_lock(a)
Definition: astobj2.h:718
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
Definition: taskprocessor.c:63
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
Definition: taskprocessor.c:71

◆ tps_shutdown()

static void tps_shutdown ( void  )
static

Definition at line 291 of file taskprocessor.c.

References ao2_t_ref, ARRAY_LEN, ast_cli_unregister_multiple(), ast_free, AST_VECTOR_CALLBACK_VOID, AST_VECTOR_RW_FREE, and NULL.

Referenced by ast_tps_init().

292 {
294  AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
295  AST_VECTOR_RW_FREE(&overloaded_subsystems);
296  ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
297  tps_singletons = NULL;
298 }
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:463
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
#define NULL
Definition: resample.c:96
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:202
#define ast_free(a)
Definition: astmm.h:182
static struct ast_cli_entry taskprocessor_clis[]
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:865

◆ tps_sort_cb()

static int tps_sort_cb ( const void *  obj_left,
const void *  obj_right,
int  flags 
)
static

Definition at line 488 of file taskprocessor.c.

References ast_taskprocessor::name, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by tps_report_taskprocessor_list().

489 {
490  const struct ast_taskprocessor *tps_left = obj_left;
491  const struct ast_taskprocessor *tps_right = obj_right;
492  const char *right_key = obj_right;
493  int cmp;
494 
495  switch (flags & OBJ_SEARCH_MASK) {
496  default:
497  case OBJ_SEARCH_OBJECT:
498  right_key = tps_right->name;
499  /* Fall through */
500  case OBJ_SEARCH_KEY:
501  cmp = strcasecmp(tps_left->name, right_key);
502  break;
504  cmp = strncasecmp(tps_left->name, right_key, strlen(right_key));
505  break;
506  }
507  return cmp;
508 }
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1120
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
Search option field mask.
Definition: astobj2.h:1076

◆ tps_task_alloc()

static struct tps_task* tps_task_alloc ( int(*)(void *datap)  task_exe,
void *  datap 
)
static

Definition at line 326 of file taskprocessor.c.

References ast_calloc, ast_log, tps_task::callback, tps_task::datap, tps_task::execute, LOG_ERROR, and NULL.

Referenced by ast_taskprocessor_push().

327 {
328  struct tps_task *t;
329  if (!task_exe) {
330  ast_log(LOG_ERROR, "task_exe is NULL!\n");
331  return NULL;
332  }
333 
334  t = ast_calloc(1, sizeof(*t));
335  if (!t) {
336  ast_log(LOG_ERROR, "failed to allocate task!\n");
337  return NULL;
338  }
339 
340  t->callback.execute = task_exe;
341  t->datap = datap;
342 
343  return t;
344 }
union tps_task::@428 callback
The execute() task callback function pointer.
#define NULL
Definition: resample.c:96
void * datap
The data pointer for the task execute() function.
Definition: taskprocessor.c:54
#define ast_log
Definition: astobj2.c:42
#define LOG_ERROR
Definition: logger.h:285
int(* execute)(void *datap)
Definition: taskprocessor.c:50
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:204
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:47

◆ tps_task_alloc_local()

static struct tps_task* tps_task_alloc_local ( int(*)(struct ast_taskprocessor_local *local)  task_exe,
void *  datap 
)
static

Definition at line 346 of file taskprocessor.c.

References ast_calloc, ast_log, tps_task::callback, tps_task::datap, tps_task::execute_local, LOG_ERROR, NULL, and tps_task::wants_local.

Referenced by ast_taskprocessor_push_local().

347 {
348  struct tps_task *t;
349  if (!task_exe) {
350  ast_log(LOG_ERROR, "task_exe is NULL!\n");
351  return NULL;
352  }
353 
354  t = ast_calloc(1, sizeof(*t));
355  if (!t) {
356  ast_log(LOG_ERROR, "failed to allocate task!\n");
357  return NULL;
358  }
359 
360  t->callback.execute_local = task_exe;
361  t->datap = datap;
362  t->wants_local = 1;
363 
364  return t;
365 }
union tps_task::@428 callback
The execute() task callback function pointer.
#define NULL
Definition: resample.c:96
void * datap
The data pointer for the task execute() function.
Definition: taskprocessor.c:54
#define ast_log
Definition: astobj2.c:42
int(* execute_local)(struct ast_taskprocessor_local *local)
Definition: taskprocessor.c:51
#define LOG_ERROR
Definition: logger.h:285
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:204
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:47
unsigned int wants_local
Definition: taskprocessor.c:57

◆ tps_task_free()

static void* tps_task_free ( struct tps_task task)
static

Definition at line 368 of file taskprocessor.c.

References ast_free, and NULL.

Referenced by ast_taskprocessor_execute(), and tps_taskprocessor_dtor().

369 {
370  ast_free(task);
371  return NULL;
372 }
#define NULL
Definition: resample.c:96
#define ast_free(a)
Definition: astmm.h:182

◆ tps_taskprocessor_dtor()

static void tps_taskprocessor_dtor ( void *  tps)
static

Definition at line 866 of file taskprocessor.c.

References ao2_cleanup, AST_LIST_REMOVE_HEAD, ast_taskprocessor::high_water_alert, tps_task::list, ast_taskprocessor::listener, NULL, task(), tps_alert_add(), ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_size, and tps_task_free().

Referenced by __allocate_taskprocessor().

867 {
868  struct ast_taskprocessor *t = tps;
869  struct tps_task *task;
870 
871  while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
872  tps_task_free(task);
873  }
874  t->tps_queue_size = 0;
875 
876  if (t->high_water_alert) {
877  t->high_water_alert = 0;
878  tps_alert_add(t, -1);
879  }
880 
881  ao2_cleanup(t->listener);
882  t->listener = NULL;
883 }
unsigned int high_water_alert
Definition: taskprocessor.c:89
struct ast_taskprocessor_listener * listener
Definition: taskprocessor.c:81
#define NULL
Definition: resample.c:96
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
static int task(void *data)
Queued task for baseline test.
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
Definition: linkedlists.h:832
struct ast_taskprocessor::tps_queue tps_queue
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:47
struct tps_task::@429 list
AST_LIST_ENTRY overhead.
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:74
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static void * tps_task_free(struct tps_task *task)

◆ tps_taskprocessor_pop()

static struct tps_task* tps_taskprocessor_pop ( struct ast_taskprocessor *  tps)
static

Definition at line 886 of file taskprocessor.c.

References AST_LIST_REMOVE_HEAD, ast_taskprocessor::high_water_alert, tps_task::list, task(), tps_alert_add(), ast_taskprocessor::tps_queue, ast_taskprocessor::tps_queue_low, and ast_taskprocessor::tps_queue_size.

Referenced by ast_taskprocessor_execute().

887 {
888  struct tps_task *task;
889 
890  if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
891  --tps->tps_queue_size;
892  if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
893  tps->high_water_alert = 0;
894  tps_alert_add(tps, -1);
895  }
896  }
897  return task;
898 }
unsigned int high_water_alert
Definition: taskprocessor.c:89
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
static int task(void *data)
Queued task for baseline test.
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
Definition: linkedlists.h:832
struct ast_taskprocessor::tps_queue tps_queue
tps_task structure is queued to a taskprocessor
Definition: taskprocessor.c:47
struct tps_task::@429 list
AST_LIST_ENTRY overhead.
long tps_queue_size
Taskprocessor current queue size.
Definition: taskprocessor.c:74

◆ tps_taskprocessor_tab_complete()

static char* tps_taskprocessor_tab_complete ( struct ast_cli_args *  a)
static

Definition at line 379 of file taskprocessor.c.

References ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ast_cli_completion_add(), ast_strdup, ast_taskprocessor_unreference(), ast_taskprocessor::name, NULL, and ast_cli_args::word.

Referenced by cli_tps_ping(), cli_tps_report(), and cli_tps_reset_stats().

380 {
381  int tklen;
382  struct ast_taskprocessor *p;
383  struct ao2_iterator i;
384 
385  tklen = strlen(a->word);
386  i = ao2_iterator_init(tps_singletons, 0);
387  while ((p = ao2_iterator_next(&i))) {
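388  if (!strncasecmp(a->word, p->name, tklen)) {
389  if (ast_cli_completion_add(ast_strdup(p->name))) {
390  ast_taskprocessor_unreference(p);
391  break;
392  }
393  }
394  ast_taskprocessor_unreference(p);
395  }
396  ao2_iterator_destroy(&i);
397 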
398  return NULL;
399 }
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
Definition: taskprocessor.c:97
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
#define NULL
Definition: resample.c:96
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
const char * word
Definition: cli.h:163
An ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
When we need to walk through a container, we use an ao2_iterator to keep track of the current position...
Definition: astobj2.h:1841
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2726
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.

Variable Documentation

◆ default_listener_callbacks

const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
static

Definition at line 280 of file taskprocessor.c.

◆ taskprocessor_clis

struct ast_cli_entry taskprocessor_clis[]
static

Definition at line 159 of file taskprocessor.c.

◆ tps_alert_count

unsigned int tps_alert_count
static

Count of the number of taskprocessors in high water alert.

Definition at line 779 of file taskprocessor.c.

Referenced by ast_taskprocessor_alert_get(), and tps_alert_add().

◆ tps_alert_lock

ast_rwlock_t tps_alert_lock = { PTHREAD_RWLOCK_INITIALIZER , NULL, {1, 0} }
static

Access protection for tps_alert_count

Definition at line 782 of file taskprocessor.c.

Referenced by ast_taskprocessor_alert_get(), and tps_alert_add().