302 #define INITIAL_SUBSCRIBERS_MAX 4 305 #define TOPIC_POOL_BUCKETS 57 312 #if defined(LOW_MEMORY) 314 #define TOPIC_ALL_BUCKETS 257 318 #define TOPIC_ALL_BUCKETS 997 325 #define TOPIC_STATISTICS_BUCKETS 57 328 #define SUBSCRIPTION_STATISTICS_BUCKETS 57 353 struct stasis_topic_statistics {
355 long highest_time_dispatched;
357 long lowest_time_dispatched;
359 int messages_not_dispatched;
361 int messages_dispatched;
393 struct timeval *creationtime;
404 struct timeval creationtime;
426 #define topic_lock_both(topic1, topic2) \ 429 while (ao2_trylock(topic2)) { \ 430 AO2_DEADLOCK_AVOIDANCE(topic1); \ 441 ast_debug(2,
"Destroying topic. name: %s, detail: %s\n",
442 topic->name, topic->detail);
450 ast_debug(1,
"Topic '%s': %p destroyed\n", topic->name, topic);
453 if (topic->statistics) {
459 ao2_ref(topic->statistics, -1);
467 struct stasis_topic_statistics *
statistics = obj;
487 if (!statistics->subscribers) {
493 statistics->topic = topic;
494 strcpy(statistics->name, topic->name);
507 if (!topic || !name || !strlen(name) || !detail) {
522 detail_len = strlen(detail) + 1;
525 sizeof(*proxy) + strlen(name) + 1 + detail_len,
NULL, name);
534 proxy->
detail = proxy->
name + strlen(name) + 1;
536 strcpy(proxy->
name, name);
557 topic->name = proxy->
name;
558 topic->detail = proxy->
detail;
570 const char *
name,
const char* detail
576 if (!name|| !strlen(name) || !detail) {
579 ast_debug(2,
"Creating topic. name: %s, detail: %s\n", name, detail);
583 ast_debug(2,
"Topic is already exist. name: %s, detail: %s\n",
608 if (!topic->statistics) {
613 ast_debug(1,
"Topic '%s': %p created\n", topic->name, topic);
641 return topic->detail;
739 if (sub->statistics) {
741 if (subscription_stats) {
742 ao2_unlink(subscription_stats, sub->statistics);
743 ao2_ref(subscription_stats, -1);
762 struct timeval start;
796 if (elapsed > sub->statistics->highest_time_invoked) {
797 sub->statistics->highest_time_invoked = elapsed;
802 if (elapsed < sub->
statistics->lowest_time_invoked) {
803 sub->statistics->lowest_time_invoked = elapsed;
824 int needs_mailbox,
int use_thread_pool,
const char *
file,
int lineno,
830 if (!subscription_stats) {
840 if (!statistics->
topics) {
852 ao2_link(subscription_stats, statistics);
901 use_thread_pool ?
'p' :
'm',
909 if (use_thread_pool) {
989 "Internal error: subscription has invalid topic\n");
1014 long low_water,
long high_water)
1020 low_water, high_water);
1028 if (!subscription) {
1058 if (!subscription) {
1082 if (!subscription) {
1088 subscription->filter =
filter;
1101 subscription->accepted_formatters = formatters;
1139 if (!subscription) {
1216 for (idx = 0; idx <
AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1237 for (idx = 0; idx <
AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1329 int type_filter_specified = 0;
1330 int formatter_filter_specified = 0;
1331 int type_filter_passed = 0;
1332 int formatter_filter_passed = 0;
1343 if (!type_filter_specified && !formatter_filter_specified) {
1347 type_filter_passed = type_filter_specified
1355 if (type_filter_passed) {
1359 formatter_filter_passed = formatter_filter_specified
1362 if (formatter_filter_passed) {
1436 unsigned int dispatched = 0;
1440 struct timeval start;
1494 if (elapsed > topic->statistics->highest_time_dispatched) {
1495 topic->statistics->highest_time_dispatched = elapsed;
1497 if (elapsed < topic->statistics->lowest_time_dispatched) {
1498 topic->statistics->lowest_time_dispatched = elapsed;
1585 if (!from_topic || !to_topic) {
1595 if (to_topic == from_topic) {
1629 size_t description_len = strlen(description) + 1;
1630 size_t uniqueid_len = strlen(uniqueid) + 1;
1728 topic_pool_entry =
ao2_alloc_options(
sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1730 if (!topic_pool_entry) {
1734 strcpy(topic_pool_entry->
name, topic_name);
1736 return topic_pool_entry;
1750 char *container_name =
1788 const char *right_key = arg;
1793 right_key = object_right->
name;
1796 cmp = strcasecmp(object_left->
name, right_key);
1822 static void topic_pool_prnt_obj(
void *v_obj,
void *where,
ao2_prnt_fn *prnt)
1851 char *container_name =
1872 int pool_topic_name_len = strlen(pool_topic_name);
1873 const char *search_topic_name;
1875 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1876 search_topic_name = topic_name + pool_topic_name_len + 1;
1878 search_topic_name = topic_name;
1888 char *new_topic_name;
1932 if (!topic_pool_entry) {
1936 ao2_ref(topic_pool_entry, -1);
2027 if (!channel_snapshot) {
2116 ami_snapshot =
NULL;
2149 if (!strcmp(
"eventname", key)) {
2161 const char *eventname;
2166 if (!object_string || !body) {
2204 .name =
"threadpool",
2206 .category =
"threadpool",
2215 .name =
"declined_message_types",
2216 .item_offset = offsetof(
struct stasis_config, declined_message_types),
2218 .category =
"declined_message_types",
2225 .types =
ACO_TYPES(&declined_option, &threadpool_option),
2286 char *name_in_declined;
2295 res = name_in_declined ? 1 : 0;
2299 ast_log(
LOG_NOTICE,
"Declining to allocate Stasis message type '%s' due to configuration\n", name);
2340 #define FMT_HEADERS "%-64s %-64s\n" 2341 #define FMT_FIELDS "%-64s %-64s\n" 2345 e->
command =
"stasis show topics";
2347 "Usage: stasis show topics\n" 2348 " Shows a list of topics\n";
2361 topic_proxy_sort_fn,
NULL);
2379 ast_cli(a->
fd,
"\n%d Total topics\n\n", count);
2395 int wordlen = strlen(word);
2400 if (!strncasecmp(word, topic->
name, wordlen)) {
2420 char print_time[32];
2425 e->
command =
"stasis show topic";
2427 "Usage: stasis show topic <name>\n" 2428 " Show stasis topic detail info.\n";
2444 ast_cli(a->
fd,
"Specified topic '%s' does not exist\n", a->
argv[3]);
2448 ast_cli(a->
fd,
"Name: %s\n", topic->name);
2449 ast_cli(a->
fd,
"Detail: %s\n", topic->detail);
2453 ast_cli(a->
fd,
"Duration time: %s\n", print_time);
2459 ast_cli(a->
fd,
" UniqueID: %s, Topic: %s, Detail: %s\n",
2460 subscription_tmp->
uniqueid, subscription_tmp->
topic->name, subscription_tmp->
topic->detail);
2463 ast_cli(a->
fd,
"\nForwarded topics:\n");
2466 ast_cli(a->
fd,
" Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2499 #define FMT_HEADERS "%-64s %10s %10s %16s %16s\n" 2500 #define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n" 2501 #define FMT_FIELDS2 "%-64s %10d %10d\n" 2505 e->
command =
"stasis statistics show subscriptions";
2507 "Usage: stasis statistics show subscriptions\n" 2508 " Shows a list of subscriptions and their general statistics\n";
2519 if (!subscription_stats) {
2520 ast_cli(a->
fd,
"Could not fetch subscription_statistics container\n");
2525 stasis_subscription_statistics_sort_fn,
NULL);
2526 if (!sorted_subscriptions) {
2527 ao2_ref(subscription_stats, -1);
2528 ast_cli(a->
fd,
"Could not create container for sorting subscription statistics\n");
2533 ao2_ref(sorted_subscriptions, -1);
2534 ao2_ref(subscription_stats, -1);
2535 ast_cli(a->
fd,
"Could not sort subscription statistics\n");
2539 ao2_ref(subscription_stats, -1);
2541 ast_cli(a->
fd,
"\n" FMT_HEADERS,
"Subscription",
"Dropped",
"Passed",
"Lowest Invoke",
"Highest Invoke");
2554 ao2_ref(sorted_subscriptions, -1);
2557 ast_cli(a->
fd,
"\n%d subscriptions\n\n", count);
2575 int wordlen = strlen(word);
2580 if (!subscription_stats) {
2586 if (!strncasecmp(word, statistics->
uniqueid, wordlen)
2587 && ++which > state) {
2596 ao2_ref(subscription_stats, -1);
2613 e->
command =
"stasis statistics show subscription";
2615 "Usage: stasis statistics show subscription <uniqueid>\n" 2616 " Show stasis subscription statistics.\n";
2631 if (!subscription_stats) {
2632 ast_cli(a->
fd,
"Could not fetch subcription_statistics container\n");
2638 ao2_ref(subscription_stats, -1);
2639 ast_cli(a->
fd,
"Specified subscription '%s' does not exist\n", a->
argv[4]);
2643 ao2_ref(subscription_stats, -1);
2646 ast_cli(a->
fd,
"Pointer Address: %p\n", statistics->
sub);
2647 ast_cli(a->
fd,
"Source filename: %s\n",
S_OR(statistics->
file,
"<unavailable>"));
2649 ast_cli(a->
fd,
"Source function: %s\n",
S_OR(statistics->
func,
"<unavailable>"));
2691 int not_dispatched = 0;
2693 #define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n" 2694 #define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n" 2695 #define FMT_FIELDS2 "%-64s %10s %10d %10d\n" 2699 e->
command =
"stasis statistics show topics";
2701 "Usage: stasis statistics show topics\n" 2702 " Shows a list of topics and their general statistics\n";
2714 ast_cli(a->
fd,
"Could not fetch topic_statistics container\n");
2719 stasis_topic_statistics_sort_fn,
NULL);
2720 if (!sorted_topics) {
2722 ast_cli(a->
fd,
"Could not create container for sorting topic statistics\n");
2729 ast_cli(a->
fd,
"Could not sort topic statistics\n");
2735 ast_cli(a->
fd,
"\n" FMT_HEADERS,
"Topic",
"Subscribers",
"Dropped",
"Dispatched",
"Lowest Dispatch",
"Highest Dispatch");
2740 statistics->messages_not_dispatched, statistics->messages_dispatched,
2741 statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2742 not_dispatched += statistics->messages_not_dispatched;
2743 dispatched += statistics->messages_dispatched;
2752 ast_cli(a->
fd,
"\n%d topics\n\n", count);
2770 int wordlen = strlen(word);
2781 if (!strncasecmp(word, statistics->name, wordlen)
2782 && ++which > state) {
2808 e->
command =
"stasis statistics show topic";
2810 "Usage: stasis statistics show topic <name>\n" 2811 " Show stasis topic statistics.\n";
2827 ast_cli(a->
fd,
"Could not fetch topic_statistics container\n");
2834 ast_cli(a->
fd,
"Specified topic '%s' does not exist\n", a->
argv[4]);
2840 ast_cli(a->
fd,
"Topic: %s\n", statistics->name);
2841 ast_cli(a->
fd,
"Pointer Address: %p\n", statistics->topic);
2842 ast_cli(a->
fd,
"Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2843 ast_cli(a->
fd,
"Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2844 ast_cli(a->
fd,
"Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2845 ast_cli(a->
fd,
"Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2871 #define FMT_HEADERS "%-64s %10s %10s\n" 2872 #define FMT_FIELDS "%-64s %10d %10d\n" 2876 e->
command =
"stasis statistics show messages";
2878 "Usage: stasis statistics show messages\n" 2879 " Shows a list of message types and their general statistics\n";
2902 unused += statistics->
unused;
2908 ast_cli(a->
fd,
"\n%d seen message types\n\n", count);
2935 key =
object->uniqueid;
2949 const char *right_key = arg;
2954 right_key = object_right->
uniqueid;
2957 cmp = strcasecmp(object_left->
uniqueid, right_key);
2984 const struct stasis_topic_statistics *object;
3005 const struct stasis_topic_statistics *object_left = obj;
3006 const struct stasis_topic_statistics *object_right = arg;
3007 const char *right_key = arg;
3012 right_key = object_right->name;
3015 cmp = strcasecmp(object_left->name, right_key);
3101 ast_log(
LOG_ERROR,
"Failed to initialize defaults on Stasis configuration object\n");
3108 ast_log(
LOG_ERROR,
"Failed to load stasis.conf and failed to initialize defaults.\n");
3140 if (cache_init != 0) {
3152 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3167 if (!subscription_stats) {
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
static struct aco_type threadpool_option
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Struct containing info for an AMI event to send out.
static char * stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
int auto_increment
Number of threads to increment pool by.
static void statistics(void)
static char * subscription_statistics_complete_name(const char *word, int state)
Main Channel structure associated with a channel.
#define AST_CLI_DEFINE(fn, txt,...)
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
static char * topic_statistics_complete_name(const char *word, int state)
Asterisk main include file. File version handling, generic pbx functions.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
int idle_timeout
Time limit in seconds for idle threads.
struct stasis_topic * topic
static int message_type_id
#define STASIS_UMOS_MAX
Number of snapshot types.
#define TOPIC_POOL_BUCKETS
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
static void topic_dtor(void *obj)
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
static void topic_pool_dtor(void *obj)
The arg parameter is a search key, but is not an object.
const char * func
The function where the subscription originates.
int initial_size
Number of threads the pool will start with.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type)
int max_size
Maximum number of threads a pool may have.
static char * statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within an ast_threadpool.
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
#define AST_THREADPOOL_OPTIONS_VERSION
static struct ast_cli_entry cli_stasis_statistics[]
descriptor for a cli entry.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
void stasis_log_bad_type_access(const char *name)
Threadpool configuration options.
static AO2_GLOBAL_OBJ_STATIC(topic_statistics)
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Structure for variables, used for configurations and for channel variables.
static pj_pool_t * pool
Global memory pool for configuration and timers.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Structure representing a snapshot of channel state.
Universally unique identifier support.
Return a reference to a taskprocessor, creating one if it does not exist.
static char * statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
static int sub_cleanup(void *data)
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid)
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Assume that the ao2_container is already locked.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
static void topic_pool_entry_dtor(void *obj)
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name)
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
#define ast_cond_wait(cond, mutex)
struct aco_type * declined_options[]
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
#define ast_cond_init(cond, attr)
#define ast_cli_register_multiple(e, len)
Register multiple commands.
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ao2_global_obj_ref(holder)
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
static void stasis_declined_config_destructor(void *obj)
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
long highest_time_invoked
Highest time spent invoking a message.
static void forward_dtor(void *obj)
#define ao2_alloc_options(data_size, destructor_fn, options)
#define ao2_link_flags(container, obj, flags)
#define ast_mutex_lock(a)
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
#define ast_strdup(str)
A wrapper for strdup()
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
static struct ast_manager_event_blob * multi_user_event_to_ami(struct stasis_message *message)
The representation of a single configuration file to be processed.
void ast_cli(int fd, const char *fmt,...)
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
AO2_STRING_FIELD_CMP_FN(topic_proxy, name)
struct aco_file stasis_conf
static void subscription_change_dtor(void *obj)
#define ast_cond_signal(cond)
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
struct ao2_container * pool_container
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
#define topic_lock_both(topic1, topic2)
Lock two topics.
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct an ast_manager_event_blob.
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Structure containing callbacks for Stasis message sanitization.
int lineno
The line number where the subscription originates.
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate...
struct stasis_topic * from_topic
int unused
The number of messages of this type that did not reach a subscriber.
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
int final_message_processed
int args
This gets set in ast_cli_register()
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
pthread_cond_t ast_cond_t
#define ast_strlen_zero(foo)
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
static struct ast_json * multi_user_event_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
static int topic_pool_entry_hash(const void *obj, const int flags)
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
struct ao2_container * declined
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
static char * statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ast_debug(level,...)
Log a DEBUG message.
static ast_mutex_t message_type_statistics_lock
struct stasis_declined_config * declined_message_types
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
A multi object blob data structure to carry user event stasis messages.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
static char * topic_complete_name(const char *word)
static int subscription_statistics_hash(const void *obj, const int flags)
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
static void subscription_statistics_destroy(void *obj)
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
const char * file
The filename where the subscription originates.
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
#define ao2_ref(o, delta)
static char * statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
static int topic_statistics_hash(const void *obj, const int flags)
int messages_passed
The number of messages that passed filtering.
static char * stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
struct stasis_topic * to_topic
struct ao2_container * container
static struct console_pvt globals
struct stasis_topic * topic
static int userevent_exclusion_cb(const char *key)
struct ao2_container * topics
Names of the topics we are subscribed to.
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
There was an error and no changes were applied.
struct stasis_topic * topic
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
#define SUBSCRIPTION_STATISTICS_BUCKETS
struct timeval creationtime
Configuration option-handling.
static struct ast_cli_entry cli_stasis[]
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
The descriptor of a dynamic string XXX storage will be optimized later if needed We use the ts field ...
struct ast_taskprocessor * mailbox
static void * stasis_config_alloc(void)
#define ao2_unlink(container, obj)
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
AO2_STRING_FIELD_HASH_FN(topic_proxy, name)
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
#define ao2_global_obj_release(holder)
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
static int subscription_statistics_cmp(void *obj, void *arg, int flags)
struct ao2_container * topic_all
#define ao2_iterator_next(iter)
#define ast_cond_destroy(cond)
int uses_mailbox
Using a mailbox to queue messages.
#define ao2_alloc(data_size, destructor_fn)
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
int uses_threadpool
Using stasis threadpool for handling messages.
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
static struct ast_str * multi_object_blob_to_ami(void *obj)
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object...
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_channel_snapshot.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
#define ast_calloc(num, len)
A wrapper for calloc()
static void multi_object_blob_dtor(void *obj)
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Holds details about changes to subscriptions for the specified topic.
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Vector container support.
static struct stasis_topic_statistics * stasis_topic_statistics_create(struct stasis_topic *topic)
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
#define ao2_find(container, arg, flags)
An API for managing task processing threads that can be shared across modules.
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
static void proxy_dtor(void *weakproxy, void *container)
int stasis_cache_init(void)
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
stasis_subscription_cb callback
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
#define TOPIC_STATISTICS_BUCKETS
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
#define ao2_global_obj_replace_unref(holder, obj)
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
The arg parameter is an object of the same type.
struct stasis_message_type * highest_time_message_type
The message type that currently took the longest to process.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
An ast_taskprocessor structure is a singleton by name.
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
struct stasis_topic * pool_topic
int published
The number of messages of this type published.
stasis_subscription_message_filter
Stasis subscription message filters.
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
When we need to walk through a container, we use an ao2_iterator to keep track of the current position...
Standard Command Line Interface.
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
struct stasis_forward * forward
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Type information about a category-level configurable object.
An opaque threadpool structure.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
#define INITIAL_SUBSCRIBERS_MAX
#define S_OR(a, b)
Returns the equivalent of logical OR for strings: the first one if not empty, otherwise the second one...
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_bridge_snapshot.
static struct stasis_subscription_statistics * stasis_subscription_statistics_create(struct stasis_subscription *sub, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
A structure to hold global configuration-related options.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
char uniqueid[0]
Unique ID of the subscription.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
#define AO2_WEAKPROXY()
Macro which must be used at the beginning of weakproxy capable objects.
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
struct stasis_forward * sub
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
static void topic_statistics_destroy(void *obj)
struct stasis_threadpool_conf * threadpool_options
Abstract JSON element (object, array, string, int, ...).
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
struct stasis_subscription * sub
Pointer to the subscription (NOT refcounted, and must NOT be accessed)
#define ast_mutex_init(pmutex)
static AST_VECTOR(struct stasis_message_type_statistics)
Search option field mask.
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
static struct aco_type * threadpool_options[]
#define ast_mutex_destroy(a)
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
long lowest_time_invoked
Lowest time spent invoking a message.
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
static void subscription_dtor(void *obj)
Type for default option handler for signed integers.
static struct ast_threadpool * threadpool
static int topic_statistics_cmp(void *obj, void *arg, int flags)
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
stasis_subscription_message_formatters
Stasis subscription formatter filters.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
static void stasis_config_destructor(void *obj)
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_endpoint_snapshot.
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic...
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
#define AST_MUTEX_DEFINE_STATIC(mutex)
Structure for mutex and tracking information.
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
int messages_dropped
The number of messages that were filtered out.
static char * statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
#define TOPIC_ALL_BUCKETS
#define ast_mutex_unlock(a)
struct stasis_message_type * message_type
The stasis message type.
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
#define ao2_link(container, obj)
int stasis_init(void)
Initialize the Stasis subsystem.