Asterisk - The Open Source Telephony Project
18.5.0
|
Stasis Message Bus API. More...
#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/config_options.h"
#include "asterisk/cli.h"
Go to the source code of this file.
Data Structures | |
struct | ast_multi_object_blob |
A multi object blob data structure to carry user event stasis messages. More... | |
struct | stasis_config |
struct | stasis_declined_config |
A structure to hold global configuration-related options. More... | |
struct | stasis_forward |
Forwarding information. More... | |
struct | stasis_message_type_statistics |
struct | stasis_subscription |
struct | stasis_subscription_statistics |
struct | stasis_threadpool_conf |
Threadpool configuration options. More... | |
struct | stasis_topic |
struct | stasis_topic_pool |
struct | sync_task_data |
struct | topic_pool_entry |
struct | topic_proxy |
Macros | |
#define | FMT_FIELDS "%-64s %-64s\n" |
#define | FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n" |
#define | FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n" |
#define | FMT_FIELDS "%-64s %10d %10d\n" |
#define | FMT_FIELDS2 "%-64s %10d %10d\n" |
#define | FMT_FIELDS2 "%-64s %10s %10d %10d\n" |
#define | FMT_HEADERS "%-64s %-64s\n" |
#define | FMT_HEADERS "%-64s %10s %10s %16s %16s\n" |
#define | FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n" |
#define | FMT_HEADERS "%-64s %10s %10s\n" |
#define | INITIAL_SUBSCRIBERS_MAX 4 |
#define | SUBSCRIPTION_STATISTICS_BUCKETS 57 |
#define | TOPIC_ALL_BUCKETS 997 |
#define | topic_lock_both(topic1, topic2) |
Lock two topics. More... | |
#define | TOPIC_POOL_BUCKETS 57 |
#define | TOPIC_STATISTICS_BUCKETS 57 |
Functions | |
struct stasis_subscription * | __stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func) |
Create a subscription. More... | |
struct stasis_subscription * | __stasis_subscribe_pool (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func) |
Create a subscription whose callbacks occur on a thread pool. More... | |
static | AO2_GLOBAL_OBJ_STATIC (topic_statistics) |
static | AO2_GLOBAL_OBJ_STATIC (subscription_statistics) |
static | AO2_GLOBAL_OBJ_STATIC (globals) |
A global object container that will contain the stasis_config that gets swapped out on reloads. More... | |
AO2_STRING_FIELD_CASE_SORT_FN (topic_proxy, name) | |
AO2_STRING_FIELD_CMP_FN (topic_proxy, name) | |
AO2_STRING_FIELD_HASH_FN (topic_proxy, name) | |
AO2_STRING_FIELD_SORT_FN (stasis_subscription_statistics, uniqueid) | |
AO2_STRING_FIELD_SORT_FN (stasis_topic_statistics, name) | |
void | ast_multi_object_blob_add (struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object) |
Add an object (snapshot) to the blob. More... | |
struct ast_multi_object_blob * | ast_multi_object_blob_create (struct ast_json *blob) |
Create a stasis user event multi object blob. More... | |
void | ast_multi_object_blob_single_channel_publish (struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob) |
Publish single channel user event (for app_userevent compatibility) More... | |
static | AST_VECTOR (struct stasis_message_type_statistics) |
CONFIG_INFO_CORE ("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),) | |
Register information about the configs being processed by this module. More... | |
static int | declined_handler (const struct aco_option *opt, struct ast_variable *var, void *obj) |
static int | dispatch_exec_async (struct ast_taskprocessor_local *local) |
static int | dispatch_exec_sync (struct ast_taskprocessor_local *local) |
static unsigned int | dispatch_message (struct stasis_subscription *sub, struct stasis_message *message, int synchronous) |
static void | forward_dtor (void *obj) |
struct stasis_subscription * | internal_stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func) |
Create a subscription. More... | |
static int | link_topic_proxy (struct stasis_topic *topic, const char *name, const char *detail) |
static void | multi_object_blob_dtor (void *obj) |
static struct ast_str * | multi_object_blob_to_ami (void *obj) |
static struct ast_manager_event_blob * | multi_user_event_to_ami (struct stasis_message *message) |
static struct ast_json * | multi_user_event_to_json (struct stasis_message *message, const struct stasis_message_sanitizer *sanitize) |
static void | proxy_dtor (void *weakproxy, void *container) |
static void | publish_msg (struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub) |
static void | send_subscription_subscribe (struct stasis_topic *topic, struct stasis_subscription *sub) |
static void | send_subscription_unsubscribe (struct stasis_topic *topic, struct stasis_subscription *sub) |
static void | stasis_cleanup (void) |
Cleanup function for graceful shutdowns. More... | |
static void * | stasis_config_alloc (void) |
static void | stasis_config_destructor (void *obj) |
static void | stasis_declined_config_destructor (void *obj) |
struct stasis_forward * | stasis_forward_all (struct stasis_topic *from_topic, struct stasis_topic *to_topic) |
Create a subscription which forwards all messages from one topic to another. More... | |
struct stasis_forward * | stasis_forward_cancel (struct stasis_forward *forward) |
int | stasis_init (void) |
Initialize the Stasis subsystem. More... | |
void | stasis_log_bad_type_access (const char *name) |
int | stasis_message_type_declined (const char *name) |
Check whether a message type is declined. More... | |
STASIS_MESSAGE_TYPE_DEFN (stasis_subscription_change_type) | |
void | stasis_publish (struct stasis_topic *topic, struct stasis_message *message) |
Publish a message to a topic's subscribers. More... | |
void | stasis_publish_sync (struct stasis_subscription *sub, struct stasis_message *message) |
Publish a message to a topic's subscribers, synchronizing on the specified subscriber. More... | |
static char * | stasis_show_topic (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) |
static char * | stasis_show_topics (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) |
void | stasis_subscription_accept_formatters (struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters) |
Indicate to a subscription that we are interested in messages with one or more formatters. More... | |
int | stasis_subscription_accept_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type) |
Indicate to a subscription that we are interested in a message type. More... | |
void | stasis_subscription_cb_noop (void *data, struct stasis_subscription *sub, struct stasis_message *message) |
Stasis subscription callback function that does nothing. More... | |
int | stasis_subscription_decline_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type) |
Indicate to a subscription that we are not interested in a message type. More... | |
int | stasis_subscription_final_message (struct stasis_subscription *sub, struct stasis_message *msg) |
Determine whether a message is the final message to be received on a subscription. More... | |
int | stasis_subscription_is_done (struct stasis_subscription *subscription) |
Returns whether subscription has received its final message. More... | |
int | stasis_subscription_is_subscribed (const struct stasis_subscription *sub) |
Returns whether a subscription is currently subscribed. More... | |
void | stasis_subscription_join (struct stasis_subscription *subscription) |
Block until the last message is processed on a subscription. More... | |
int | stasis_subscription_set_congestion_limits (struct stasis_subscription *subscription, long low_water, long high_water) |
Set the high and low alert water marks of the stasis subscription. More... | |
int | stasis_subscription_set_filter (struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter) |
Set the message type filtering level on a subscription. More... | |
static struct stasis_subscription_statistics * | stasis_subscription_statistics_create (struct stasis_subscription *sub, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func) |
const char * | stasis_subscription_uniqueid (const struct stasis_subscription *sub) |
Get the unique ID for the subscription. More... | |
struct stasis_topic * | stasis_topic_create (const char *name) |
Create a new topic. More... | |
struct stasis_topic * | stasis_topic_create_with_detail (const char *name, const char *detail) |
Create a new topic with given detail. More... | |
const char * | stasis_topic_detail (const struct stasis_topic *topic) |
Return the detail of a topic. More... | |
struct stasis_topic * | stasis_topic_get (const char *name) |
Get a topic of the given name. More... | |
const char * | stasis_topic_name (const struct stasis_topic *topic) |
Return the name of a topic. More... | |
struct stasis_topic_pool * | stasis_topic_pool_create (struct stasis_topic *pooled_topic) |
Create a topic pool that routes messages from dynamically generated topics to the given topic. More... | |
void | stasis_topic_pool_delete_topic (struct stasis_topic_pool *pool, const char *topic_name) |
Delete a topic from the topic pool. More... | |
struct stasis_topic * | stasis_topic_pool_get_topic (struct stasis_topic_pool *pool, const char *topic_name) |
Find or create a topic in the pool. More... | |
int | stasis_topic_pool_topic_exists (const struct stasis_topic_pool *pool, const char *topic_name) |
Check if a topic exists in a pool. More... | |
static struct stasis_topic_statistics * | stasis_topic_statistics_create (struct stasis_topic *topic) |
size_t | stasis_topic_subscribers (const struct stasis_topic *topic) |
Return the number of subscribers of a topic. More... | |
struct stasis_subscription * | stasis_unsubscribe (struct stasis_subscription *sub) |
Cancel a subscription. More... | |
struct stasis_subscription * | stasis_unsubscribe_and_join (struct stasis_subscription *subscription) |
Cancel a subscription, blocking until the last message is processed. More... | |
static char * | statistics_show_messages (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) |
static char * | statistics_show_subscription (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) |
static char * | statistics_show_subscriptions (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) |
static char * | statistics_show_topic (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) |
static char * | statistics_show_topics (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) |
static int | sub_cleanup (void *data) |
static struct stasis_subscription_change * | subscription_change_alloc (struct stasis_topic *topic, const char *uniqueid, const char *description) |
static void | subscription_change_dtor (void *obj) |
static void | subscription_dtor (void *obj) |
static void | subscription_invoke (struct stasis_subscription *sub, struct stasis_message *message) |
Invoke the subscription's callback. More... | |
static int | subscription_statistics_cmp (void *obj, void *arg, int flags) |
static char * | subscription_statistics_complete_name (const char *word, int state) |
static void | subscription_statistics_destroy (void *obj) |
static int | subscription_statistics_hash (const void *obj, const int flags) |
static int | topic_add_subscription (struct stasis_topic *topic, struct stasis_subscription *sub) |
Add a subscriber to a topic. More... | |
static char * | topic_complete_name (const char *word) |
static void | topic_dtor (void *obj) |
static void | topic_pool_dtor (void *obj) |
static struct topic_pool_entry * | topic_pool_entry_alloc (const char *topic_name) |
static int | topic_pool_entry_cmp (void *obj, void *arg, int flags) |
static void | topic_pool_entry_dtor (void *obj) |
static int | topic_pool_entry_hash (const void *obj, const int flags) |
static int | topic_remove_subscription (struct stasis_topic *topic, struct stasis_subscription *sub) |
static int | topic_statistics_cmp (void *obj, void *arg, int flags) |
static char * | topic_statistics_complete_name (const char *word, int state) |
static void | topic_statistics_destroy (void *obj) |
static int | topic_statistics_hash (const void *obj, const int flags) |
static int | userevent_exclusion_cb (const char *key) |
STASIS_MESSAGE_TYPE_DEFN (ast_multi_user_event_type,.to_json=multi_user_event_to_json,.to_ami=multi_user_event_to_ami,) | |
Define multi user event message type(s). More... | |
Variables | |
static struct ast_cli_entry | cli_stasis [] |
static struct ast_cli_entry | cli_stasis_statistics [] |
static struct aco_type | declined_option |
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type. More... | |
struct aco_type * | declined_options [] = ACO_TYPES(&declined_option) |
static ast_mutex_t | message_type_statistics_lock = { PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP , NULL, {1, 0} } |
struct aco_file | stasis_conf |
static struct ast_threadpool * | threadpool |
static struct aco_type | threadpool_option |
static struct aco_type * | threadpool_options [] = ACO_TYPES(&threadpool_option) |
struct ao2_container * | topic_all |
Stasis Message Bus API.
Definition in file stasis.c.
#define FMT_FIELDS "%-64s %-64s\n" |
#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n" |
#define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n" |
#define FMT_FIELDS "%-64s %10d %10d\n" |
#define FMT_FIELDS2 "%-64s %10d %10d\n" |
Referenced by statistics_show_subscriptions(), and statistics_show_topics().
#define FMT_FIELDS2 "%-64s %10s %10d %10d\n" |
#define FMT_HEADERS "%-64s %-64s\n" |
#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n" |
#define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n" |
#define FMT_HEADERS "%-64s %10s %10s\n" |
#define INITIAL_SUBSCRIBERS_MAX 4 |
Initial size of the subscribers list.
Definition at line 302 of file stasis.c.
Referenced by stasis_topic_create_with_detail().
#define SUBSCRIPTION_STATISTICS_BUCKETS 57 |
The number of buckets to use for subscription statistics
Definition at line 328 of file stasis.c.
Referenced by stasis_init().
#define TOPIC_ALL_BUCKETS 997 |
Definition at line 318 of file stasis.c.
Referenced by stasis_init().
#define topic_lock_both | ( | topic1, | |
topic2 | |||
) |
Lock two topics.
Definition at line 426 of file stasis.c.
Referenced by stasis_forward_all(), and stasis_forward_cancel().
#define TOPIC_POOL_BUCKETS 57 |
The number of buckets to use for topic pools
Definition at line 305 of file stasis.c.
Referenced by stasis_topic_pool_create().
#define TOPIC_STATISTICS_BUCKETS 57 |
The number of buckets to use for topic statistics
Definition at line 325 of file stasis.c.
Referenced by stasis_init().
struct stasis_subscription* __stasis_subscribe | ( | struct stasis_topic * | topic, |
stasis_subscription_cb | callback, | ||
void * | data, | ||
const char * | file, | ||
int | lineno, | ||
const char * | func | ||
) |
Create a subscription.
In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().
The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.
topic | Topic to subscribe to. |
callback | Callback function for subscription messages. |
data | Data to be passed to the callback, in addition to the message. |
Returns NULL on error.
Definition at line 944 of file stasis.c.
References internal_stasis_subscribe().
Referenced by stasis_message_router_create_internal().
struct stasis_subscription* __stasis_subscribe_pool | ( | struct stasis_topic * | topic, |
stasis_subscription_cb | callback, | ||
void * | data, | ||
const char * | file, | ||
int | lineno, | ||
const char * | func | ||
) |
Create a subscription whose callbacks occur on a thread pool.
In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().
The invocations of the callback are serialized, but will almost certainly not always happen on the same thread. The invocation order of different subscriptions is unspecified.
Unlike stasis_subscribe, this function will explicitly use a threadpool to dispatch items to its callback. This form of subscription should be used when many subscriptions may be made to the specified topic.
topic | Topic to subscribe to. |
callback | Callback function for subscription messages. |
data | Data to be passed to the callback, in addition to the message. |
Returns NULL on error.
Definition at line 955 of file stasis.c.
References internal_stasis_subscribe().
Referenced by stasis_message_router_create_internal().
|
static |
Global container which stores statistics for topics
|
static |
Global container which stores statistics for subscriptions
|
static |
A global object container that will contain the stasis_config that gets swapped out on reloads.
AO2_STRING_FIELD_CASE_SORT_FN | ( | topic_proxy | , |
name | |||
) |
AO2_STRING_FIELD_CMP_FN | ( | topic_proxy | , |
name | |||
) |
AO2_STRING_FIELD_HASH_FN | ( | topic_proxy | , |
name | |||
) |
AO2_STRING_FIELD_SORT_FN | ( | stasis_subscription_statistics | , |
uniqueid | |||
) |
Referenced by statistics_show_subscription().
AO2_STRING_FIELD_SORT_FN | ( | stasis_topic_statistics | , |
name | |||
) |
|
static |
Vector containing message type information
Highest time spent dispatching messages to subscribers
Lowest time spent dispatching messages to subscribers
The number of messages that were not dispatched to any subscriber
The number of messages that were dispatched to at least 1 subscriber
The ids of the subscribers to this topic
Pointer to the topic (NOT refcounted, and must NOT be accessed)
Name of the topic
Definition at line 350 of file stasis.c.
References name.
CONFIG_INFO_CORE | ( | "stasis" | , |
cfg_info | , | ||
globals | , | ||
stasis_config_alloc | , | ||
. | files = ACO_FILES(&stasis_conf) |
||
) |
Register information about the configs being processed by this module.
|
static |
Definition at line 2304 of file stasis.c.
References ast_multi_user_event_type(), ast_str_container_add(), ast_strlen_zero, stasis_declined_config::declined, multi_user_event_to_ami(), multi_user_event_to_json(), STASIS_MESSAGE_TYPE_DEFN(), to_ami(), and ast_variable::value.
Referenced by stasis_init().
|
static |
Definition at line 1261 of file stasis.c.
References ao2_cleanup, ast_taskprocessor_local::data, ast_taskprocessor_local::local_data, and subscription_invoke().
Referenced by dispatch_message().
|
static |
Definition at line 1288 of file stasis.c.
References ao2_cleanup, ast_cond_signal, ast_mutex_lock, ast_mutex_unlock, sync_task_data::complete, sync_task_data::cond, ast_taskprocessor_local::data, ast_taskprocessor_local::local_data, sync_task_data::lock, subscription_invoke(), and sync_task_data::task_data.
Referenced by dispatch_message().
|
static |
Definition at line 1314 of file stasis.c.
References ao2_bump, ao2_cleanup, ast_atomic_fetchadd_int(), ast_cond_destroy, ast_cond_init, ast_cond_wait, ast_log, ast_mutex_destroy, ast_mutex_init, ast_mutex_lock, ast_mutex_unlock, ast_taskprocessor_push_local(), AST_VECTOR_GET, AST_VECTOR_SIZE, sync_task_data::complete, sync_task_data::cond, dispatch_exec_async(), dispatch_exec_sync(), sync_task_data::lock, LOG_ERROR, stasis_subscription::mailbox, stasis_message_type_statistics::message_type, NULL, stasis_message_type(), stasis_message_type_available_formatters(), stasis_message_type_id(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_final_message(), STASIS_SUBSCRIPTION_FORMATTER_NONE, subscription_invoke(), and sync_task_data::task_data.
Referenced by publish_msg(), and send_subscription_unsubscribe().
|
static |
Definition at line 1538 of file stasis.c.
References ao2_cleanup, stasis_forward::from_topic, NULL, and stasis_forward::to_topic.
Referenced by stasis_forward_all().
struct stasis_subscription* internal_stasis_subscribe | ( | struct stasis_topic * | topic, |
stasis_subscription_cb | callback, | ||
void * | data, | ||
int | needs_mailbox, | ||
int | use_thread_pool, | ||
const char * | file, | ||
int | lineno, | ||
const char * | func | ||
) |
Create a subscription.
In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().
The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.
Note: modules outside of Stasis should use stasis_subscribe.
topic | Topic to subscribe to. |
callback | Callback function for subscription messages. |
data | Data to be passed to the callback, in addition to the message. |
needs_mailbox | Determines whether or not the subscription requires a mailbox. Subscriptions with mailboxes will be delivered on some non-publisher thread; subscriptions without mailboxes will be delivered on the publisher thread. |
use_thread_pool | Use the thread pool for the subscription. This is only relevant if needs_mailbox is non-zero. |
Returns NULL on error.
Definition at line 858 of file stasis.c.
References ao2_ref, ao2_t_alloc, ast_asprintf, ast_atomic_fetchadd_int(), ast_cond_init, ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_set_local(), ast_threadpool_serializer(), AST_VECTOR_INIT, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::join_cond, stasis_subscription::mailbox, NULL, send_subscription_subscribe(), STASIS_SUBSCRIPTION_FILTER_NONE, STASIS_SUBSCRIPTION_FORMATTER_NONE, stasis_subscription_statistics_create(), stasis_topic_name(), statistics(), sub, subscription_dtor(), stasis_subscription::topic, topic_add_subscription(), TPS_REF_DEFAULT, and stasis_subscription::uniqueid.
Referenced by __stasis_subscribe(), __stasis_subscribe_pool(), and stasis_caching_topic_create().
|
static |
Definition at line 501 of file stasis.c.
References ao2_bump, ao2_cleanup, ao2_link_flags, ao2_ref, ao2_t_weakproxy_alloc, ao2_t_weakproxy_set_object, ao2_unlock, ao2_weakproxy_subscribe(), ao2_wrlock, ast_copy_string(), ast_log, ast_tvnow(), topic_proxy::buf, topic_proxy::creationtime, topic_proxy::detail, LOG_ERROR, topic_proxy::name, NULL, OBJ_NOLOCK, proxy_dtor(), and stasis_topic_get().
Referenced by stasis_topic_create_with_detail().
|
static |
Definition at line 1959 of file stasis.c.
References ao2_cleanup, ast_json_unref(), AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_SIZE, ast_multi_object_blob::blob, STASIS_UMOS_MAX, and type.
Referenced by ast_multi_object_blob_create().
|
static |
Definition at line 2096 of file stasis.c.
References ast_asprintf, ast_free, ast_manager_build_bridge_state_string_prefix(), ast_manager_build_channel_state_string_prefix(), ast_str_append(), ast_str_buffer(), ast_str_create, AST_VECTOR_GET, AST_VECTOR_SIZE, name, NULL, STASIS_UMOS_BRIDGE, STASIS_UMOS_CHANNEL, STASIS_UMOS_ENDPOINT, STASIS_UMOS_MAX, and type.
Referenced by multi_user_event_to_ami().
|
static |
Definition at line 2155 of file stasis.c.
References ast_free, ast_json_object_get(), ast_json_string_get(), ast_manager_event_blob_create(), ast_manager_str_from_json_object(), ast_str_buffer(), ast_multi_object_blob::blob, EVENT_FLAG_USER, multi_object_blob_to_ami(), NULL, RAII_VAR, stasis_message_data(), and userevent_exclusion_cb().
Referenced by declined_handler().
|
static |
Definition at line 2045 of file stasis.c.
References ast_bridge_snapshot_to_json(), ast_channel_snapshot_to_json(), ast_endpoint_snapshot_to_json(), ast_json_object_create(), ast_json_object_get(), ast_json_object_set(), ast_json_ref(), ast_json_string_create(), ast_json_timeval(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_multi_object_blob::blob, name, NULL, out, stasis_message_data(), stasis_message_timestamp(), STASIS_UMOS_BRIDGE, STASIS_UMOS_CHANNEL, STASIS_UMOS_ENDPOINT, STASIS_UMOS_MAX, and type.
Referenced by declined_handler().
|
static |
Definition at line 413 of file stasis.c.
References ao2_cleanup, ao2_unlink, sub, topic_add_subscription(), and topic_remove_subscription().
Referenced by link_topic_proxy().
|
static |
Definition at line 1432 of file stasis.c.
References ao2_lock, ao2_ref, ao2_unlock, ast_assert, ast_atomic_fetchadd_int(), ast_mutex_lock, ast_mutex_unlock, ast_tvdiff_ms(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_GET_ADDR, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, dispatch_message(), stasis_message_type_statistics::message_type, message_type_id, message_type_statistics_lock, NULL, stasis_message_type_statistics::published, stasis_message_type(), stasis_message_type_id(), stasis_topic_subscribers(), statistics(), and stasis_message_type_statistics::unused.
Referenced by stasis_publish(), and stasis_publish_sync().
|
static |
Definition at line 1648 of file stasis.c.
References ao2_cleanup, ast_assert, stasis_message_create(), stasis_publish(), stasis_subscription_change_type(), stasis_subscription_is_subscribed(), subscription_change_alloc(), and stasis_subscription::uniqueid.
Referenced by internal_stasis_subscribe(), and subscription_invoke().
|
static |
Definition at line 1676 of file stasis.c.
References ao2_cleanup, ast_assert, dispatch_message(), stasis_message_create(), stasis_publish(), stasis_subscription_change_type(), stasis_subscription_is_subscribed(), subscription_change_alloc(), and stasis_subscription::uniqueid.
Referenced by stasis_unsubscribe(), and subscription_invoke().
|
static |
Cleanup function for graceful shutdowns.
Definition at line 3042 of file stasis.c.
References aco_info_destroy(), ao2_cleanup, ao2_global_obj_release, ARRAY_LEN, ast_cli_unregister_multiple(), ast_multi_user_event_type(), ast_threadpool_shutdown(), AST_VECTOR_FREE, globals, NULL, STASIS_MESSAGE_TYPE_CLEANUP, and stasis_subscription_change_type().
Referenced by stasis_init().
|
static |
Definition at line 2253 of file stasis.c.
References ao2_alloc, ao2_ref, ast_calloc, ast_str_container_alloc, stasis_declined_config::declined, stasis_config::declined_message_types, NULL, stasis_config_destructor(), stasis_declined_config_destructor(), and stasis_config::threadpool_options.
Referenced by stasis_init().
|
static |
Definition at line 2245 of file stasis.c.
References ao2_cleanup, ast_free, stasis_config::declined_message_types, and stasis_config::threadpool_options.
Referenced by stasis_config_alloc().
|
static |
Definition at line 2238 of file stasis.c.
References ao2_cleanup, and stasis_declined_config::declined.
Referenced by stasis_config_alloc().
struct stasis_forward* stasis_forward_all | ( | struct stasis_topic * | from_topic, |
struct stasis_topic * | to_topic | ||
) |
Create a subscription which forwards all messages from one topic to another.
Note that the topic parameter of the invoked callback will be the topic the message was sent to, not the topic the subscriber subscribed to.
from_topic | Topic to forward. |
to_topic | Destination topic of forwarded messages. |
Returns NULL on error.
Definition at line 1578 of file stasis.c.
References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, ao2_unlock, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, forward_dtor(), stasis_forward::from_topic, NULL, stasis_forward::to_topic, topic_add_subscription(), and topic_lock_both.
Referenced by __init_manager(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_forward_endpoint(), ast_channel_internal_setup_topics(), AST_TEST_DEFINE(), create_subscriptions(), endpoint_internal_create(), forwards_create_bridge(), forwards_create_channel(), forwards_create_endpoint(), load_general_config(), load_module(), manager_bridging_init(), manager_channels_init(), manager_mwi_init(), manager_subscriptions_init(), manager_system_init(), stasis_cp_all_create(), stasis_cp_single_create(), stasis_topic_pool_get_topic(), and state_alloc().
struct stasis_forward* stasis_forward_cancel | ( | struct stasis_forward * | forward | ) |
Definition at line 1548 of file stasis.c.
References ao2_cleanup, ao2_unlock, AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_REMOVE_ELEM_UNORDERED, AST_VECTOR_SIZE, stasis_forward::from_topic, NULL, stasis_forward::to_topic, topic_lock_both, and topic_remove_subscription().
Referenced by all_dtor(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_internal_cleanup(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), bridge_channel_control_thread(), cleanup_module(), destroy_subscriptions(), forwards_unsubscribe(), load_general_config(), load_module(), manager_bridging_cleanup(), manager_channels_shutdown(), manager_mwi_shutdown(), manager_shutdown(), manager_system_shutdown(), stasis_cp_single_unsubscribe(), state_dtor(), topic_pool_entry_dtor(), and unload_module().
int stasis_init | ( | void | ) |
Initialize the Stasis subsystem.
Definition at line 3061 of file stasis.c.
References ACO_EXACT, aco_info_init(), aco_option_register, aco_option_register_custom, aco_process_config(), ACO_PROCESS_ERROR, aco_set_defaults(), AO2_ALLOC_OPT_LOCK_MUTEX, ao2_cleanup, ao2_container_alloc_hash, ao2_global_obj_ref, ao2_global_obj_replace_unref, ao2_ref, ARRAY_LEN, ast_cli_register_multiple, ast_log, ast_multi_user_event_type(), ast_register_cleanup(), ast_threadpool_create(), AST_THREADPOOL_OPTIONS_VERSION, AST_VECTOR_INIT, ast_threadpool_options::auto_increment, declined_handler(), stasis_config::declined_message_types, FLDSET, globals, ast_threadpool_options::idle_timeout, stasis_threadpool_conf::idle_timeout_sec, ast_threadpool_options::initial_size, stasis_threadpool_conf::initial_size, LOG_ERROR, LOG_NOTICE, ast_threadpool_options::max_size, stasis_threadpool_conf::max_size, NULL, OPT_INT_T, PARSE_IN_RANGE, stasis_cache_init(), stasis_cleanup(), stasis_config_alloc(), STASIS_MESSAGE_TYPE_INIT, stasis_subscription_change_type(), SUBSCRIPTION_STATISTICS_BUCKETS, subscription_statistics_cmp(), subscription_statistics_hash(), stasis_config::threadpool_options, TOPIC_ALL_BUCKETS, TOPIC_STATISTICS_BUCKETS, topic_statistics_cmp(), topic_statistics_hash(), and ast_threadpool_options::version.
Referenced by asterisk_daemon().
void stasis_log_bad_type_access | ( | const char * | name | ) |
Definition at line 1940 of file stasis.c.
References ast_log, LOG_ERROR, and stasis_message_type_declined().
int stasis_message_type_declined | ( | const char * | name | ) |
Check whether a message type is declined.
name | The name of the message type to check |
zero | The message type is not declined |
non-zero | The message type is declined |
Definition at line 2283 of file stasis.c.
References ao2_cleanup, ao2_find, ao2_global_obj_ref, ao2_ref, ast_log, stasis_declined_config::declined, stasis_config::declined_message_types, globals, LOG_NOTICE, and OBJ_SEARCH_KEY.
Referenced by stasis_log_bad_type_access(), and stasis_message_type_create().
STASIS_MESSAGE_TYPE_DEFN | ( | stasis_subscription_change_type | ) |
Referenced by declined_handler().
STASIS_MESSAGE_TYPE_DEFN | ( | ast_multi_user_event_type | , |
. | to_json = multi_user_event_to_json , |
||
. | to_ami = multi_user_event_to_ami |
||
) |
Define multi user event message type(s).
void stasis_publish | ( | struct stasis_topic * | topic, |
struct stasis_message * | message | ||
) |
Publish a message to a topic's subscribers.
topic | Topic. |
message | Message to publish. |
This call is asynchronous and will return immediately upon queueing the message for delivery to the topic's subscribers.
Definition at line 1511 of file stasis.c.
References NULL, and publish_msg().
Referenced by __ast_test_suite_event_notify(), aoc_publish_blob(), app_send_end_msg(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cel_publish_event(), ast_channel_publish_blob(), ast_channel_publish_cached_blob(), ast_channel_publish_final_snapshot(), ast_channel_publish_snapshot(), ast_device_state_clear_cache(), ast_endpoint_blob_publish(), ast_endpoint_shutdown(), ast_manager_publish_event(), ast_monitor_start(), ast_monitor_stop(), ast_multi_object_blob_single_channel_publish(), ast_publish_device_state_full(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), AST_TEST_DEFINE(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_merge_handler(), bridge_publish_state_from_blob(), bridge_topics_destroy(), cache_test_aggregate_publish_fn(), caching_topic_exec(), cc_publish(), clear_node_cache(), device_state_aggregate_publish(), endpoint_publish_snapshot(), endpoint_state_cb(), handle_security_event(), local_optimization_finished_cb(), local_optimization_started_cb(), manager_mute_mixmonitor(), meetme_stasis_generate_msg(), mixmonitor_exec(), moh_post_start(), moh_post_stop(), notify_new_message(), phase_e_handler(), presence_state_event(), publish_acl_change(), publish_chanspy_message(), publish_cluster_discovery_to_stasis_full(), publish_corosync_ping_to_stasis(), publish_format_update(), publish_hint_change(), publish_hint_remove(), publish_load_message_type(), publish_local_bridge_message(), publish_message_for_channel_topics(), publish_parked_call(), publish_parked_call_failure(), queue_publish_member_blob(), queue_publish_multi_channel_snapshot_blob(), remove_device_states_cb(), report_fax_status(), report_receive_fax_status(), report_send_fax_status(), send_call_pickup_stasis_message(), send_conf_stasis(), send_conf_stasis_snapshots(), send_msg(), 
send_start_msg_snapshots(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_app_control_publish(), stasis_app_user_event(), stasis_state_publish(), stasis_state_publish_by_id(), stasis_state_remove_publish_by_id(), stop_mixmonitor_full(), stun_monitor_request(), and talk_detect_audiohook_cb().
void stasis_publish_sync | ( | struct stasis_subscription * | sub, |
struct stasis_message * | message | ||
) |
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
sub | Subscription to synchronize on. |
message | Message to publish. |
The caller of stasis_publish_sync will block until the specified subscriber completes handling of the message.
The message is also delivered to all other subscribers of the topic that the stasis_subscription is subscribed to; that delivery, however, happens asynchronously.
Definition at line 1516 of file stasis.c.
References ast_assert, NULL, publish_msg(), and stasis_subscription::topic.
Referenced by AST_TEST_DEFINE(), and stasis_message_router_publish_sync().
|
static |
Definition at line 2417 of file stasis.c.
References ao2_lock, ao2_ref, ao2_unlock, ast_cli_args::argc, ast_cli_args::argv, ast_cli(), ast_format_duration_hh_mm_ss(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_SIZE, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, NULL, ast_cli_args::pos, stasis_topic_get(), stasis_subscription::topic, topic_complete_name(), stasis_subscription::uniqueid, ast_cli_entry::usage, and ast_cli_args::word.
|
static |
Definition at line 2334 of file stasis.c.
References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_list, ao2_container_dup(), ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_ref, ast_cli_args::argc, ast_cli_entry::args, ast_cli(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, topic_proxy::detail, ast_cli_args::fd, FMT_FIELDS, FMT_HEADERS, topic_proxy::name, NULL, OBJ_SEARCH_OBJECT, and ast_cli_entry::usage.
void stasis_subscription_accept_formatters | ( | struct stasis_subscription * | subscription, |
enum stasis_subscription_message_formatters | formatters | ||
) |
Indicate to a subscription that we are interested in messages with one or more formatters.
subscription | Subscription to alter. |
formatters | A bitmap of stasis_subscription_message_formatters we wish to receive. |
Definition at line 1095 of file stasis.c.
References ao2_lock, ao2_unlock, ast_assert, NULL, and stasis_subscription::topic.
Referenced by AST_TEST_DEFINE(), stasis_message_router_accept_formatters(), and stasis_message_router_set_formatters_default().
int stasis_subscription_accept_message_type | ( | struct stasis_subscription * | subscription, |
const struct stasis_message_type * | type | ||
) |
Indicate to a subscription that we are interested in a message type.
This will cause the subscription to allow the given message type to be raised to our subscription callback. This enables internal filtering in the stasis message bus to reduce messages.
subscription | Subscription to add message type to. |
type | The message type we wish to receive. |
0 | on success |
-1 | failure |
Definition at line 1025 of file stasis.c.
References ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, NULL, stasis_message_type_id(), stasis_message_type_name(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.
Referenced by acl_change_stasis_subscribe(), add_peer_mwi_subs(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), load_module(), load_pbx(), mwi_stasis_subscription_alloc(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_accept_message_type(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_create_internal(), subscribe_device_state(), and xmpp_init_event_distribution().
void stasis_subscription_cb_noop | ( | void * | data, |
struct stasis_subscription * | sub, | ||
struct stasis_message * | message | ||
) |
Stasis subscription callback function that does nothing.
Definition at line 811 of file stasis.c.
Referenced by build_gateway(), build_peer(), and mkintf().
int stasis_subscription_decline_message_type | ( | struct stasis_subscription * | subscription, |
const struct stasis_message_type * | type | ||
) |
Indicate to a subscription that we are not interested in a message type.
subscription | Subscription to remove message type from. |
type | The message type we don't wish to receive. |
0 | on success |
-1 | failure |
Definition at line 1055 of file stasis.c.
References ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, NULL, stasis_message_type_id(), stasis_message_type_name(), and stasis_subscription::topic.
Referenced by AST_TEST_DEFINE().
int stasis_subscription_final_message | ( | struct stasis_subscription * | sub, |
struct stasis_message * | msg | ||
) |
Determine whether a message is the final message to be received on a subscription.
sub | Subscription on which the message was received. |
msg | Message to check. |
Definition at line 1176 of file stasis.c.
References stasis_subscription_change::description, stasis_message_data(), stasis_message_type(), stasis_subscription_change_type(), stasis_subscription_uniqueid(), and stasis_subscription_change::uniqueid.
Referenced by bridge_subscription_change_handler(), caching_topic_exec(), consumer_exec(), consumer_exec_sync(), consumer_finalize(), default_route(), device_state_cb(), dispatch_message(), endpoint_subscription_change(), generic_agent_devstate_cb(), message_sink_cb(), mwi_event_cb(), mwi_stasis_cb(), park_announce_update_cb(), parker_update_cb(), queue_bridge_cb(), queue_channel_cb(), refer_progress_bridge(), router_dispatch(), statsmaker(), sub_subscription_change_handler(), and subscription_invoke().
int stasis_subscription_is_done | ( | struct stasis_subscription * | subscription | ) |
Returns whether subscription has received its final message.
Note that a subscription is considered done even while the stasis_subscription_final_message() is being processed. This allows cleanup routines to check the status of the subscription.
subscription | Subscription. |
Definition at line 1120 of file stasis.c.
References ao2_lock, ao2_unlock, and stasis_subscription::final_message_rxed.
Referenced by router_dtor(), stasis_caching_topic_dtor(), stasis_message_router_is_done(), and subscription_dtor().
int stasis_subscription_is_subscribed | ( | const struct stasis_subscription * | sub | ) |
Returns whether a subscription is currently subscribed.
Note that there may still be messages queued up to be dispatched to this subscription, but the stasis_subscription_final_message() has been enqueued.
sub | Subscription to check |
Definition at line 1152 of file stasis.c.
References ao2_lock, ao2_unlock, AST_VECTOR_GET, AST_VECTOR_SIZE, sub, and stasis_subscription::topic.
Referenced by asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), router_dtor(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_caching_topic_dtor(), stasis_caching_unsubscribe(), subscription_dtor(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().
void stasis_subscription_join | ( | struct stasis_subscription * | subscription | ) |
Block until the last message is processed on a subscription.
This function will not return until the subscription's callback for the stasis_subscription_final_message() completes. This allows cleanup routines to run before unblocking the joining thread.
subscription | Subscription to block on. |
Definition at line 1107 of file stasis.c.
References ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_cond_wait, stasis_subscription::final_message_processed, and stasis_subscription::join_cond.
Referenced by stasis_caching_unsubscribe_and_join(), and stasis_unsubscribe_and_join().
int stasis_subscription_set_congestion_limits | ( | struct stasis_subscription * | subscription, |
long | low_water, | ||
long | high_water | ||
) |
Set the high and low alert water marks of the stasis subscription.
subscription | Pointer to a stasis subscription |
low_water | New queue low water mark. (-1 to set as 90% of high_water) |
high_water | New queue high water mark. |
0 | on success. |
-1 | on error (water marks not changed). |
Definition at line 1013 of file stasis.c.
References ast_taskprocessor_alert_set_levels(), and stasis_subscription::mailbox.
Referenced by stasis_message_router_set_congestion_limits().
int stasis_subscription_set_filter | ( | struct stasis_subscription * | subscription, |
enum stasis_subscription_message_filter | filter | ||
) |
Set the message type filtering level on a subscription.
This will cause the subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.
subscription | Subscription on which to set the message filter. |
filter | What filter to use |
0 | on success |
-1 | failure |
Definition at line 1079 of file stasis.c.
References ao2_lock, ao2_unlock, filter(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.
Referenced by acl_change_stasis_subscribe(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), load_module(), load_pbx(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_set_filter(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_set_formatters_default(), subscribe_device_state(), and xmpp_init_event_distribution().
|
static |
Definition at line 823 of file stasis.c.
References ao2_alloc, ao2_cleanup, ao2_global_obj_ref, ao2_link, ao2_ref, ast_str_container_alloc, make_ari_stubs::file, stasis_subscription_statistics::file, stasis_subscription_statistics::func, stasis_subscription_statistics::lineno, NULL, RAII_VAR, statistics(), sub, stasis_subscription_statistics::sub, subscription_statistics_destroy(), stasis_subscription_statistics::topics, stasis_subscription_statistics::uniqueid, stasis_subscription::uniqueid, stasis_subscription_statistics::uses_mailbox, and stasis_subscription_statistics::uses_threadpool.
Referenced by internal_stasis_subscribe().
const char* stasis_subscription_uniqueid | ( | const struct stasis_subscription * | sub | ) |
Get the unique ID for the subscription.
sub | Subscription for which to get the unique ID. |
Definition at line 1171 of file stasis.c.
References stasis_subscription::uniqueid.
Referenced by AST_TEST_DEFINE(), stasis_subscription_final_message(), topic_add_subscription(), and topic_remove_subscription().
struct stasis_topic* stasis_topic_create | ( | const char * | name | ) |
Create a new topic.
name | Name of the new topic. |
NULL | on error |
Definition at line 618 of file stasis.c.
References stasis_topic_create_with_detail().
Referenced by __init_manager(), app_create(), app_init(), ast_channel_internal_setup_topics(), ast_parking_stasis_init(), ast_presence_state_engine_init(), ast_rtp_engine_init(), ast_security_stasis_init(), ast_stasis_bridging_init(), ast_stasis_channels_init(), ast_stasis_system_init(), AST_TEST_DEFINE(), ast_test_init(), create_cts(), create_subscriptions(), devstate_init(), load_module(), stasis_caching_topic_create(), stasis_cp_all_create(), stasis_cp_sink_create(), stasis_state_manager_create(), stasis_topic_pool_get_topic(), and state_alloc().
struct stasis_topic* stasis_topic_create_with_detail | ( | const char * | name, |
const char * | detail | ||
) |
Create a new topic with given detail.
name | Name of the new topic. |
detail | Detailed description of the new topic, e.g. "Queue main topic for subscribing every queue event" |
NULL | on error |
Definition at line 569 of file stasis.c.
References ao2_ref, ao2_t_alloc, ast_debug, AST_VECTOR_INIT, INITIAL_SUBSCRIBERS_MAX, link_topic_proxy(), NULL, stasis_topic_get(), stasis_topic_statistics_create(), and topic_dtor().
Referenced by stasis_topic_create().
const char* stasis_topic_detail | ( | const struct stasis_topic * | topic | ) |
struct stasis_topic* stasis_topic_get | ( | const char * | name | ) |
Get a topic of the given name.
name | Topic's name. |
NULL | on error or if the topic does not exist |
Definition at line 623 of file stasis.c.
References ao2_weakproxy_find, and OBJ_SEARCH_KEY.
Referenced by link_topic_proxy(), stasis_show_topic(), and stasis_topic_create_with_detail().
const char* stasis_topic_name | ( | const struct stasis_topic * | topic | ) |
Return the name of a topic.
topic | Topic. |
NULL | if topic is NULL |
Definition at line 628 of file stasis.c.
References NULL.
Referenced by bridge_topics_destroy(), caching_topic_exec(), internal_stasis_subscribe(), stasis_caching_topic_create(), stasis_caching_topic_dtor(), stasis_message_router_create_internal(), stasis_state_add_publisher(), stasis_state_add_subscriber(), stasis_state_manager_create(), stasis_state_subscribe_pool(), stasis_topic_pool_create(), stasis_topic_pool_delete_topic(), stasis_topic_pool_get_topic(), state_alloc(), state_id_by_topic(), state_manager_dtor(), topic_add_subscription(), topic_pool_dtor(), topic_pool_entry_cmp(), and topic_remove_subscription().
struct stasis_topic_pool* stasis_topic_pool_create | ( | struct stasis_topic * | pooled_topic | ) |
Create a topic pool that routes messages from dynamically generated topics to the given topic.
pooled_topic | Topic to which messages will be routed |
NULL | on failure |
Definition at line 1833 of file stasis.c.
References AO2_ALLOC_OPT_LOCK_MUTEX, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_cleanup, ao2_container_alloc_hash, ao2_container_register(), ao2_ref, ast_alloca, NULL, pool, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, stasis_topic_name(), TOPIC_POOL_BUCKETS, topic_pool_dtor(), topic_pool_entry_cmp(), and topic_pool_entry_hash().
Referenced by app_init(), ast_stasis_bridging_init(), and devstate_init().
void stasis_topic_pool_delete_topic | ( | struct stasis_topic_pool * | pool, |
const char * | topic_name | ||
) |
Delete a topic from the topic pool.
pool | Pool from which to delete the topic |
topic_name | Name of the topic to delete in the form of <pool_topic_name>/<topic_name> or just <topic_name> |
Definition at line 1864 of file stasis.c.
References ao2_find, OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, and stasis_topic_name().
Referenced by bridge_topics_destroy().
struct stasis_topic* stasis_topic_pool_get_topic | ( | struct stasis_topic_pool * | pool, |
const char * | topic_name | ||
) |
Find or create a topic in the pool.
pool | Pool for which to get the topic |
topic_name | Name of the topic to get |
NULL | if the topic was not found and could not be allocated |
Definition at line 1884 of file stasis.c.
References ao2_cleanup, ao2_find, ao2_link_flags, ast_asprintf, ast_free, topic_pool_entry::forward, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, RAII_VAR, SCOPED_AO2LOCK, stasis_forward_all(), stasis_topic_create(), stasis_topic_name(), topic_pool_entry::topic, and topic_pool_entry_alloc().
Referenced by ast_device_state_topic(), ast_queue_topic(), and bridge_topics_init().
int stasis_topic_pool_topic_exists | ( | const struct stasis_topic_pool * | pool, |
const char * | topic_name | ||
) |
Check if a topic exists in a pool.
pool | Pool to check |
topic_name | Name of the topic to check |
1 | exists |
0 | does not exist |
Definition at line 1927 of file stasis.c.
References ao2_find, ao2_ref, OBJ_SEARCH_KEY, and stasis_topic_pool::pool_container.
Referenced by ast_publish_device_state_full().
|
static |
Definition at line 472 of file stasis.c.
References ao2_alloc, ao2_cleanup, ao2_global_obj_ref, ao2_link, ao2_ref, ast_str_container_alloc, NULL, RAII_VAR, statistics(), and topic_statistics_destroy().
Referenced by stasis_topic_create_with_detail().
size_t stasis_topic_subscribers | ( | const struct stasis_topic * | topic | ) |
Return the number of subscribers of a topic.
topic | Topic. |
Definition at line 644 of file stasis.c.
References AST_VECTOR_SIZE.
Referenced by caching_topic_exec(), and publish_msg().
struct stasis_subscription* stasis_unsubscribe | ( | struct stasis_subscription * | subscription | ) |
Cancel a subscription.
Note that in an asynchronous system, there may still be messages queued or in transit to the subscription's callback. These will still be delivered. There will be a final 'SubscriptionCancelled' message, indicating the delivery of the final message.
subscription | Subscription to cancel. |
NULL | for convenience |
Definition at line 973 of file stasis.c.
References ao2_bump, ao2_cleanup, ast_log, ast_taskprocessor_push(), LOG_ERROR, stasis_subscription::mailbox, NULL, send_subscription_unsubscribe(), sub_cleanup(), stasis_subscription::topic, and topic_remove_subscription().
Referenced by AST_TEST_DEFINE(), cc_generic_agent_destructor(), destroy_cts(), generic_agent_devstate_cb(), generic_monitor_instance_list_destructor(), mwi_startup_event_cb(), park_and_announce_app_exec(), parked_subscription_datastore_destroy(), refer_progress_bridge(), refer_progress_destroy(), refer_progress_framehook_destroy(), startup_event_cb(), stasis_caching_unsubscribe(), stasis_message_router_unsubscribe(), stasis_state_unsubscribe(), stasis_unsubscribe_and_join(), subscription_persistence_event_cb(), unload_module(), and xmpp_init_event_distribution().
struct stasis_subscription* stasis_unsubscribe_and_join | ( | struct stasis_subscription * | subscription | ) |
Cancel a subscription, blocking until the last message is processed.
While normally it's recommended to stasis_unsubscribe() and wait for stasis_subscription_final_message(), there are times (like during a module unload) where you have to wait for the final message (otherwise you'll call a function in a shared module that no longer exists).
subscription | Subscription to cancel. |
NULL | for convenience |
Definition at line 1136 of file stasis.c.
References ao2_cleanup, ao2_ref, NULL, stasis_subscription_join(), and stasis_unsubscribe().
Referenced by acl_change_event_stasis_unsubscribe(), acl_change_stasis_unsubscribe(), ast_res_pjsip_destroy_configuration(), AST_TEST_DEFINE(), ast_xmpp_client_disconnect(), asterisk_stop_devicestate_publishing(), asterisk_stop_mwi_publishing(), devstate_cleanup(), network_change_stasis_unsubscribe(), parking_manager_disable_stasis(), presence_change_common(), remove_device_state_subscription(), rtp_reload(), stasis_message_router_unsubscribe_and_join(), stasis_state_unsubscribe_and_join(), unload_module(), and unload_pbx().
|
static |
Definition at line 2865 of file stasis.c.
References ast_cli_args::argc, ast_cli_entry::args, ast_cli(), ast_mutex_lock, ast_mutex_unlock, AST_VECTOR_GET_ADDR, AST_VECTOR_SIZE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, FMT_FIELDS, FMT_HEADERS, stasis_message_type_statistics::message_type, message_type_statistics_lock, NULL, stasis_message_type_statistics::published, stasis_message_type_name(), statistics(), stasis_message_type_statistics::unused, and ast_cli_entry::usage.
|
static |
Definition at line 2604 of file stasis.c.
References ao2_container_count(), ao2_find, ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_lock, ao2_ref, AO2_STRING_FIELD_SORT_FN(), ao2_unlock, ast_cli_args::argc, ast_cli_args::argv, ast_cli(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, stasis_subscription_statistics::file, stasis_subscription_statistics::func, stasis_subscription_statistics::highest_time_invoked, stasis_subscription_statistics::highest_time_message_type, stasis_subscription_statistics::lineno, stasis_subscription_statistics::lowest_time_invoked, stasis_subscription_statistics::messages_dropped, stasis_subscription_statistics::messages_passed, ast_cli_args::n, name, NULL, OBJ_SEARCH_KEY, ast_cli_args::pos, S_OR, stasis_message_type_name(), statistics(), stasis_subscription_statistics::sub, subscription_statistics_complete_name(), stasis_subscription_statistics::topics, stasis_subscription_statistics::uniqueid, ast_cli_entry::usage, stasis_subscription_statistics::uses_mailbox, stasis_subscription_statistics::uses_threadpool, and ast_cli_args::word.
|
static |
Definition at line 2490 of file stasis.c.
References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_container_alloc_rbtree, ao2_container_dup(), ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_args::argc, ast_cli_entry::args, ast_cli(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, FMT_FIELDS, FMT_FIELDS2, FMT_HEADERS, stasis_subscription_statistics::highest_time_invoked, stasis_subscription_statistics::lowest_time_invoked, stasis_subscription_statistics::messages_dropped, stasis_subscription_statistics::messages_passed, NULL, statistics(), stasis_subscription_statistics::uniqueid, and ast_cli_entry::usage.
|
static |
Definition at line 2799 of file stasis.c.
References ao2_container_count(), ao2_find, ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_args::argc, ast_cli_args::argv, ast_cli(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, ast_cli_args::n, NULL, OBJ_SEARCH_KEY, ast_cli_args::pos, statistics(), topic_statistics_complete_name(), ast_cli_entry::usage, and ast_cli_args::word.
|
static |
Definition at line 2684 of file stasis.c.
References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_container_alloc_rbtree, ao2_container_count(), ao2_container_dup(), ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_args::argc, ast_cli_entry::args, ast_cli(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, FMT_FIELDS, FMT_FIELDS2, FMT_HEADERS, NULL, statistics(), and ast_cli_entry::usage.
|
static |
Definition at line 966 of file stasis.c.
References ao2_cleanup, and stasis_subscription::data.
Referenced by stasis_unsubscribe().
|
static |
Definition at line 1627 of file stasis.c.
References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_ref, ast_copy_string(), stasis_subscription_change::description, NULL, subscription_change_dtor(), stasis_subscription_change::topic, and stasis_subscription_change::uniqueid.
Referenced by send_subscription_subscribe(), and send_subscription_unsubscribe().
|
static |
Definition at line 1620 of file stasis.c.
References ao2_cleanup, and stasis_subscription_change::topic.
Referenced by subscription_change_alloc().
|
static |
Definition at line 715 of file stasis.c.
References ao2_cleanup, ao2_global_obj_ref, ao2_ref, ao2_unlink, ast_assert, ast_cond_destroy, ast_free, ast_taskprocessor_unreference(), AST_VECTOR_FREE, stasis_subscription::join_cond, stasis_subscription::mailbox, NULL, stasis_subscription_is_done(), stasis_subscription_is_subscribed(), stasis_subscription::topic, and stasis_subscription::uniqueid.
Referenced by internal_stasis_subscribe().
|
static |
Invoke the subscription's callback.
sub | Subscription to invoke. |
topic | Topic message was published to. |
message | Message to send. |
Definition at line 756 of file stasis.c.
References ao2_lock, ao2_unlock, ast_cond_signal, ast_tvdiff_ms(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::final_message_processed, stasis_subscription::final_message_rxed, stasis_subscription::join_cond, message_type_id, send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_message_type(), stasis_message_type_id(), stasis_subscription_change_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_final_message(), and statistics().
Referenced by dispatch_exec_async(), dispatch_exec_sync(), and dispatch_message().
|
static |
Definition at line 2945 of file stasis.c.
References ast_assert, CMP_MATCH, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, OBJ_SEARCH_PARTIAL_KEY, and stasis_subscription_statistics::uniqueid.
Referenced by stasis_init().
|
static |
Definition at line 2570 of file stasis.c.
References ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_strdup, NULL, result, statistics(), and stasis_subscription_statistics::uniqueid.
Referenced by statistics_show_subscription().
|
static |
Definition at line 816 of file stasis.c.
References ao2_cleanup, statistics(), and stasis_subscription_statistics::topics.
Referenced by stasis_subscription_statistics_create().
|
static |
Definition at line 2924 of file stasis.c.
References ast_assert, ast_str_case_hash(), OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.
Referenced by stasis_init().
|
static |
Add a subscriber to a topic.
topic | Topic |
sub | Subscriber |
Definition at line 1203 of file stasis.c.
References ao2_lock, ao2_unlock, ast_str_container_add(), AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription_uniqueid(), and stasis_topic_name().
Referenced by internal_stasis_subscribe(), proxy_dtor(), and stasis_forward_all().
|
static |
Definition at line 2391 of file stasis.c.
References ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_completion_add(), ast_strdup, topic_proxy::name, and NULL.
Referenced by stasis_show_topic().
|
static |
Definition at line 434 of file stasis.c.
References ao2_global_obj_ref, ao2_ref, ao2_unlink, ast_assert, ast_debug, AST_VECTOR_FREE, and AST_VECTOR_SIZE.
Referenced by stasis_topic_create_with_detail().
|
static |
Definition at line 1744 of file stasis.c.
References ao2_cleanup, ao2_container_unregister(), ast_alloca, NULL, pool, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, and stasis_topic_name().
Referenced by stasis_topic_pool_create().
|
static |
Definition at line 1724 of file stasis.c.
References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, topic_pool_entry::name, NULL, and topic_pool_entry_dtor().
Referenced by stasis_topic_pool_get_topic().
|
static |
Definition at line 1784 of file stasis.c.
References ast_assert, CMP_MATCH, topic_pool_entry::name, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, OBJ_SEARCH_PARTIAL_KEY, stasis_topic_name(), and topic_pool_entry::topic.
Referenced by stasis_topic_pool_create().
|
static |
Definition at line 1715 of file stasis.c.
References ao2_cleanup, topic_pool_entry::forward, NULL, stasis_forward_cancel(), and topic_pool_entry::topic.
Referenced by topic_pool_entry_alloc().
|
static |
Definition at line 1763 of file stasis.c.
References ast_assert, ast_str_case_hash(), OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.
Referenced by stasis_topic_pool_create().
|
static |
Definition at line 1231 of file stasis.c.
References ao2_lock, ao2_unlock, ast_str_container_remove(), AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_REMOVE_ELEM_UNORDERED, AST_VECTOR_SIZE, stasis_subscription_uniqueid(), and stasis_topic_name().
Referenced by proxy_dtor(), stasis_forward_cancel(), and stasis_unsubscribe().
|
static |
Definition at line 3003 of file stasis.c.
References ast_assert, CMP_MATCH, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.
Referenced by stasis_init().
|
static |
Definition at line 2765 of file stasis.c.
References ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_strdup, NULL, result, and statistics().
Referenced by statistics_show_topic().
|
static |
Definition at line 465 of file stasis.c.
References ao2_cleanup, and statistics().
Referenced by stasis_topic_statistics_create().
|
static |
Definition at line 2982 of file stasis.c.
References ast_assert, ast_str_case_hash(), OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.
Referenced by stasis_init().
|
static |
Definition at line 2147 of file stasis.c.
Referenced by multi_user_event_to_ami().
|
static |
|
static |
|
static |
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type.
struct aco_type* declined_options[] = ACO_TYPES(&declined_option) |
|
static |
Lock to protect the message types vector
Definition at line 347 of file stasis.c.
Referenced by publish_msg(), and statistics_show_messages().
struct aco_file stasis_conf |
|
static |
|
static |
struct ao2_container* topic_all |