130 #define TPS_MAX_BUCKETS 61 133 #define TPS_MAX_BUCKETS 1567 146 static int tps_hash_cb(
const void *obj,
const int flags);
148 static int tps_cmp_cb(
void *obj,
void *arg,
int flags);
203 if (res != 0 &&
errno != EINTR) {
266 if (pthread_equal(pthread_self(), pvt->
poll_thread)) {
296 ao2_t_ref(tps_singletons, -1,
"Unref tps_singletons in shutdown");
297 tps_singletons =
NULL;
305 if (!tps_singletons) {
312 ast_log(
LOG_ERROR,
"taskprocessor subsystems vector failed to initialize!\n");
385 tklen = strlen(a->
word);
388 if (!strncasecmp(a->
word, p->
name, tklen)) {
413 struct timeval begin, end, delta;
421 e->
command =
"core ping taskprocessor";
423 "Usage: core ping taskprocessor <taskprocessor>\n" 424 " Displays the time required for a task to be processed\n";
439 ast_cli(a->
fd,
"\nping failed: %s not found\n\n", name);
442 ast_cli(a->
fd,
"\npinging %s ...", name);
452 ts.tv_sec = when.tv_sec;
453 ts.tv_nsec = when.tv_usec * 1000;
458 ast_cli(a->
fd,
"\nping failed: could not push task to %s\n\n", name);
467 ast_cli(a->
fd,
"\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (
long)delta.tv_sec, (
long int)delta.tv_usec);
488 static int tps_sort_cb(
const void *obj_left,
const void *obj_right,
int flags)
492 const char *right_key = obj_right;
498 right_key = tps_right->
name;
501 cmp = strcasecmp(tps_left->
name, right_key);
504 cmp = strncasecmp(tps_left->
name, right_key, strlen(right_key));
510 #define FMT_HEADERS "%-70s %10s %10s %10s %10s %10s\n" 511 #define FMT_FIELDS "%-70s %10lu %10lu %10lu %10lu %10lu\n" 551 ast_debug(1,
"Failed to retrieve sorted taskprocessors\n");
556 word_len = strlen(like);
560 if (!strncasecmp(like, tps->
name, word_len)) {
582 e->
command =
"core show taskprocessors [like]";
584 "Usage: core show taskprocessors [like keyword]\n" 585 " Shows a list of instantiated task processors and their statistics\n";
597 }
else if (a->
argc == e->
args + 1 && !strcasecmp(a->
argv[e->
args-1],
"like")) {
603 ast_cli(a->
fd,
"\n" FMT_HEADERS,
"Processor",
"Processed",
"In Queue",
"Max Depth",
"Low water",
"High water");
622 const char *rhsname = flags &
OBJ_KEY ? arg : rhs->name;
629 return !strcmp(alert->
subsystem, subsystem);
640 unsigned int count = 0;
672 alert =
ast_malloc(
sizeof(*alert) + strlen(subsystem) + 1);
699 "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
715 struct subsystem_alert_vector *vector)
731 struct subsystem_alert_vector sorted_subsystems;
734 #define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n" 735 #define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n" 739 e->
command =
"core show taskprocessor alerted subsystems";
741 "Usage: core show taskprocessor alerted subsystems\n" 742 " Shows a list of task processor subsystems that are currently alerted\n";
769 ast_cli(a->
fd,
"\n%zu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
800 tps_alert_count += delta;
803 && !old != !tps_alert_count) {
805 tps->
name, tps_alert_count ?
"triggered" :
"cleared");
832 if (!tps || high_water < 0 || high_water < low_water) {
838 low_water = (high_water * 9) / 10;
853 if (high_water < tps->tps_queue_size) {
947 return listener->
tps;
986 char *subsystem_separator;
987 size_t subsystem_length = 0;
990 name_length = strlen(name);
991 subsystem_separator = strchr(name,
'/');
992 if (subsystem_separator) {
993 subsystem_length = subsystem_separator -
name;
1006 strcpy(p->
name, name);
1031 ast_log(
LOG_ERROR,
"Unable to start taskprocessor listener for taskprocessor %s\n",
1160 ast_log(
LOG_WARNING,
"The '%s' task processor queue reached %ld scheduled tasks%s.\n",
1169 was_empty = tps->
executing ? 0 : previous_size == 0;
1225 tps->
thread = pthread_self();
1271 is_task = pthread_equal(tps->
thread, pthread_self());
1283 #define SEQ_STR_SIZE (1 + 8 + 1) 1303 va_start(ap, format);
1304 user_size = vsnprintf(buf, size - (
SEQ_STR_SIZE - 1), format, ap);
1306 if (user_size < 0) {
1336 e->
command =
"core reset taskprocessor";
1338 "Usage: core reset taskprocessor <taskprocessor>\n" 1339 " Resets stats for the specified taskprocessor\n";
1351 ast_cli(a->
fd,
"\nReset failed: %s not found\n\n", name);
1354 ast_cli(a->
fd,
"\nResetting %s\n\n", name);
1370 e->
command =
"core reset taskprocessors";
1372 "Usage: core reset taskprocessors\n" 1373 " Resets stats for all taskprocessors\n";
1383 ast_cli(a->
fd,
"\nResetting stats for all taskprocessors\n\n");
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
A listener for taskprocessors.
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
#define AST_VECTOR_GET_INDEX(vec, value, cmp)
Get the 1st index from a vector that matches the given comparison.
#define ast_rwlock_rdlock(a)
#define AST_CLI_DEFINE(fn, txt,...)
const struct ast_taskprocessor_listener_callbacks * callbacks
#define AST_VECTOR_ADD_SORTED(vec, elem, cmp)
Add an element into a sorted vector.
int ast_sem_destroy(struct ast_sem *sem)
Destroy a semaphore.
Asterisk main include file. File version handling, generic pbx functions.
void(* shutdown)(struct ast_taskprocessor_listener *listener)
Indicates the taskprocessor wishes to die.
char name[0]
Friendly name of the taskprocessor. Subsystem is appended after the name's NULL terminator.
unsigned int ast_taskprocessor_seq_num(void)
Get the next sequence number to create a human friendly taskprocessor name.
#define AST_RWLOCK_DEFINE_STATIC(rwlock)
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
static int tps_report_taskprocessor_list(int fd, const char *like)
static void tps_shutdown(void)
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap)
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static char * cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
static void subsystem_alert_increment(const char *subsystem)
The arg parameter is a search key, but is not an object.
void(* emptied)(struct ast_taskprocessor_listener *listener)
Indicates the task processor has become empty.
union tps_task::@428 callback
The execute() task callback function pointer.
Time-related functions and macros.
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
static char * cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static int tps_cmp_cb(void *obj, void *arg, int flags)
The astobj2 compare callback for taskprocessors.
int ast_sem_post(struct ast_sem *sem)
Increments the semaphore, unblocking a waiter if necessary.
descriptor for a cli entry.
static struct ast_taskprocessor * __start_taskprocessor(struct ast_taskprocessor *p)
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
static void * default_tps_processing_function(void *data)
Function that processes tasks in the taskprocessor.
struct ast_taskprocessor * tps
static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
static int tps_hash_cb(const void *obj, const int flags)
The astobj2 hash callback for taskprocessors.
unsigned int high_water_alert
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
Assume that the ao2_container is already locked.
static void subsystem_copy(struct subsystem_alert *alert, struct subsystem_alert_vector *vector)
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
#define ast_cond_init(cond, attr)
#define ast_cli_register_multiple(e, len)
Register multiple commands.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
struct ast_taskprocessor_listener * listener
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
#define ao2_link_flags(container, obj, flags)
#define ast_mutex_lock(a)
#define ast_strdup(str)
A wrapper for strdup()
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *)
CLI taskprocessor ping <blah>operation requires a ping condition lock.
unsigned int high_water_warned
long tps_queue_high
Taskprocessor high water alert trigger level.
void ast_cli(int fd, const char *fmt,...)
#define ast_rwlock_unlock(a)
#define FMT_HEADERS_SUBSYSTEM
static struct tps_task * tps_taskprocessor_pop(struct ast_taskprocessor *tps)
#define ast_cond_signal(cond)
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
static int task(void *data)
Queued task for baseline test.
int args
This gets set in ast_cli_register()
pthread_cond_t ast_cond_t
#define ast_strlen_zero(foo)
void(* dtor)(struct ast_taskprocessor_listener *listener)
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.
void * datap
The data pointer for the task execute() function.
#define ast_debug(level,...)
Log a DEBUG message.
long tps_queue_low
Taskprocessor low water clear alert level.
unsigned long _tasks_processed_count
This is the current number of tasks processed.
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
static ast_rwlock_t tps_alert_lock
static void * listener(void *unused)
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
int(* execute_local)(struct ast_taskprocessor_local *local)
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
char * subsystem
Anything before the first '/' in the name (if there is one)
int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
Am I the given taskprocessor's current task.
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
#define AST_PTHREADT_NULL
int ast_sem_getvalue(struct ast_sem *sem, int *sval)
Gets the current value of the semaphore.
#define ao2_ref(o, delta)
static int default_listener_die(void *data)
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
#define ast_malloc(len)
A wrapper for malloc()
static int default_listener_start(struct ast_taskprocessor_listener *listener)
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
AST_LIST_HEAD_NOLOCK(contactliststruct, contact)
static void tps_reset_stats(struct ast_taskprocessor *tps)
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
static struct ast_taskprocessor * __allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
int ast_sem_wait(struct ast_sem *sem)
Decrements the semaphore.
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
static void subsystem_alert_decrement(const char *subsystem)
ast_tps_options
ast_tps_options for specification of taskprocessor options
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
#define ao2_iterator_next(iter)
#define ao2_alloc(data_size, destructor_fn)
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
static unsigned int tps_alert_count
#define AST_LIST_ENTRY(type)
Declare a forward link structure inside a list entry.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
int(* execute)(void *datap)
#define ast_calloc(num, len)
A wrapper for calloc()
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by sybsystem.
#define ast_pthread_create(a, b, c, d)
static struct tps_task * tps_task_alloc_local(int(*task_exe)(struct ast_taskprocessor_local *local), void *datap)
static char * cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
Initialize a semaphore.
Prototypes for public functions only of internal interest,.
#define ao2_find(container, arg, flags)
#define FMT_FIELDS_SUBSYSTEM
An API for managing task processing threads that can be shared across modules.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
struct ast_taskprocessor::tps_queue tps_queue
tps_taskprocessor_stats maintain statistics for a taskprocessor.
tps_task structure is queued to a taskprocessor
struct tps_task::@429 list
AST_LIST_ENTRY overhead.
#define ast_rwlock_wrlock(a)
return a reference to a taskprocessor ONLY if it already exists
static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocessor *tps)
The arg parameter is an object of the same type.
long tps_queue_size
Taskprocessor current queue size.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
Get the task processor suspend status.
A ast_taskprocessor structure is a singleton by name.
static struct ast_cli_entry taskprocessor_clis[]
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
static struct tps_task * tps_task_alloc(int(*task_exe)(void *datap), void *datap)
Standard Command Line Interface.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
static int tps_ping_handler(void *datap)
CLI taskprocessor ping <blah>handler function.
static char * cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define ao2_unlink_flags(container, obj, flags)
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
static void tps_taskprocessor_dtor(void *tps)
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
static void * default_listener_pvt_alloc(void)
static char * tps_taskprocessor_tab_complete(struct ast_cli_args *a)
Search option field mask.
static void taskprocessor_listener_dtor(void *obj)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
#define DEBUG_ATLEAST(level)
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
#define AST_VECTOR_REMOVE(vec, idx, preserve_ordered)
Remove an element from a vector by index.
Asterisk module definitions.
static snd_pcm_format_t format
unsigned long max_qsize
This is the maximum number of tasks queued at any one time.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define ast_cond_timedwait(cond, mutex, time)
static void * tps_task_free(struct tps_task *task)
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
#define AST_MUTEX_DEFINE_STATIC(mutex)
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
static char * cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define ast_mutex_unlock(a)
struct tps_taskprocessor_stats stats
Taskprocessor statistics.
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.