Asterisk - The Open Source Telephony Project  18.5.0
Data Structures | Macros | Functions | Variables
stasis.c File Reference

Stasis Message Bus API. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/config_options.h"
#include "asterisk/cli.h"
Include dependency graph for stasis.c:

Go to the source code of this file.

Data Structures

struct  ast_multi_object_blob
 A multi object blob data structure to carry user event stasis messages. More...
 
struct  stasis_config
 
struct  stasis_declined_config
 A structure to hold global configuration-related options. More...
 
struct  stasis_forward
 Forwarding information. More...
 
struct  stasis_message_type_statistics
 
struct  stasis_subscription
 
struct  stasis_subscription_statistics
 
struct  stasis_threadpool_conf
 Threadpool configuration options. More...
 
struct  stasis_topic
 
struct  stasis_topic_pool
 
struct  sync_task_data
 
struct  topic_pool_entry
 
struct  topic_proxy
 

Macros

#define FMT_FIELDS   "%-64s %-64s\n"
 
#define FMT_FIELDS   "%-64s %10d %10d %16ld %16ld\n"
 
#define FMT_FIELDS   "%-64s %10d %10d %10d %16ld %16ld\n"
 
#define FMT_FIELDS   "%-64s %10d %10d\n"
 
#define FMT_FIELDS2   "%-64s %10d %10d\n"
 
#define FMT_FIELDS2   "%-64s %10s %10d %10d\n"
 
#define FMT_HEADERS   "%-64s %-64s\n"
 
#define FMT_HEADERS   "%-64s %10s %10s %16s %16s\n"
 
#define FMT_HEADERS   "%-64s %10s %10s %10s %16s %16s\n"
 
#define FMT_HEADERS   "%-64s %10s %10s\n"
 
#define INITIAL_SUBSCRIBERS_MAX   4
 
#define SUBSCRIPTION_STATISTICS_BUCKETS   57
 
#define TOPIC_ALL_BUCKETS   997
 
#define topic_lock_both(topic1, topic2)
 Lock two topics. More...
 
#define TOPIC_POOL_BUCKETS   57
 
#define TOPIC_STATISTICS_BUCKETS   57
 

Functions

struct stasis_subscription__stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription. More...
 
struct stasis_subscription__stasis_subscribe_pool (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription whose callbacks occur on a thread pool. More...
 
static AO2_GLOBAL_OBJ_STATIC (topic_statistics)
 
static AO2_GLOBAL_OBJ_STATIC (subscription_statistics)
 
static AO2_GLOBAL_OBJ_STATIC (globals)
 A global object container that will contain the stasis_config that gets swapped out on reloads. More...
 
 AO2_STRING_FIELD_CASE_SORT_FN (topic_proxy, name)
 
 AO2_STRING_FIELD_CMP_FN (topic_proxy, name)
 
 AO2_STRING_FIELD_HASH_FN (topic_proxy, name)
 
 AO2_STRING_FIELD_SORT_FN (stasis_subscription_statistics, uniqueid)
 
 AO2_STRING_FIELD_SORT_FN (stasis_topic_statistics, name)
 
void ast_multi_object_blob_add (struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
 Add an object (snapshot) to the blob. More...
 
struct ast_multi_object_blobast_multi_object_blob_create (struct ast_json *blob)
 Create a stasis user event multi object blob. More...
 
void ast_multi_object_blob_single_channel_publish (struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
 Publish single channel user event (for app_userevent compatibility) More...
 
static AST_VECTOR (struct stasis_message_type_statistics)
 
 CONFIG_INFO_CORE ("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
 Register information about the configs being processed by this module. More...
 
static int declined_handler (const struct aco_option *opt, struct ast_variable *var, void *obj)
 
static int dispatch_exec_async (struct ast_taskprocessor_local *local)
 
static int dispatch_exec_sync (struct ast_taskprocessor_local *local)
 
static unsigned int dispatch_message (struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
 
static void forward_dtor (void *obj)
 
struct stasis_subscriptioninternal_stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
 Create a subscription. More...
 
static int link_topic_proxy (struct stasis_topic *topic, const char *name, const char *detail)
 
static void multi_object_blob_dtor (void *obj)
 
static struct ast_strmulti_object_blob_to_ami (void *obj)
 
static struct ast_manager_event_blobmulti_user_event_to_ami (struct stasis_message *message)
 
static struct ast_jsonmulti_user_event_to_json (struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
 
static void proxy_dtor (void *weakproxy, void *container)
 
static void publish_msg (struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
 
static void send_subscription_subscribe (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static void send_subscription_unsubscribe (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static void stasis_cleanup (void)
 Cleanup function for graceful shutdowns. More...
 
static void * stasis_config_alloc (void)
 
static void stasis_config_destructor (void *obj)
 
static void stasis_declined_config_destructor (void *obj)
 
struct stasis_forwardstasis_forward_all (struct stasis_topic *from_topic, struct stasis_topic *to_topic)
 Create a subscription which forwards all messages from one topic to another. More...
 
struct stasis_forwardstasis_forward_cancel (struct stasis_forward *forward)
 
int stasis_init (void)
 Initialize the Stasis subsystem. More...
 
void stasis_log_bad_type_access (const char *name)
 
int stasis_message_type_declined (const char *name)
 Check whether a message type is declined. More...
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_subscription_change_type)
 
void stasis_publish (struct stasis_topic *topic, struct stasis_message *message)
 Publish a message to a topic's subscribers. More...
 
void stasis_publish_sync (struct stasis_subscription *sub, struct stasis_message *message)
 Publish a message to a topic's subscribers, synchronizing on the specified subscriber. More...
 
static char * stasis_show_topic (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * stasis_show_topics (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
void stasis_subscription_accept_formatters (struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
 Indicate to a subscription that we are interested in messages with one or more formatters. More...
 
int stasis_subscription_accept_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are interested in a message type. More...
 
void stasis_subscription_cb_noop (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 Stasis subscription callback function that does nothing. More...
 
int stasis_subscription_decline_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are not interested in a message type. More...
 
int stasis_subscription_final_message (struct stasis_subscription *sub, struct stasis_message *msg)
 Determine whether a message is the final message to be received on a subscription. More...
 
int stasis_subscription_is_done (struct stasis_subscription *subscription)
 Returns whether subscription has received its final message. More...
 
int stasis_subscription_is_subscribed (const struct stasis_subscription *sub)
 Returns whether a subscription is currently subscribed. More...
 
void stasis_subscription_join (struct stasis_subscription *subscription)
 Block until the last message is processed on a subscription. More...
 
int stasis_subscription_set_congestion_limits (struct stasis_subscription *subscription, long low_water, long high_water)
 Set the high and low alert water marks of the stasis subscription. More...
 
int stasis_subscription_set_filter (struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a subscription. More...
 
static struct stasis_subscription_statisticsstasis_subscription_statistics_create (struct stasis_subscription *sub, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
 
const char * stasis_subscription_uniqueid (const struct stasis_subscription *sub)
 Get the unique ID for the subscription. More...
 
struct stasis_topicstasis_topic_create (const char *name)
 Create a new topic. More...
 
struct stasis_topicstasis_topic_create_with_detail (const char *name, const char *detail)
 Create a new topic with given detail. More...
 
const char * stasis_topic_detail (const struct stasis_topic *topic)
 Return the detail of a topic. More...
 
struct stasis_topicstasis_topic_get (const char *name)
 Get a topic of the given name. More...
 
const char * stasis_topic_name (const struct stasis_topic *topic)
 Return the name of a topic. More...
 
struct stasis_topic_poolstasis_topic_pool_create (struct stasis_topic *pooled_topic)
 Create a topic pool that routes messages from dynamically generated topics to the given topic. More...
 
void stasis_topic_pool_delete_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Delete a topic from the topic pool. More...
 
struct stasis_topicstasis_topic_pool_get_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Find or create a topic in the pool. More...
 
int stasis_topic_pool_topic_exists (const struct stasis_topic_pool *pool, const char *topic_name)
 Check if a topic exists in a pool. More...
 
static struct stasis_topic_statistics * stasis_topic_statistics_create (struct stasis_topic *topic)
 
size_t stasis_topic_subscribers (const struct stasis_topic *topic)
 Return the number of subscribers of a topic. More...
 
struct stasis_subscriptionstasis_unsubscribe (struct stasis_subscription *sub)
 Cancel a subscription. More...
 
struct stasis_subscriptionstasis_unsubscribe_and_join (struct stasis_subscription *subscription)
 Cancel a subscription, blocking until the last message is processed. More...
 
static char * statistics_show_messages (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * statistics_show_subscription (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * statistics_show_subscriptions (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * statistics_show_topic (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * statistics_show_topics (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static int sub_cleanup (void *data)
 
static struct stasis_subscription_changesubscription_change_alloc (struct stasis_topic *topic, const char *uniqueid, const char *description)
 
static void subscription_change_dtor (void *obj)
 
static void subscription_dtor (void *obj)
 
static void subscription_invoke (struct stasis_subscription *sub, struct stasis_message *message)
 Invoke the subscription's callback. More...
 
static int subscription_statistics_cmp (void *obj, void *arg, int flags)
 
static char * subscription_statistics_complete_name (const char *word, int state)
 
static void subscription_statistics_destroy (void *obj)
 
static int subscription_statistics_hash (const void *obj, const int flags)
 
static int topic_add_subscription (struct stasis_topic *topic, struct stasis_subscription *sub)
 Add a subscriber to a topic. More...
 
static char * topic_complete_name (const char *word)
 
static void topic_dtor (void *obj)
 
static void topic_pool_dtor (void *obj)
 
static struct topic_pool_entrytopic_pool_entry_alloc (const char *topic_name)
 
static int topic_pool_entry_cmp (void *obj, void *arg, int flags)
 
static void topic_pool_entry_dtor (void *obj)
 
static int topic_pool_entry_hash (const void *obj, const int flags)
 
static int topic_remove_subscription (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static int topic_statistics_cmp (void *obj, void *arg, int flags)
 
static char * topic_statistics_complete_name (const char *word, int state)
 
static void topic_statistics_destroy (void *obj)
 
static int topic_statistics_hash (const void *obj, const int flags)
 
static int userevent_exclusion_cb (const char *key)
 
 STASIS_MESSAGE_TYPE_DEFN (ast_multi_user_event_type,.to_json=multi_user_event_to_json,.to_ami=multi_user_event_to_ami,)
 Define multi user event message type(s). More...
 

Variables

static struct ast_cli_entry cli_stasis []
 
static struct ast_cli_entry cli_stasis_statistics []
 
static struct aco_type declined_option
 An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type. More...
 
struct aco_typedeclined_options [] = ACO_TYPES(&declined_option)
 
static ast_mutex_t message_type_statistics_lock = { PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP , NULL, {1, 0} }
 
struct aco_file stasis_conf
 
static struct ast_threadpoolthreadpool
 
static struct aco_type threadpool_option
 
static struct aco_typethreadpool_options [] = ACO_TYPES(&threadpool_option)
 
struct ao2_containertopic_all
 

Detailed Description

Stasis Message Bus API.

Author
David M. Lee, II [email protected][email protected]digi[email protected]um.co[email protected]m

Definition in file stasis.c.

Macro Definition Documentation

◆ FMT_FIELDS [1/4]

#define FMT_FIELDS   "%-64s %-64s\n"

◆ FMT_FIELDS [2/4]

#define FMT_FIELDS   "%-64s %10d %10d %16ld %16ld\n"

◆ FMT_FIELDS [3/4]

#define FMT_FIELDS   "%-64s %10d %10d %10d %16ld %16ld\n"

◆ FMT_FIELDS [4/4]

#define FMT_FIELDS   "%-64s %10d %10d\n"

◆ FMT_FIELDS2 [1/2]

#define FMT_FIELDS2   "%-64s %10d %10d\n"

◆ FMT_FIELDS2 [2/2]

#define FMT_FIELDS2   "%-64s %10s %10d %10d\n"

◆ FMT_HEADERS [1/4]

#define FMT_HEADERS   "%-64s %-64s\n"

◆ FMT_HEADERS [2/4]

#define FMT_HEADERS   "%-64s %10s %10s %16s %16s\n"

◆ FMT_HEADERS [3/4]

#define FMT_HEADERS   "%-64s %10s %10s %10s %16s %16s\n"

◆ FMT_HEADERS [4/4]

#define FMT_HEADERS   "%-64s %10s %10s\n"

◆ INITIAL_SUBSCRIBERS_MAX

#define INITIAL_SUBSCRIBERS_MAX   4

Initial size of the subscribers list.

Definition at line 302 of file stasis.c.

Referenced by stasis_topic_create_with_detail().

◆ SUBSCRIPTION_STATISTICS_BUCKETS

#define SUBSCRIPTION_STATISTICS_BUCKETS   57

The number of buckets to use for subscription statistics

Definition at line 328 of file stasis.c.

Referenced by stasis_init().

◆ TOPIC_ALL_BUCKETS

#define TOPIC_ALL_BUCKETS   997

Definition at line 318 of file stasis.c.

Referenced by stasis_init().

◆ topic_lock_both

#define topic_lock_both (   topic1,
  topic2 
)

Lock two topics.

Definition at line 426 of file stasis.c.

Referenced by stasis_forward_all(), and stasis_forward_cancel().

◆ TOPIC_POOL_BUCKETS

#define TOPIC_POOL_BUCKETS   57

The number of buckets to use for topic pools

Definition at line 305 of file stasis.c.

Referenced by stasis_topic_pool_create().

◆ TOPIC_STATISTICS_BUCKETS

#define TOPIC_STATISTICS_BUCKETS   57

The number of buckets to use for topic statistics

Definition at line 325 of file stasis.c.

Referenced by stasis_init().

Function Documentation

◆ __stasis_subscribe()

struct stasis_subscription* __stasis_subscribe ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Parameters
topicTopic to subscribe to.
callbackCallback function for subscription messages.
dataData to be passed to the callback, in addition to the message.
Returns
New stasis_subscription object.
NULL on error.
Since
12
Note
This callback will receive a callback with a message indicating it has been subscribed. This occurs immediately before accepted message types can be set and the callback must expect to receive it.

Definition at line 944 of file stasis.c.

References internal_stasis_subscribe().

Referenced by stasis_message_router_create_internal().

951 {
952  return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
953 }
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:858

◆ __stasis_subscribe_pool()

struct stasis_subscription* __stasis_subscribe_pool ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription whose callbacks occur on a thread pool.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but will almost certainly not always happen on the same thread. The invocation order of different subscriptions is unspecified.

Unlike stasis_subscribe, this function will explicitly use a threadpool to dispatch items to its callback. This form of subscription should be used when many subscriptions may be made to the specified topic.

Parameters
topicTopic to subscribe to.
callbackCallback function for subscription messages.
dataData to be passed to the callback, in addition to the message.
Returns
New stasis_subscription object.
NULL on error.
Since
12.8.0
Note
This callback will receive a callback with a message indicating it has been subscribed. This occurs immediately before accepted message types can be set and the callback must expect to receive it.

Definition at line 955 of file stasis.c.

References internal_stasis_subscribe().

Referenced by stasis_message_router_create_internal().

962 {
963  return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
964 }
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:858

◆ AO2_GLOBAL_OBJ_STATIC() [1/3]

static AO2_GLOBAL_OBJ_STATIC ( topic_statistics  )
static

Global container which stores statistics for topics

◆ AO2_GLOBAL_OBJ_STATIC() [2/3]

static AO2_GLOBAL_OBJ_STATIC ( subscription_statistics  )
static

Global container which stores statistics for subscriptions

◆ AO2_GLOBAL_OBJ_STATIC() [3/3]

static AO2_GLOBAL_OBJ_STATIC ( globals  )
static

A global object container that will contain the stasis_config that gets swapped out on reloads.

◆ AO2_STRING_FIELD_CASE_SORT_FN()

AO2_STRING_FIELD_CASE_SORT_FN ( topic_proxy  ,
name   
)

◆ AO2_STRING_FIELD_CMP_FN()

AO2_STRING_FIELD_CMP_FN ( topic_proxy  ,
name   
)

◆ AO2_STRING_FIELD_HASH_FN()

AO2_STRING_FIELD_HASH_FN ( topic_proxy  ,
name   
)

◆ AO2_STRING_FIELD_SORT_FN() [1/2]

AO2_STRING_FIELD_SORT_FN ( stasis_subscription_statistics  ,
uniqueid   
)

◆ AO2_STRING_FIELD_SORT_FN() [2/2]

AO2_STRING_FIELD_SORT_FN ( stasis_topic_statistics  ,
name   
)

◆ AST_VECTOR()

static AST_VECTOR ( struct stasis_message_type_statistics  )
static

Vector containing message type information

Highest time spent dispatching messages to subscribers

Lowest time spent dispatching messages to subscribers

The number of messages that were not dispatched to any subscriber

The number of messages that were dispatched to at least 1 subscriber

The ids of the subscribers to this topic

Pointer to the topic (NOT refcounted, and must NOT be accessed)

Name of the topic

Definition at line 350 of file stasis.c.

References name.

353  {
354  /*! \brief Highest time spent dispatching messages to subscribers */
355  long highest_time_dispatched;
356  /*! \brief Lowest time spent dispatching messages to subscribers */
357  long lowest_time_dispatched;
358  /*! \brief The number of messages that were not dispatched to any subscriber */
359  int messages_not_dispatched;
360  /*! \brief The number of messages that were dispatched to at least 1 subscriber */
361  int messages_dispatched;
362  /*! \brief The ids of the subscribers to this topic */
363  struct ao2_container *subscribers;
364  /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
365  struct stasis_topic *topic;
366  /*! \brief Name of the topic */
367  char name[0];
368 };
static const char name[]
Definition: cdr_mysql.c:74
Generic container type.

◆ CONFIG_INFO_CORE()

CONFIG_INFO_CORE ( "stasis"  ,
cfg_info  ,
globals  ,
stasis_config_alloc  ,
files = ACO_FILES(&stasis_conf) 
)

Register information about the configs being processed by this module.

◆ declined_handler()

static int declined_handler ( const struct aco_option opt,
struct ast_variable var,
void *  obj 
)
static

Definition at line 2304 of file stasis.c.

References ast_multi_user_event_type(), ast_str_container_add(), ast_strlen_zero, stasis_declined_config::declined, multi_user_event_to_ami(), multi_user_event_to_json(), STASIS_MESSAGE_TYPE_DEFN(), to_ami(), and ast_variable::value.

Referenced by stasis_init().

2305 {
2306  struct stasis_declined_config *declined = obj;
2307 
2308  if (ast_strlen_zero(var->value)) {
2309  return 0;
2310  }
2311 
2312  if (ast_str_container_add(declined->declined, var->value)) {
2313  return -1;
2314  }
2315 
2316  return 0;
2317 }
#define ast_strlen_zero(foo)
Definition: strings.h:52
struct ao2_container * declined
Definition: stasis.c:2182
A structure to hold global configuration-related options.
Definition: stasis.c:2180
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:206

◆ dispatch_exec_async()

static int dispatch_exec_async ( struct ast_taskprocessor_local local)
static

Definition at line 1261 of file stasis.c.

References ao2_cleanup, ast_taskprocessor_local::data, ast_taskprocessor_local::local_data, and subscription_invoke().

Referenced by dispatch_message().

1262 {
1263  struct stasis_subscription *sub = local->local_data;
1264  struct stasis_message *message = local->data;
1265 
1266  subscription_invoke(sub, message);
1267  ao2_cleanup(message);
1268 
1269  return 0;
1270 }
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition: stasis.c:756
struct stasis_forward * sub
Definition: res_corosync.c:240

◆ dispatch_exec_sync()

static int dispatch_exec_sync ( struct ast_taskprocessor_local local)
static

Definition at line 1288 of file stasis.c.

References ao2_cleanup, ast_cond_signal, ast_mutex_lock, ast_mutex_unlock, sync_task_data::complete, sync_task_data::cond, ast_taskprocessor_local::data, ast_taskprocessor_local::local_data, sync_task_data::lock, subscription_invoke(), and sync_task_data::task_data.

Referenced by dispatch_message().

1289 {
1290  struct stasis_subscription *sub = local->local_data;
1291  struct sync_task_data *std = local->data;
1292  struct stasis_message *message = std->task_data;
1293 
1294  subscription_invoke(sub, message);
1295  ao2_cleanup(message);
1296 
1297  ast_mutex_lock(&std->lock);
1298  std->complete = 1;
1299  ast_cond_signal(&std->cond);
1300  ast_mutex_unlock(&std->lock);
1301 
1302  return 0;
1303 }
#define ast_mutex_lock(a)
Definition: lock.h:187
#define ast_cond_signal(cond)
Definition: lock.h:201
ast_cond_t cond
Definition: stasis.c:1278
ast_mutex_t lock
Definition: stasis.c:1277
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition: stasis.c:756
struct stasis_forward * sub
Definition: res_corosync.c:240
void * task_data
Definition: stasis.c:1280
#define ast_mutex_unlock(a)
Definition: lock.h:188

◆ dispatch_message()

static unsigned int dispatch_message ( struct stasis_subscription sub,
struct stasis_message message,
int  synchronous 
)
static

Definition at line 1314 of file stasis.c.

References ao2_bump, ao2_cleanup, ast_atomic_fetchadd_int(), ast_cond_destroy, ast_cond_init, ast_cond_wait, ast_log, ast_mutex_destroy, ast_mutex_init, ast_mutex_lock, ast_mutex_unlock, ast_taskprocessor_push_local(), AST_VECTOR_GET, AST_VECTOR_SIZE, sync_task_data::complete, sync_task_data::cond, dispatch_exec_async(), dispatch_exec_sync(), sync_task_data::lock, LOG_ERROR, stasis_subscription::mailbox, stasis_message_type_statistics::message_type, NULL, stasis_message_type(), stasis_message_type_available_formatters(), stasis_message_type_id(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_final_message(), STASIS_SUBSCRIPTION_FORMATTER_NONE, subscription_invoke(), and sync_task_data::task_data.

Referenced by publish_msg(), and send_subscription_unsubscribe().

1317 {
1318  int is_final = stasis_subscription_final_message(sub, message);
1319 
1320  /*
1321  * The 'do while' gives us an easy way to skip remaining logic once
1322  * we determine the message should be accepted.
1323  * The code looks more verbose than it needs to be but it optimizes
1324  * down very nicely. It's just easier to understand and debug this way.
1325  */
1326  do {
1327  struct stasis_message_type *message_type = stasis_message_type(message);
1328  int type_id = stasis_message_type_id(message_type);
1329  int type_filter_specified = 0;
1330  int formatter_filter_specified = 0;
1331  int type_filter_passed = 0;
1332  int formatter_filter_passed = 0;
1333 
1334  /* We always accept final messages so only run the filter logic if not final */
1335  if (is_final) {
1336  break;
1337  }
1338 
1339  type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1340  formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1341 
1342  /* Accept if no filters of either type were specified */
1343  if (!type_filter_specified && !formatter_filter_specified) {
1344  break;
1345  }
1346 
1347  type_filter_passed = type_filter_specified
1348  && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1349  && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1350 
1351  /*
1352  * Since the type and formatter filters are OR'd, we can skip
1353  * the formatter check if the type check passes.
1354  */
1355  if (type_filter_passed) {
1356  break;
1357  }
1358 
1359  formatter_filter_passed = formatter_filter_specified
1360  && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
1361 
1362  if (formatter_filter_passed) {
1363  break;
1364  }
1365 
1366 #ifdef AST_DEVMODE
1367  ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1368 #endif
1369 
1370  return 0;
1371 
1372  } while (0);
1373 
1374 #ifdef AST_DEVMODE
1375  ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1376 #endif
1377 
1378  if (!sub->mailbox) {
1379  /* Dispatch directly */
1380  subscription_invoke(sub, message);
1381  return 1;
1382  }
1383 
1384  /* Bump the message for the taskprocessor push. This will get de-ref'd
1385  * by the task processor callback.
1386  */
1387  ao2_bump(message);
1388  if (!synchronous) {
1390  /* Push failed; ugh. */
1391  ast_log(LOG_ERROR, "Dropping async dispatch\n");
1392  ao2_cleanup(message);
1393  return 0;
1394  }
1395  } else {
1396  struct sync_task_data std;
1397 
1398  ast_mutex_init(&std.lock);
1399  ast_cond_init(&std.cond, NULL);
1400  std.complete = 0;
1401  std.task_data = message;
1402 
1404  /* Push failed; ugh. */
1405  ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1406  ao2_cleanup(message);
1407  ast_mutex_destroy(&std.lock);
1408  ast_cond_destroy(&std.cond);
1409  return 0;
1410  }
1411 
1412  ast_mutex_lock(&std.lock);
1413  while (!std.complete) {
1414  ast_cond_wait(&std.cond, &std.lock);
1415  }
1416  ast_mutex_unlock(&std.lock);
1417 
1418  ast_mutex_destroy(&std.lock);
1419  ast_cond_destroy(&std.cond);
1420  }
1421 
1422  return 1;
1423 }
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define ast_mutex_lock(a)
Definition: lock.h:187
#define NULL
Definition: resample.c:96
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
#define ao2_bump(obj)
Definition: astobj2.h:491
#define ast_log
Definition: astobj2.c:42
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
#define LOG_ERROR
Definition: logger.h:285
struct ast_taskprocessor * mailbox
Definition: stasis.c:687
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
#define ast_cond_destroy(cond)
Definition: lock.h:200
static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
Definition: stasis.c:1288
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription&#39;s callback.
Definition: stasis.c:756
#define ast_mutex_init(pmutex)
Definition: lock.h:184
#define ast_mutex_destroy(a)
Definition: lock.h:186
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
#define ast_mutex_unlock(a)
Definition: lock.h:188
static int dispatch_exec_async(struct ast_taskprocessor_local *local)
Definition: stasis.c:1261

◆ forward_dtor()

static void forward_dtor ( void *  obj)
static

Definition at line 1538 of file stasis.c.

References ao2_cleanup, stasis_forward::from_topic, NULL, and stasis_forward::to_topic.

Referenced by stasis_forward_all().

1539 {
1540  struct stasis_forward *forward = obj;
1541 
1542  ao2_cleanup(forward->from_topic);
1543  forward->from_topic = NULL;
1544  ao2_cleanup(forward->to_topic);
1545  forward->to_topic = NULL;
1546 }
#define NULL
Definition: resample.c:96
struct stasis_topic * from_topic
Definition: stasis.c:1533
struct stasis_topic * to_topic
Definition: stasis.c:1535
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
Forwarding information.
Definition: stasis.c:1531

◆ internal_stasis_subscribe()

struct stasis_subscription* internal_stasis_subscribe ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data,
int  needs_mailbox,
int  use_thread_pool,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Note: modules outside of Stasis should use stasis_subscribe.

Parameters
topicTopic to subscribe to.
callbackCallback function for subscription messages.
dataData to be passed to the callback, in addition to the message.
needs_mailboxDetermines whether or not the subscription requires a mailbox. Subscriptions with mailboxes will be delivered on some non-publisher thread; subscriptions without mailboxes will be delivered on the publisher thread.
use_thread_poolUse the thread pool for the subscription. This is only relevant if needs_mailbox is non-zero.
Returns
New stasis_subscription object.
NULL on error.
Since
12

Definition at line 858 of file stasis.c.

References ao2_ref, ao2_t_alloc, ast_asprintf, ast_atomic_fetchadd_int(), ast_cond_init, ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_set_local(), ast_threadpool_serializer(), AST_VECTOR_INIT, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::join_cond, stasis_subscription::mailbox, NULL, send_subscription_subscribe(), STASIS_SUBSCRIPTION_FILTER_NONE, STASIS_SUBSCRIPTION_FORMATTER_NONE, stasis_subscription_statistics_create(), stasis_topic_name(), statistics(), sub, subscription_dtor(), stasis_subscription::topic, topic_add_subscription(), TPS_REF_DEFAULT, and stasis_subscription::uniqueid.

Referenced by __stasis_subscribe(), __stasis_subscribe_pool(), and stasis_caching_topic_create().

867 {
868  struct stasis_subscription *sub;
869  int ret;
870 
871  if (!topic) {
872  return NULL;
873  }
874 
875  /* The ao2 lock is used for join_cond. */
876  sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
877  if (!sub) {
878  return NULL;
879  }
880 
881 #ifdef AST_DEVMODE
882  ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
883  sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
884  if (ret < 0 || !sub->statistics) {
885  ao2_ref(sub, -1);
886  return NULL;
887  }
888 #else
889  ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
890  if (ret < 0) {
891  ao2_ref(sub, -1);
892  return NULL;
893  }
894 #endif
895 
896  if (needs_mailbox) {
897  char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
898 
899  /* Create name with seq number appended. */
900  ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
901  use_thread_pool ? 'p' : 'm',
902  stasis_topic_name(topic));
903 
904  /*
905  * With a small number of subscribers, a thread-per-sub is
906  * acceptable. For a large number of subscribers, a thread
907  * pool should be used.
908  */
909  if (use_thread_pool) {
910  sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
911  } else {
913  }
914  if (!sub->mailbox) {
915  ao2_ref(sub, -1);
916 
917  return NULL;
918  }
920  /* Taskprocessor has a reference */
921  ao2_ref(sub, +1);
922  }
923 
924  ao2_ref(topic, +1);
925  sub->topic = topic;
926  sub->callback = callback;
927  sub->data = data;
928  ast_cond_init(&sub->join_cond, NULL);
929  sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
930  AST_VECTOR_INIT(&sub->accepted_message_types, 0);
931  sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
932 
933  if (topic_add_subscription(topic, sub) != 0) {
934  ao2_ref(sub, -1);
935  ao2_ref(topic, -1);
936 
937  return NULL;
938  }
939  send_subscription_subscribe(topic, sub);
940 
941  return sub;
942 }
static void statistics(void)
Definition: utils/frame.c:287
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct stasis_topic * topic
Definition: stasis.c:685
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1432
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
ast_cond_t join_cond
Definition: stasis.c:694
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define NULL
Definition: resample.c:96
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:60
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1648
#define ao2_ref(o, delta)
Definition: astobj2.h:464
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
struct ast_taskprocessor * mailbox
Definition: stasis.c:687
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1203
stasis_subscription_cb callback
Definition: stasis.c:689
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
static struct stasis_subscription_statistics * stasis_subscription_statistics_create(struct stasis_subscription *sub, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Definition: stasis.c:823
struct stasis_forward * sub
Definition: res_corosync.c:240
static void subscription_dtor(void *obj)
Definition: stasis.c:715
static struct ast_threadpool * threadpool
Definition: stasis.c:308

◆ link_topic_proxy()

static int link_topic_proxy ( struct stasis_topic topic,
const char *  name,
const char *  detail 
)
static

Definition at line 501 of file stasis.c.

References ao2_bump, ao2_cleanup, ao2_link_flags, ao2_ref, ao2_t_weakproxy_alloc, ao2_t_weakproxy_set_object, ao2_unlock, ao2_weakproxy_subscribe(), ao2_wrlock, ast_copy_string(), ast_log, ast_tvnow(), topic_proxy::buf, topic_proxy::creationtime, topic_proxy::detail, LOG_ERROR, topic_proxy::name, NULL, OBJ_NOLOCK, proxy_dtor(), and stasis_topic_get().

Referenced by stasis_topic_create_with_detail().

502 {
503  struct topic_proxy *proxy;
504  struct stasis_topic* topic_tmp;
505  size_t detail_len;
506 
507  if (!topic || !name || !strlen(name) || !detail) {
508  return -1;
509  }
510 
512 
513  topic_tmp = stasis_topic_get(name);
514  if (topic_tmp) {
515  ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
516  ao2_ref(topic_tmp, -1);
518 
519  return -1;
520  }
521 
522  detail_len = strlen(detail) + 1;
523 
524  proxy = ao2_t_weakproxy_alloc(
525  sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
526  if (!proxy) {
528 
529  return -1;
530  }
531 
532  /* set the proxy info */
533  proxy->name = proxy->buf;
534  proxy->detail = proxy->name + strlen(name) + 1;
535 
536  strcpy(proxy->name, name); /* SAFE */
537  ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
538  proxy->creationtime = ast_tvnow();
539 
540  /* We have exclusive access to proxy, no need for locking here. */
541  if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
542  ao2_cleanup(proxy);
544 
545  return -1;
546  }
547 
549  ao2_cleanup(proxy);
552 
553  return -1;
554  }
555 
556  /* setting the topic point to the proxy */
557  topic->name = proxy->name;
558  topic->detail = proxy->detail;
559  topic->creationtime = &(proxy->creationtime);
560 
562  ao2_ref(proxy, -1);
563 
565 
566  return 0;
567 }
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
char * detail
Definition: stasis.c:402
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:931
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_wrlock(a)
Definition: astobj2.h:720
char * name
Definition: stasis.c:401
#define ao2_bump(obj)
Definition: astobj2.h:491
#define ast_log
Definition: astobj2.c:42
char buf[0]
Definition: stasis.c:406
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct timeval creationtime
Definition: stasis.c:404
#define LOG_ERROR
Definition: logger.h:285
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:586
struct ao2_container * topic_all
Definition: stasis.c:396
static const char name[]
Definition: cdr_mysql.c:74
static void proxy_dtor(void *weakproxy, void *container)
Definition: stasis.c:413
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:401
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:557
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:623

◆ multi_object_blob_dtor()

static void multi_object_blob_dtor ( void *  obj)
static

Definition at line 1959 of file stasis.c.

References ao2_cleanup, ast_json_unref(), AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_SIZE, ast_multi_object_blob::blob, STASIS_UMOS_MAX, and type.

Referenced by ast_multi_object_blob_create().

1960 {
1961  struct ast_multi_object_blob *multi = obj;
1962  int type;
1963  int i;
1964 
1965  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1966  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1967  ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1968  }
1969  AST_VECTOR_FREE(&multi->snapshots[type]);
1970  }
1971  ast_json_unref(multi->blob);
1972 }
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
static const char type[]
Definition: chan_ooh323.c:109
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1364
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1950
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct ast_json * blob
Definition: stasis.c:1951
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ multi_object_blob_to_ami()

static struct ast_str* multi_object_blob_to_ami ( void *  obj)
static

Definition at line 2096 of file stasis.c.

References ast_asprintf, ast_free, ast_manager_build_bridge_state_string_prefix(), ast_manager_build_channel_state_string_prefix(), ast_str_append(), ast_str_buffer(), ast_str_create, AST_VECTOR_GET, AST_VECTOR_SIZE, name, NULL, STASIS_UMOS_BRIDGE, STASIS_UMOS_CHANNEL, STASIS_UMOS_ENDPOINT, STASIS_UMOS_MAX, and type.

Referenced by multi_user_event_to_ami().

2097 {
2098  struct ast_str *ami_str=ast_str_create(1024);
2099  struct ast_str *ami_snapshot;
2100  const struct ast_multi_object_blob *multi = obj;
2102  int i;
2103 
2104  if (!ami_str) {
2105  return NULL;
2106  }
2107  if (!multi) {
2108  ast_free(ami_str);
2109  return NULL;
2110  }
2111 
2112  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2113  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2114  char *name = NULL;
2115  void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2116  ami_snapshot = NULL;
2117 
2118  if (i > 0) {
2119  ast_asprintf(&name, "%d", i + 1);
2120  }
2121 
2122  switch (type) {
2123  case STASIS_UMOS_CHANNEL:
2124  ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2125  break;
2126 
2127  case STASIS_UMOS_BRIDGE:
2128  ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2129  break;
2130 
2131  case STASIS_UMOS_ENDPOINT:
2132  /* currently not sending endpoint snapshots to AMI */
2133  break;
2134  }
2135  if (ami_snapshot) {
2136  ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2137  ast_free(ami_snapshot);
2138  }
2139  ast_free(name);
2140  }
2141  }
2142 
2143  return ami_str;
2144 }
static const char type[]
Definition: chan_ooh323.c:109
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1364
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:714
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1091
#define NULL
Definition: resample.c:96
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1950
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1357
The descriptor of a dynamic string XXX storage will be optimized later if needed We use the ts field ...
Definition: strings.h:584
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
#define ast_str_create(init_len)
Create a malloc&#39;ed dynamic length string.
Definition: strings.h:620

◆ multi_user_event_to_ami()

static struct ast_manager_event_blob* multi_user_event_to_ami ( struct stasis_message message)
static

Definition at line 2155 of file stasis.c.

References ast_free, ast_json_object_get(), ast_json_string_get(), ast_manager_event_blob_create(), ast_manager_str_from_json_object(), ast_str_buffer(), ast_multi_object_blob::blob, EVENT_FLAG_USER, multi_object_blob_to_ami(), NULL, RAII_VAR, stasis_message_data(), and userevent_exclusion_cb().

Referenced by declined_handler().

2157 {
2158  RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2159  RAII_VAR(struct ast_str *, body, NULL, ast_free);
2160  struct ast_multi_object_blob *multi = stasis_message_data(message);
2161  const char *eventname;
2162 
2163  eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2165  object_string = multi_object_blob_to_ami(multi);
2166  if (!object_string || !body) {
2167  return NULL;
2168  }
2169 
2170  return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
2171  "%s"
2172  "UserEvent: %s\r\n"
2173  "%s",
2174  ast_str_buffer(object_string),
2175  eventname,
2176  ast_str_buffer(body));
2177 }
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:714
#define NULL
Definition: resample.c:96
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:9727
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition: manager.c:1821
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1950
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:273
static int userevent_exclusion_cb(const char *key)
Definition: stasis.c:2147
The descriptor of a dynamic string XXX storage will be optimized later if needed We use the ts field ...
Definition: strings.h:584
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
#define EVENT_FLAG_USER
Definition: manager.h:77
static struct ast_str * multi_object_blob_to_ami(void *obj)
Definition: stasis.c:2096
#define ast_free(a)
Definition: astmm.h:182
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:397
struct ast_json * blob
Definition: stasis.c:1951

◆ multi_user_event_to_json()

static struct ast_json* multi_user_event_to_json ( struct stasis_message message,
const struct stasis_message_sanitizer sanitize 
)
static

Definition at line 2045 of file stasis.c.

References ast_bridge_snapshot_to_json(), ast_channel_snapshot_to_json(), ast_endpoint_snapshot_to_json(), ast_json_object_create(), ast_json_object_get(), ast_json_object_set(), ast_json_ref(), ast_json_string_create(), ast_json_timeval(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_multi_object_blob::blob, name, NULL, out, stasis_message_data(), stasis_message_timestamp(), STASIS_UMOS_BRIDGE, STASIS_UMOS_CHANNEL, STASIS_UMOS_ENDPOINT, STASIS_UMOS_MAX, and type.

Referenced by declined_handler().

2048 {
2049  struct ast_json *out;
2050  struct ast_multi_object_blob *multi = stasis_message_data(message);
2051  struct ast_json *blob = multi->blob;
2052  const struct timeval *tv = stasis_message_timestamp(message);
2054  int i;
2055 
2056  out = ast_json_object_create();
2057  if (!out) {
2058  return NULL;
2059  }
2060 
2061  ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2062  ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2063  ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2064  ast_json_object_set(out, "userevent", ast_json_ref(blob));
2065 
2066  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2067  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2068  struct ast_json *json_object = NULL;
2069  char *name = NULL;
2070  void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2071 
2072  switch (type) {
2073  case STASIS_UMOS_CHANNEL:
2074  json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2075  name = "channel";
2076  break;
2077  case STASIS_UMOS_BRIDGE:
2078  json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2079  name = "bridge";
2080  break;
2081  case STASIS_UMOS_ENDPOINT:
2082  json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2083  name = "endpoint";
2084  break;
2085  }
2086  if (json_object) {
2087  ast_json_object_set(out, name, json_object);
2088  }
2089  }
2090  }
2091 
2092  return out;
2093 }
static const char type[]
Definition: chan_ooh323.c:109
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1364
#define NULL
Definition: resample.c:96
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:404
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1950
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:268
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1357
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:649
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
static const char name[]
Definition: cdr_mysql.c:74
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:389
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
FILE * out
Definition: utils/frame.c:33
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:397
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
Abstract JSON element (object, array, string, int, ...).
struct ast_json * blob
Definition: stasis.c:1951
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ proxy_dtor()

static void proxy_dtor ( void *  weakproxy,
void *  container 
)
static

Definition at line 413 of file stasis.c.

References ao2_cleanup, ao2_unlink, sub, topic_add_subscription(), and topic_remove_subscription().

Referenced by link_topic_proxy().

414 {
415  ao2_unlink(container, weakproxy);
417 }
struct ao2_container * container
Definition: res_fax.c:502
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ publish_msg()

static void publish_msg ( struct stasis_topic topic,
struct stasis_message message,
struct stasis_subscription sync_sub 
)
static

Definition at line 1432 of file stasis.c.

References ao2_lock, ao2_ref, ao2_unlock, ast_assert, ast_atomic_fetchadd_int(), ast_mutex_lock, ast_mutex_unlock, ast_tvdiff_ms(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_GET_ADDR, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, dispatch_message(), stasis_message_type_statistics::message_type, message_type_id, message_type_statistics_lock, NULL, stasis_message_type_statistics::published, stasis_message_type(), stasis_message_type_id(), stasis_topic_subscribers(), statistics(), and stasis_message_type_statistics::unused.

Referenced by stasis_publish(), and stasis_publish_sync().

1434 {
1435  size_t i;
1436  unsigned int dispatched = 0;
1437 #ifdef AST_DEVMODE
1440  struct timeval start;
1441  long elapsed;
1442 #endif
1443 
1444  ast_assert(topic != NULL);
1445  ast_assert(message != NULL);
1446 
1447 #ifdef AST_DEVMODE
1449  if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1450  struct stasis_message_type_statistics new_statistics = {
1451  .published = 0,
1452  };
1453  if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1455  return;
1456  }
1457  }
1458  statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1459  statistics->message_type = stasis_message_type(message);
1461 
1462  ast_atomic_fetchadd_int(&statistics->published, +1);
1463 #endif
1464 
1465  /* If there are no subscribers don't bother */
1466  if (!stasis_topic_subscribers(topic)) {
1467 #ifdef AST_DEVMODE
1468  ast_atomic_fetchadd_int(&statistics->unused, +1);
1469  ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1470 #endif
1471  return;
1472  }
1473 
1474  /*
1475  * The topic may be unref'ed by the subscription invocation.
1476  * Make sure we hold onto a reference while dispatching.
1477  */
1478  ao2_ref(topic, +1);
1479 #ifdef AST_DEVMODE
1480  start = ast_tvnow();
1481 #endif
1482  ao2_lock(topic);
1483  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1484  struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
1485 
1486  ast_assert(sub != NULL);
1487 
1488  dispatched += dispatch_message(sub, message, (sub == sync_sub));
1489  }
1490  ao2_unlock(topic);
1491 
1492 #ifdef AST_DEVMODE
1493  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1494  if (elapsed > topic->statistics->highest_time_dispatched) {
1495  topic->statistics->highest_time_dispatched = elapsed;
1496  }
1497  if (elapsed < topic->statistics->lowest_time_dispatched) {
1498  topic->statistics->lowest_time_dispatched = elapsed;
1499  }
1500  if (dispatched) {
1501  ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1502  } else {
1503  ast_atomic_fetchadd_int(&statistics->unused, +1);
1504  ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1505  }
1506 #endif
1507 
1508  ao2_ref(topic, -1);
1509 }
static void statistics(void)
Definition: utils/frame.c:287
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
static int message_type_id
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:644
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ast_assert(a)
Definition: utils.h:695
#define ast_mutex_lock(a)
Definition: lock.h:187
#define ao2_unlock(a)
Definition: astobj2.h:730
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:98
#define NULL
Definition: resample.c:96
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
int unused
The number of messages of this that did not reach a subscriber.
Definition: stasis.c:341
static ast_mutex_t message_type_statistics_lock
Definition: stasis.c:347
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:670
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition: stasis.c:1314
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
int published
The number of messages of this published.
Definition: stasis.c:339
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
struct stasis_forward * sub
Definition: res_corosync.c:240
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
#define ast_mutex_unlock(a)
Definition: lock.h:188
struct stasis_message_type * message_type
The stasis message type.
Definition: stasis.c:343

◆ send_subscription_subscribe()

static void send_subscription_subscribe ( struct stasis_topic topic,
struct stasis_subscription sub 
)
static

Definition at line 1648 of file stasis.c.

References ao2_cleanup, ast_assert, stasis_message_create(), stasis_publish(), stasis_subscription_change_type(), stasis_subscription_is_subscribed(), subscription_change_alloc(), and stasis_subscription::uniqueid.

Referenced by internal_stasis_subscribe(), and subscription_invoke().

1649 {
1650  struct stasis_subscription_change *change;
1651  struct stasis_message *msg;
1652 
1653  /* This assumes that we have already unsubscribed */
1655 
1657  return;
1658  }
1659 
1660  change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1661  if (!change) {
1662  return;
1663  }
1664 
1666  if (!msg) {
1667  ao2_cleanup(change);
1668  return;
1669  }
1670 
1671  stasis_publish(topic, msg);
1672  ao2_cleanup(msg);
1673  ao2_cleanup(change);
1674 }
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition: stasis.c:1627
#define ast_assert(a)
Definition: utils.h:695
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic&#39;s subscribers.
Definition: stasis.c:1511
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152

◆ send_subscription_unsubscribe()

static void send_subscription_unsubscribe ( struct stasis_topic topic,
struct stasis_subscription sub 
)
static

Definition at line 1676 of file stasis.c.

References ao2_cleanup, ast_assert, dispatch_message(), stasis_message_create(), stasis_publish(), stasis_subscription_change_type(), stasis_subscription_is_subscribed(), subscription_change_alloc(), and stasis_subscription::uniqueid.

Referenced by stasis_unsubscribe(), and subscription_invoke().

1678 {
1679  struct stasis_subscription_change *change;
1680  struct stasis_message *msg;
1681 
1682  /* This assumes that we have already unsubscribed */
1684 
1686  return;
1687  }
1688 
1689  change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1690  if (!change) {
1691  return;
1692  }
1693 
1695  if (!msg) {
1696  ao2_cleanup(change);
1697  return;
1698  }
1699 
1700  stasis_publish(topic, msg);
1701 
1702  /* Now we have to dispatch to the subscription itself */
1703  dispatch_message(sub, msg, 0);
1704 
1705  ao2_cleanup(msg);
1706  ao2_cleanup(change);
1707 }
static struct stasis_subscription_change * subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
Definition: stasis.c:1627
#define ast_assert(a)
Definition: utils.h:695
static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
Definition: stasis.c:1314
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic&#39;s subscribers.
Definition: stasis.c:1511
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152

◆ stasis_cleanup()

static void stasis_cleanup ( void  )
static

Cleanup function for graceful shutdowns.

Definition at line 3042 of file stasis.c.

References aco_info_destroy(), ao2_cleanup, ao2_global_obj_release, ARRAY_LEN, ast_cli_unregister_multiple(), ast_multi_user_event_type(), ast_threadpool_shutdown(), AST_VECTOR_FREE, globals, NULL, STASIS_MESSAGE_TYPE_CLEANUP, and stasis_subscription_change_type().

Referenced by stasis_init().

3043 {
3044 #ifdef AST_DEVMODE
3046  AST_VECTOR_FREE(&message_type_statistics);
3047  ao2_global_obj_release(subscription_statistics);
3048  ao2_global_obj_release(topic_statistics);
3049 #endif
3052  topic_all = NULL;
3054  threadpool = NULL;
3057  aco_info_destroy(&cfg_info);
3059 }
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
static struct ast_cli_entry cli_stasis_statistics[]
Definition: stasis.c:2916
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1523
#define NULL
Definition: resample.c:96
static struct console_pvt globals
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2476
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
#define ao2_global_obj_release(holder)
Definition: astobj2.h:865
struct ao2_container * topic_all
Definition: stasis.c:396
void ast_threadpool_shutdown(struct ast_threadpool *pool)
Shut down a threadpool and destroy it.
Definition: threadpool.c:965
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
static struct ast_threadpool * threadpool
Definition: stasis.c:308

◆ stasis_config_alloc()

static void * stasis_config_alloc ( void  )
static

Definition at line 2253 of file stasis.c.

References ao2_alloc, ao2_ref, ast_calloc, ast_str_container_alloc, stasis_declined_config::declined, stasis_config::declined_message_types, NULL, stasis_config_destructor(), stasis_declined_config_destructor(), and stasis_config::threadpool_options.

Referenced by stasis_init().

2254 {
2255  struct stasis_config *cfg;
2256 
2257  if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2258  return NULL;
2259  }
2260 
2261  cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
2262  if (!cfg->threadpool_options) {
2263  ao2_ref(cfg, -1);
2264  return NULL;
2265  }
2266 
2269  if (!cfg->declined_message_types) {
2270  ao2_ref(cfg, -1);
2271  return NULL;
2272  }
2273 
2275  if (!cfg->declined_message_types->declined) {
2276  ao2_ref(cfg, -1);
2277  return NULL;
2278  }
2279 
2280  return cfg;
2281 }
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1312
static void stasis_declined_config_destructor(void *obj)
Definition: stasis.c:2238
#define NULL
Definition: resample.c:96
struct ao2_container * declined
Definition: stasis.c:2182
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2199
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:204
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2197
static void stasis_config_destructor(void *obj)
Definition: stasis.c:2245

◆ stasis_config_destructor()

static void stasis_config_destructor ( void *  obj)
static

Definition at line 2245 of file stasis.c.

References ao2_cleanup, ast_free, stasis_config::declined_message_types, and stasis_config::threadpool_options.

Referenced by stasis_config_alloc().

2246 {
2247  struct stasis_config *cfg = obj;
2248 
2251 }
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2199
#define ast_free(a)
Definition: astmm.h:182
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2197

◆ stasis_declined_config_destructor()

static void stasis_declined_config_destructor ( void *  obj)
static

Definition at line 2238 of file stasis.c.

References ao2_cleanup, and stasis_declined_config::declined.

Referenced by stasis_config_alloc().

2239 {
2240  struct stasis_declined_config *declined = obj;
2241 
2242  ao2_cleanup(declined->declined);
2243 }
struct ao2_container * declined
Definition: stasis.c:2182
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
A structure to hold global configuration-related options.
Definition: stasis.c:2180

◆ stasis_forward_all()

struct stasis_forward* stasis_forward_all ( struct stasis_topic from_topic,
struct stasis_topic to_topic 
)

Create a subscription which forwards all messages from one topic to another.

Note that the topic parameter of the invoked callback will the be the topic the message was sent to, not the topic the subscriber subscribed to.

Parameters
from_topicTopic to forward.
to_topicDestination topic of forwarded messages.
Returns
New forwarding subscription.
NULL on error.
Since
12

Definition at line 1578 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_ref, ao2_unlock, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, forward_dtor(), stasis_forward::from_topic, NULL, stasis_forward::to_topic, topic_add_subscription(), and topic_lock_both.

Referenced by __init_manager(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_forward_endpoint(), ast_channel_internal_setup_topics(), AST_TEST_DEFINE(), create_subscriptions(), endpoint_internal_create(), forwards_create_bridge(), forwards_create_channel(), forwards_create_endpoint(), load_general_config(), load_module(), manager_bridging_init(), manager_channels_init(), manager_mwi_init(), manager_subscriptions_init(), manager_system_init(), stasis_cp_all_create(), stasis_cp_single_create(), stasis_topic_pool_get_topic(), and state_alloc().

1580 {
1581  int res;
1582  size_t idx;
1583  struct stasis_forward *forward;
1584 
1585  if (!from_topic || !to_topic) {
1586  return NULL;
1587  }
1588 
1589  forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1590  if (!forward) {
1591  return NULL;
1592  }
1593 
1594  /* Forwards to ourselves are implicit. */
1595  if (to_topic == from_topic) {
1596  return forward;
1597  }
1598 
1599  forward->from_topic = ao2_bump(from_topic);
1600  forward->to_topic = ao2_bump(to_topic);
1601 
1602  topic_lock_both(to_topic, from_topic);
1603  res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
1604  if (res != 0) {
1605  ao2_unlock(from_topic);
1606  ao2_unlock(to_topic);
1607  ao2_ref(forward, -1);
1608  return NULL;
1609  }
1610 
1611  for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1612  topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
1613  }
1614  ao2_unlock(from_topic);
1615  ao2_unlock(to_topic);
1616 
1617  return forward;
1618 }
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
static void forward_dtor(void *obj)
Definition: stasis.c:1538
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:426
struct stasis_topic * from_topic
Definition: stasis.c:1533
#define ao2_bump(obj)
Definition: astobj2.h:491
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_topic * to_topic
Definition: stasis.c:1535
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1203
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
Forwarding information.
Definition: stasis.c:1531
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_forward_cancel()

struct stasis_forward* stasis_forward_cancel ( struct stasis_forward forward)

Definition at line 1548 of file stasis.c.

References ao2_cleanup, ao2_unlock, AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_REMOVE_ELEM_UNORDERED, AST_VECTOR_SIZE, stasis_forward::from_topic, NULL, stasis_forward::to_topic, topic_lock_both, and topic_remove_subscription().

Referenced by all_dtor(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_internal_cleanup(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), bridge_channel_control_thread(), cleanup_module(), destroy_subscriptions(), forwards_unsubscribe(), load_general_config(), load_module(), manager_bridging_cleanup(), manager_channels_shutdown(), manager_mwi_shutdown(), manager_shutdown(), manager_system_shutdown(), stasis_cp_single_unsubscribe(), state_dtor(), topic_pool_entry_dtor(), and unload_module().

1549 {
1550  int idx;
1551  struct stasis_topic *from;
1552  struct stasis_topic *to;
1553 
1554  if (!forward) {
1555  return NULL;
1556  }
1557 
1558  from = forward->from_topic;
1559  to = forward->to_topic;
1560 
1561  if (from && to) {
1562  topic_lock_both(to, from);
1563  AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
1565 
1566  for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1567  topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
1568  }
1569  ao2_unlock(from);
1570  ao2_unlock(to);
1571  }
1572 
1573  ao2_cleanup(forward);
1574 
1575  return NULL;
1576 }
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1231
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:585
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:426
struct stasis_topic * from_topic
Definition: stasis.c:1533
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:573
struct stasis_topic * to_topic
Definition: stasis.c:1535
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_init()

int stasis_init ( void  )

Initialize the Stasis subsystem.

Returns
0 on success.
Non-zero on error.
Since
12

Definition at line 3061 of file stasis.c.

References ACO_EXACT, aco_info_init(), aco_option_register, aco_option_register_custom, aco_process_config(), ACO_PROCESS_ERROR, aco_set_defaults(), AO2_ALLOC_OPT_LOCK_MUTEX, ao2_cleanup, ao2_container_alloc_hash, ao2_global_obj_ref, ao2_global_obj_replace_unref, ao2_ref, ARRAY_LEN, ast_cli_register_multiple, ast_log, ast_multi_user_event_type(), ast_register_cleanup(), ast_threadpool_create(), AST_THREADPOOL_OPTIONS_VERSION, AST_VECTOR_INIT, ast_threadpool_options::auto_increment, declined_handler(), stasis_config::declined_message_types, FLDSET, globals, ast_threadpool_options::idle_timeout, stasis_threadpool_conf::idle_timeout_sec, ast_threadpool_options::initial_size, stasis_threadpool_conf::initial_size, LOG_ERROR, LOG_NOTICE, ast_threadpool_options::max_size, stasis_threadpool_conf::max_size, NULL, OPT_INT_T, PARSE_IN_RANGE, stasis_cache_init(), stasis_cleanup(), stasis_config_alloc(), STASIS_MESSAGE_TYPE_INIT, stasis_subscription_change_type(), SUBSCRIPTION_STATISTICS_BUCKETS, subscription_statistics_cmp(), subscription_statistics_hash(), stasis_config::threadpool_options, TOPIC_ALL_BUCKETS, TOPIC_STATISTICS_BUCKETS, topic_statistics_cmp(), topic_statistics_hash(), and ast_threadpool_options::version.

Referenced by asterisk_daemon().

3062 {
3063  struct stasis_config *cfg;
3064  int cache_init;
3065  struct ast_threadpool_options threadpool_opts = { 0, };
3066 #ifdef AST_DEVMODE
3067  struct ao2_container *subscription_stats;
3068  struct ao2_container *topic_stats;
3069 #endif
3070 
3071  /* Be sure the types are cleaned up after the message bus */
3073 
3074  if (aco_info_init(&cfg_info)) {
3075  return -1;
3076  }
3077 
3078  aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3080  aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3082  FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3083  INT_MAX);
3084  aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3086  FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3087  INT_MAX);
3088  aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3090  FLDSET(struct stasis_threadpool_conf, max_size), 0,
3091  INT_MAX);
3092 
3093  if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3094  struct stasis_config *default_cfg = stasis_config_alloc();
3095 
3096  if (!default_cfg) {
3097  return -1;
3098  }
3099 
3100  if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3101  ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3102  ao2_ref(default_cfg, -1);
3103 
3104  return -1;
3105  }
3106 
3107  if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3108  ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3109  ao2_ref(default_cfg, -1);
3110 
3111  return -1;
3112  }
3113 
3114  ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3115  ao2_global_obj_replace_unref(globals, default_cfg);
3116  cfg = default_cfg;
3117  } else {
3118  cfg = ao2_global_obj_ref(globals);
3119  if (!cfg) {
3120  ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3121 
3122  return -1;
3123  }
3124  }
3125 
3126  threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3127  threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3128  threadpool_opts.auto_increment = 1;
3129  threadpool_opts.max_size = cfg->threadpool_options->max_size;
3130  threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3131  threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3132  ao2_ref(cfg, -1);
3133  if (!threadpool) {
3134  ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3135 
3136  return -1;
3137  }
3138 
3139  cache_init = stasis_cache_init();
3140  if (cache_init != 0) {
3141  return -1;
3142  }
3143 
3145  return -1;
3146  }
3148  return -1;
3149  }
3150 
3152  topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3153  if (!topic_all) {
3154  return -1;
3155  }
3156 
3158  return -1;
3159  }
3160 
3161 #ifdef AST_DEVMODE
3162  /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3163  * topic or subscripton.
3164  */
3167  if (!subscription_stats) {
3168  return -1;
3169  }
3170  ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3171  ao2_cleanup(subscription_stats);
3172 
3175  if (!topic_stats) {
3176  return -1;
3177  }
3178  ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3179  ao2_cleanup(topic_stats);
3180  if (!topic_stats) {
3181  return -1;
3182  }
3183 
3184  AST_VECTOR_INIT(&message_type_statistics, 0);
3185 
3187  return -1;
3188  }
3189 #endif
3190 
3191  return 0;
3192 }
static struct aco_type threadpool_option
Definition: stasis.c:2202
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3042
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
#define AST_THREADPOOL_OPTIONS_VERSION
Definition: threadpool.h:71
static struct ast_cli_entry cli_stasis_statistics[]
Definition: stasis.c:2916
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
Threadpool configuration options.
Definition: stasis.c:2186
struct aco_type * declined_options[]
Definition: stasis.c:2221
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2199
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
static int subscription_statistics_hash(const void *obj, const int flags)
Definition: stasis.c:2924
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static int topic_statistics_hash(const void *obj, const int flags)
Definition: stasis.c:2982
static struct console_pvt globals
Their was an error and no changes were applied.
#define SUBSCRIPTION_STATISTICS_BUCKETS
Definition: stasis.c:328
static struct ast_cli_entry cli_stasis[]
Definition: stasis.c:2476
#define LOG_ERROR
Definition: logger.h:285
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
static void * stasis_config_alloc(void)
Definition: stasis.c:2253
static int subscription_statistics_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:2945
struct ao2_container * topic_all
Definition: stasis.c:396
#define LOG_NOTICE
Definition: logger.h:263
struct ast_threadpool * ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, const struct ast_threadpool_options *options)
Create a new threadpool.
Definition: threadpool.c:915
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2213
int stasis_cache_init(void)
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define TOPIC_STATISTICS_BUCKETS
Definition: stasis.c:325
#define ao2_global_obj_replace_unref(holder, obj)
Definition: astobj2.h:908
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2197
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
Generic container type.
static struct aco_type * threadpool_options[]
Definition: stasis.c:2210
Type for default option handler for signed integers.
static struct ast_threadpool * threadpool
Definition: stasis.c:308
static int topic_statistics_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:3003
static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
Definition: stasis.c:2304
#define TOPIC_ALL_BUCKETS
Definition: stasis.c:318

◆ stasis_log_bad_type_access()

void stasis_log_bad_type_access ( const char *  name)

Definition at line 1940 of file stasis.c.

References ast_log, LOG_ERROR, and stasis_message_type_declined().

1941 {
1942 #ifdef AST_DEVMODE
1944  ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1945  }
1946 #endif
1947 }
#define ast_log
Definition: astobj2.c:42
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2283
#define LOG_ERROR
Definition: logger.h:285
static const char name[]
Definition: cdr_mysql.c:74

◆ stasis_message_type_declined()

int stasis_message_type_declined ( const char *  name)

Check whether a message type is declined.

Parameters
nameThe name of the message type to check
Return values
zeroThe message type is not declined
non-zeroThe message type is declined

Definition at line 2283 of file stasis.c.

References ao2_cleanup, ao2_find, ao2_global_obj_ref, ao2_ref, ast_log, stasis_declined_config::declined, stasis_config::declined_message_types, globals, LOG_NOTICE, and OBJ_SEARCH_KEY.

Referenced by stasis_log_bad_type_access(), and stasis_message_type_create().

2284 {
2285  struct stasis_config *cfg = ao2_global_obj_ref(globals);
2286  char *name_in_declined;
2287  int res;
2288 
2289  if (!cfg || !cfg->declined_message_types) {
2290  ao2_cleanup(cfg);
2291  return 0;
2292  }
2293 
2294  name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2295  res = name_in_declined ? 1 : 0;
2296  ao2_cleanup(name_in_declined);
2297  ao2_ref(cfg, -1);
2298  if (res) {
2299  ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2300  }
2301  return res;
2302 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
struct ao2_container * declined
Definition: stasis.c:2182
#define ast_log
Definition: astobj2.c:42
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2199
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static struct console_pvt globals
#define LOG_NOTICE
Definition: logger.h:263
static const char name[]
Definition: cdr_mysql.c:74
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ STASIS_MESSAGE_TYPE_DEFN() [1/2]

STASIS_MESSAGE_TYPE_DEFN ( stasis_subscription_change_type  )

Referenced by declined_handler().

◆ STASIS_MESSAGE_TYPE_DEFN() [2/2]

STASIS_MESSAGE_TYPE_DEFN ( ast_multi_user_event_type  ,
to_json = multi_user_event_to_json,
to_ami = multi_user_event_to_ami 
)

Define multi user event message type(s).

◆ stasis_publish()

void stasis_publish ( struct stasis_topic topic,
struct stasis_message message 
)

Publish a message to a topic's subscribers.

Parameters
topicTopic.
messageMessage to publish.

This call is asynchronous and will return immediately upon queueing the message for delivery to the topic's subscribers.

Since
12

Definition at line 1511 of file stasis.c.

References NULL, and publish_msg().

Referenced by __ast_test_suite_event_notify(), aoc_publish_blob(), app_send_end_msg(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cel_publish_event(), ast_channel_publish_blob(), ast_channel_publish_cached_blob(), ast_channel_publish_final_snapshot(), ast_channel_publish_snapshot(), ast_device_state_clear_cache(), ast_endpoint_blob_publish(), ast_endpoint_shutdown(), ast_manager_publish_event(), ast_monitor_start(), ast_monitor_stop(), ast_multi_object_blob_single_channel_publish(), ast_publish_device_state_full(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), AST_TEST_DEFINE(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_merge_handler(), bridge_publish_state_from_blob(), bridge_topics_destroy(), cache_test_aggregate_publish_fn(), caching_topic_exec(), cc_publish(), clear_node_cache(), device_state_aggregate_publish(), endpoint_publish_snapshot(), endpoint_state_cb(), handle_security_event(), local_optimization_finished_cb(), local_optimization_started_cb(), manager_mute_mixmonitor(), meetme_stasis_generate_msg(), mixmonitor_exec(), moh_post_start(), moh_post_stop(), notify_new_message(), phase_e_handler(), presence_state_event(), publish_acl_change(), publish_chanspy_message(), publish_cluster_discovery_to_stasis_full(), publish_corosync_ping_to_stasis(), publish_format_update(), publish_hint_change(), publish_hint_remove(), publish_load_message_type(), publish_local_bridge_message(), publish_message_for_channel_topics(), publish_parked_call(), publish_parked_call_failure(), queue_publish_member_blob(), queue_publish_multi_channel_snapshot_blob(), remove_device_states_cb(), report_fax_status(), report_receive_fax_status(), report_send_fax_status(), send_call_pickup_stasis_message(), send_conf_stasis(), send_conf_stasis_snapshots(), send_msg(), send_start_msg_snapshots(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_app_control_publish(), stasis_app_user_event(), stasis_state_publish(), stasis_state_publish_by_id(), stasis_state_remove_publish_by_id(), stop_mixmonitor_full(), stun_monitor_request(), and talk_detect_audiohook_cb().

1512 {
1513  publish_msg(topic, message, NULL);
1514 }
#define NULL
Definition: resample.c:96
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1432

◆ stasis_publish_sync()

void stasis_publish_sync ( struct stasis_subscription sub,
struct stasis_message message 
)

Publish a message to a topic's subscribers, synchronizing on the specified subscriber.

Parameters
subSubscription to synchronize on.
messageMessage to publish.

The caller of stasis_publish_sync will block until the specified subscriber completes handling of the message.

All other subscribers to the topic the stasis_subpscription is subscribed to are also delivered the message; this delivery however happens asynchronously.

Since
12.1.0

Definition at line 1516 of file stasis.c.

References ast_assert, NULL, publish_msg(), and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE(), and stasis_message_router_publish_sync().

1517 {
1518  ast_assert(sub != NULL);
1519 
1520  publish_msg(sub->topic, message, sub);
1521 }
struct stasis_topic * topic
Definition: stasis.c:685
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
Definition: stasis.c:1432

◆ stasis_show_topic()

static char* stasis_show_topic ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2417 of file stasis.c.

References ao2_lock, ao2_ref, ao2_unlock, ast_cli_args::argc, ast_cli_args::argv, ast_cli(), ast_format_duration_hh_mm_ss(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_SIZE, CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, NULL, ast_cli_args::pos, stasis_topic_get(), stasis_subscription::topic, topic_complete_name(), stasis_subscription::uniqueid, ast_cli_entry::usage, and ast_cli_args::word.

2418 {
2419  struct stasis_topic *topic;
2420  char print_time[32];
2421  int i;
2422 
2423  switch (cmd) {
2424  case CLI_INIT:
2425  e->command = "stasis show topic";
2426  e->usage =
2427  "Usage: stasis show topic <name>\n"
2428  " Show stasis topic detail info.\n";
2429  return NULL;
2430  case CLI_GENERATE:
2431  if (a->pos == 3) {
2432  return topic_complete_name(a->word);
2433  } else {
2434  return NULL;
2435  }
2436  }
2437 
2438  if (a->argc != 4) {
2439  return CLI_SHOWUSAGE;
2440  }
2441 
2442  topic = stasis_topic_get(a->argv[3]);
2443  if (!topic) {
2444  ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2445  return CLI_FAILURE;
2446  }
2447 
2448  ast_cli(a->fd, "Name: %s\n", topic->name);
2449  ast_cli(a->fd, "Detail: %s\n", topic->detail);
2450  ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2451  ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2452  ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2453  ast_cli(a->fd, "Duration time: %s\n", print_time);
2454 
2455  ao2_lock(topic);
2456  ast_cli(a->fd, "\nSubscribers:\n");
2457  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2458  struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2459  ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2460  subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2461  }
2462 
2463  ast_cli(a->fd, "\nForwarded topics:\n");
2464  for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2465  struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2466  ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2467  }
2468  ao2_unlock(topic);
2469 
2470  ao2_ref(topic, -1);
2471 
2472  return CLI_SUCCESS;
2473 }
struct stasis_topic * topic
Definition: stasis.c:685
const int argc
Definition: cli.h:160
Definition: cli.h:152
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
static char * topic_complete_name(const char *word)
Definition: stasis.c:2391
const int fd
Definition: cli.h:159
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
const char *const * argv
Definition: cli.h:161
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define CLI_FAILURE
Definition: cli.h:46
char * command
Definition: cli.h:186
const char * word
Definition: cli.h:163
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
const int pos
Definition: cli.h:164
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:623
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition: main/utils.c:2049
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_show_topics()

static char* stasis_show_topics ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2334 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_list, ao2_container_dup(), ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, AO2_ITERATOR_UNLINK, ao2_ref, ast_cli_args::argc, ast_cli_entry::args, ast_cli(), CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, topic_proxy::detail, ast_cli_args::fd, FMT_FIELDS, FMT_HEADERS, topic_proxy::name, NULL, OBJ_SEARCH_OBJECT, and ast_cli_entry::usage.

2335 {
2336  struct ao2_iterator iter;
2337  struct topic_proxy *topic;
2338  struct ao2_container *tmp_container;
2339  int count = 0;
2340 #define FMT_HEADERS "%-64s %-64s\n"
2341 #define FMT_FIELDS "%-64s %-64s\n"
2342 
2343  switch (cmd) {
2344  case CLI_INIT:
2345  e->command = "stasis show topics";
2346  e->usage =
2347  "Usage: stasis show topics\n"
2348  " Shows a list of topics\n";
2349  return NULL;
2350  case CLI_GENERATE:
2351  return NULL;
2352  }
2353 
2354  if (a->argc != e->args) {
2355  return CLI_SHOWUSAGE;
2356  }
2357 
2358  ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2359 
2361  topic_proxy_sort_fn, NULL);
2362 
2363  if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2364  ao2_cleanup(tmp_container);
2365 
2366  return NULL;
2367  }
2368 
2369  /* getting all topic in order */
2370  iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2371  while ((topic = ao2_iterator_next(&iter))) {
2372  ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2373  ao2_ref(topic, -1);
2374  ++count;
2375  }
2376  ao2_iterator_destroy(&iter);
2377  ao2_cleanup(tmp_container);
2378 
2379  ast_cli(a->fd, "\n%d Total topics\n\n", count);
2380 
2381 #undef FMT_HEADERS
2382 #undef FMT_FIELDS
2383 
2384  return CLI_SUCCESS;
2385 }
#define FMT_FIELDS
const int argc
Definition: cli.h:160
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1335
Definition: cli.h:152
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
char * detail
Definition: stasis.c:402
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
int args
This gets set in ast_cli_register()
Definition: cli.h:185
char * name
Definition: stasis.c:401
const int fd
Definition: cli.h:159
#define ao2_ref(o, delta)
Definition: astobj2.h:464
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define CLI_SHOWUSAGE
Definition: cli.h:45
struct ao2_container * topic_all
Definition: stasis.c:396
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
#define FMT_HEADERS
Generic container type.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.

◆ stasis_subscription_accept_formatters()

void stasis_subscription_accept_formatters ( struct stasis_subscription subscription,
enum stasis_subscription_message_formatters  formatters 
)

Indicate to a subscription that we are interested in messages with one or more formatters.

Parameters
subscriptionSubscription to alter.
formattersA bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.25.0
16.2.0

Definition at line 1095 of file stasis.c.

References ao2_lock, ao2_unlock, ast_assert, NULL, and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE(), stasis_message_router_accept_formatters(), and stasis_message_router_set_formatters_default().

1097 {
1098  ast_assert(subscription != NULL);
1099 
1100  ao2_lock(subscription->topic);
1101  subscription->accepted_formatters = formatters;
1102  ao2_unlock(subscription->topic);
1103 
1104  return;
1105 }
struct stasis_topic * topic
Definition: stasis.c:685
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718

◆ stasis_subscription_accept_message_type()

int stasis_subscription_accept_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are interested in a message type.

This will cause the subscription to allow the given message type to be raised to our subscription callback. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
subscriptionSubscription to add message type to.
typeThe message type we wish to receive.
Return values
0on success
-1failure
Since
17.0.0
Note
If you are wanting to use stasis_final_message you will need to accept stasis_subscription_change_type as a message type.
Until the subscription is set to selective filtering it is possible for it to receive messages of message types that would not normally be accepted.

Definition at line 1025 of file stasis.c.

References ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, NULL, stasis_message_type_id(), stasis_message_type_name(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.

Referenced by acl_change_stasis_subscribe(), add_peer_mwi_subs(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), load_module(), load_pbx(), mwi_stasis_subscription_alloc(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_accept_message_type(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_create_internal(), subscribe_device_state(), and xmpp_init_event_distribution().

1027 {
1028  if (!subscription) {
1029  return -1;
1030  }
1031 
1032  ast_assert(type != NULL);
1034 
1035  if (!type || !stasis_message_type_name(type)) {
1036  /* Filtering is unreliable as this message type is not yet initialized
1037  * so force all messages through.
1038  */
1039  subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
1040  return 0;
1041  }
1042 
1043  ao2_lock(subscription->topic);
1044  if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
1045  /* We do this for the same reason as above. The subscription can still operate, so allow
1046  * it to do so by forcing all messages through.
1047  */
1048  subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
1049  }
1050  ao2_unlock(subscription->topic);
1051 
1052  return 0;
1053 }
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
struct stasis_topic * topic
Definition: stasis.c:685
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284

◆ stasis_subscription_cb_noop()

void stasis_subscription_cb_noop ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
)

Stasis subscription callback function that does nothing.

Note
This callback should be used for events are not directly processed, but need to be generated so data can be retrieved from cache later. Subscriptions with this callback can be released with stasis_unsubscribe, even during module unload.
Since
13.5

Definition at line 811 of file stasis.c.

Referenced by build_gateway(), build_peer(), and mkintf().

812 {
813 }

◆ stasis_subscription_decline_message_type()

int stasis_subscription_decline_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are not interested in a message type.

Parameters
subscriptionSubscription to remove message type from.
typeThe message type we don't wish to receive.
Return values
0on success
-1failure
Since
17.0.0

Definition at line 1055 of file stasis.c.

References ao2_lock, ao2_unlock, ast_assert, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, NULL, stasis_message_type_id(), stasis_message_type_name(), and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE().

1057 {
1058  if (!subscription) {
1059  return -1;
1060  }
1061 
1062  ast_assert(type != NULL);
1064 
1065  if (!type || !stasis_message_type_name(type)) {
1066  return 0;
1067  }
1068 
1069  ao2_lock(subscription->topic);
1070  if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
1071  /* The memory is already allocated so this can't fail */
1072  AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
1073  }
1074  ao2_unlock(subscription->topic);
1075 
1076  return 0;
1077 }
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
struct stasis_topic * topic
Definition: stasis.c:685
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_subscription_final_message()

int stasis_subscription_final_message ( struct stasis_subscription sub,
struct stasis_message msg 
)

Determine whether a message is the final message to be received on a subscription.

Parameters
subSubscription on which the message was received.
msgMessage to check.
Returns
zero if the provided message is not the final message.
non-zero if the provided message is the final message.
Since
12

Definition at line 1176 of file stasis.c.

References stasis_subscription_change::description, stasis_message_data(), stasis_message_type(), stasis_subscription_change_type(), stasis_subscription_uniqueid(), and stasis_subscription_change::uniqueid.

Referenced by bridge_subscription_change_handler(), caching_topic_exec(), consumer_exec(), consumer_exec_sync(), consumer_finalize(), default_route(), device_state_cb(), dispatch_message(), endpoint_subscription_change(), generic_agent_devstate_cb(), message_sink_cb(), mwi_event_cb(), mwi_stasis_cb(), park_announce_update_cb(), parker_update_cb(), queue_bridge_cb(), queue_channel_cb(), refer_progress_bridge(), router_dispatch(), statsmaker(), sub_subscription_change_handler(), and subscription_invoke().

1177 {
1178  struct stasis_subscription_change *change;
1179 
1181  return 0;
1182  }
1183 
1184  change = stasis_message_data(msg);
1185  if (strcmp("Unsubscribe", change->description)) {
1186  return 0;
1187  }
1188 
1189  if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1190  return 0;
1191  }
1192 
1193  return 1;
1194 }
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1171
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.

◆ stasis_subscription_is_done()

int stasis_subscription_is_done ( struct stasis_subscription subscription)

Returns whether subscription has received its final message.

Note that a subscription is considered done even while the stasis_subscription_final_message() is being processed. This allows cleanup routines to check the status of the subscription.

Parameters
subscriptionSubscription.
Returns
True (non-zero) if stasis_subscription_final_message() has been received.
False (zero) if waiting for the end.

Definition at line 1120 of file stasis.c.

References ao2_lock, ao2_unlock, and stasis_subscription::final_message_rxed.

Referenced by router_dtor(), stasis_caching_topic_dtor(), stasis_message_router_is_done(), and subscription_dtor().

1121 {
1122  if (subscription) {
1123  int ret;
1124 
1125  ao2_lock(subscription);
1126  ret = subscription->final_message_rxed;
1127  ao2_unlock(subscription);
1128 
1129  return ret;
1130  }
1131 
1132  /* Null subscription is about as done as you can get */
1133  return 1;
1134 }
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
int final_message_rxed
Definition: stasis.c:697

◆ stasis_subscription_is_subscribed()

int stasis_subscription_is_subscribed ( const struct stasis_subscription sub)

Returns whether a subscription is currently subscribed.

Note that there may still be messages queued up to be dispatched to this subscription, but the stasis_subscription_final_message() has been enqueued.

Parameters
subSubscription to check
Returns
False (zero) if subscription is not subscribed.
True (non-zero) if still subscribed.

Definition at line 1152 of file stasis.c.

References ao2_lock, ao2_unlock, AST_VECTOR_GET, AST_VECTOR_SIZE, sub, and stasis_subscription::topic.

Referenced by asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), router_dtor(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_caching_topic_dtor(), stasis_caching_unsubscribe(), subscription_dtor(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

1153 {
1154  if (sub) {
1155  size_t i;
1156  struct stasis_topic *topic = sub->topic;
1157 
1158  ao2_lock(topic);
1159  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1160  if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1161  ao2_unlock(topic);
1162  return 1;
1163  }
1164  }
1165  ao2_unlock(topic);
1166  }
1167 
1168  return 0;
1169 }
struct stasis_topic * topic
Definition: stasis.c:685
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
struct stasis_forward * sub
Definition: res_corosync.c:240
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_subscription_join()

void stasis_subscription_join ( struct stasis_subscription subscription)

Block until the last message is processed on a subscription.

This function will not return until the subscription's callback for the stasis_subscription_final_message() completes. This allows cleanup routines to run before unblocking the joining thread.

Parameters
subscriptionSubscription to block on.
Since
12

Definition at line 1107 of file stasis.c.

References ao2_lock, ao2_object_get_lockaddr(), ao2_unlock, ast_cond_wait, stasis_subscription::final_message_processed, and stasis_subscription::join_cond.

Referenced by stasis_caching_unsubscribe_and_join(), and stasis_unsubscribe_and_join().

1108 {
1109  if (subscription) {
1110  ao2_lock(subscription);
1111  /* Wait until the processed flag has been set */
1112  while (!subscription->final_message_processed) {
1113  ast_cond_wait(&subscription->join_cond,
1114  ao2_object_get_lockaddr(subscription));
1115  }
1116  ao2_unlock(subscription);
1117  }
1118 }
ast_cond_t join_cond
Definition: stasis.c:694
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
#define ao2_unlock(a)
Definition: astobj2.h:730
int final_message_processed
Definition: stasis.c:700
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
#define ao2_lock(a)
Definition: astobj2.h:718

◆ stasis_subscription_set_congestion_limits()

int stasis_subscription_set_congestion_limits ( struct stasis_subscription subscription,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the stasis subscription.

Since
13.10.0
Parameters
subscriptionPointer to a stasis subscription
low_waterNew queue low water mark. (-1 to set as 90% of high_water)
high_waterNew queue high water mark.
Return values
0on success.
-1on error (water marks not changed).

Definition at line 1013 of file stasis.c.

References ast_taskprocessor_alert_set_levels(), and stasis_subscription::mailbox.

Referenced by stasis_message_router_set_congestion_limits().

1015 {
1016  int res = -1;
1017 
1018  if (subscription) {
1019  res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1020  low_water, high_water);
1021  }
1022  return res;
1023 }
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
struct ast_taskprocessor * mailbox
Definition: stasis.c:687

◆ stasis_subscription_set_filter()

int stasis_subscription_set_filter ( struct stasis_subscription subscription,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a subscription.

This will cause the subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
subscriptionSubscription that should receive all messages.
filterWhat filter to use
Return values
0on success
-1failure
Since
17.0.0

Definition at line 1079 of file stasis.c.

References ao2_lock, ao2_unlock, filter(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.

Referenced by acl_change_stasis_subscribe(), ast_mwi_subscribe_pool(), ast_res_pjsip_initialize_configuration(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), create_new_generic_list(), create_parked_subscription_full(), devstate_init(), load_module(), load_pbx(), network_change_stasis_subscribe(), park_and_announce_app_exec(), parking_manager_enable_stasis(), refer_blind_callback(), rtp_reload(), stasis_caching_set_filter(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_set_formatters_default(), subscribe_device_state(), and xmpp_init_event_distribution().

1081 {
1082  if (!subscription) {
1083  return -1;
1084  }
1085 
1086  ao2_lock(subscription->topic);
1087  if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1088  subscription->filter = filter;
1089  }
1090  ao2_unlock(subscription->topic);
1091 
1092  return 0;
1093 }
struct stasis_topic * topic
Definition: stasis.c:685
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:709

◆ stasis_subscription_statistics_create()

static struct stasis_subscription_statistics* stasis_subscription_statistics_create ( struct stasis_subscription sub,
int  needs_mailbox,
int  use_thread_pool,
const char *  file,
int  lineno,
const char *  func 
)
static

Definition at line 823 of file stasis.c.

References ao2_alloc, ao2_cleanup, ao2_global_obj_ref, ao2_link, ao2_ref, ast_str_container_alloc, make_ari_stubs::file, stasis_subscription_statistics::file, stasis_subscription_statistics::func, stasis_subscription_statistics::lineno, NULL, RAII_VAR, statistics(), sub, stasis_subscription_statistics::sub, subscription_statistics_destroy(), stasis_subscription_statistics::topics, stasis_subscription_statistics::uniqueid, stasis_subscription::uniqueid, stasis_subscription_statistics::uses_mailbox, and stasis_subscription_statistics::uses_threadpool.

Referenced by internal_stasis_subscribe().

826 {
828  RAII_VAR(struct ao2_container *, subscription_stats, ao2_global_obj_ref(subscription_statistics), ao2_cleanup);
829 
830  if (!subscription_stats) {
831  return NULL;
832  }
833 
834  statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
835  if (!statistics) {
836  return NULL;
837  }
838 
839  statistics->topics = ast_str_container_alloc(1);
840  if (!statistics->topics) {
841  ao2_ref(statistics, -1);
842  return NULL;
843  }
844 
845  statistics->file = file;
846  statistics->lineno = lineno;
847  statistics->func = func;
848  statistics->uses_mailbox = needs_mailbox;
849  statistics->uses_threadpool = use_thread_pool;
850  strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
851  statistics->sub = sub;
852  ao2_link(subscription_stats, statistics);
853 
854  return statistics;
855 }
static void statistics(void)
Definition: utils/frame.c:287
const char * func
The function where the subscription originates.
Definition: stasis.c:654
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1312
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
#define NULL
Definition: resample.c:96
int lineno
The line number where the subscription originates.
Definition: stasis.c:672
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
static void subscription_statistics_destroy(void *obj)
Definition: stasis.c:816
const char * file
The filename where the subscription originates.
Definition: stasis.c:652
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct ao2_container * topics
Names of the topics we are subscribed to.
Definition: stasis.c:656
int uses_mailbox
Using a mailbox to queue messages.
Definition: stasis.c:668
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
int uses_threadpool
Using stasis threadpool for handling messages.
Definition: stasis.c:670
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
char uniqueid[0]
Unique ID of the subscription.
Definition: stasis.c:676
struct stasis_forward * sub
Definition: res_corosync.c:240
struct stasis_subscription * sub
Pointer to the subscription (NOT refcounted, and must NOT be accessed)
Definition: stasis.c:674
Generic container type.
#define ao2_link(container, obj)
Definition: astobj2.h:1549

◆ stasis_subscription_uniqueid()

const char* stasis_subscription_uniqueid ( const struct stasis_subscription sub)

Get the unique ID for the subscription.

Parameters
subSubscription for which to get the unique ID.
Returns
Unique ID for the subscription.
Since
12

Definition at line 1171 of file stasis.c.

References stasis_subscription::uniqueid.

Referenced by AST_TEST_DEFINE(), stasis_subscription_final_message(), topic_add_subscription(), and topic_remove_subscription().

1172 {
1173  return sub->uniqueid;
1174 }

◆ stasis_topic_create()

struct stasis_topic* stasis_topic_create ( const char *  name)

Create a new topic.

Parameters
nameName of the new topic.
Returns
New topic instance.
NULL on error.
Since
12
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.
Topic names should be in the form of <subsystem>:<functionality>[/<object>]

Definition at line 618 of file stasis.c.

References stasis_topic_create_with_detail().

Referenced by __init_manager(), app_create(), app_init(), ast_channel_internal_setup_topics(), ast_parking_stasis_init(), ast_presence_state_engine_init(), ast_rtp_engine_init(), ast_security_stasis_init(), ast_stasis_bridging_init(), ast_stasis_channels_init(), ast_stasis_system_init(), AST_TEST_DEFINE(), ast_test_init(), create_cts(), create_subscriptions(), devstate_init(), load_module(), stasis_caching_topic_create(), stasis_cp_all_create(), stasis_cp_sink_create(), stasis_state_manager_create(), stasis_topic_pool_get_topic(), and state_alloc().

619 {
621 }
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:569
static const char name[]
Definition: cdr_mysql.c:74

◆ stasis_topic_create_with_detail()

struct stasis_topic* stasis_topic_create_with_detail ( const char *  name,
const char *  detail 
)

Create a new topic with given detail.

Parameters
nameName of the new topic.
detailDetail description of the new topic. i.e. "Queue main topic for subscribing every queue event"
Returns
New topic instance.
NULL on error.
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.

Definition at line 569 of file stasis.c.

References ao2_ref, ao2_t_alloc, ast_debug, AST_VECTOR_INIT, INITIAL_SUBSCRIBERS_MAX, link_topic_proxy(), NULL, stasis_topic_get(), stasis_topic_statistics_create(), and topic_dtor().

Referenced by stasis_topic_create().

572 {
573  struct stasis_topic *topic;
574  int res = 0;
575 
576  if (!name|| !strlen(name) || !detail) {
577  return NULL;
578  }
579  ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
580 
581  topic = stasis_topic_get(name);
582  if (topic) {
583  ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
584  name, detail);
585  return topic;
586  }
587 
588  topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
589  if (!topic) {
590  return NULL;
591  }
592 
593  res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
594  res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
595  if (res) {
596  ao2_ref(topic, -1);
597  return NULL;
598  }
599 
600  /* link to the proxy */
601  if (link_topic_proxy(topic, name, detail)) {
602  ao2_ref(topic, -1);
603  return NULL;
604  }
605 
606 #ifdef AST_DEVMODE
607  topic->statistics = stasis_topic_statistics_create(topic);
608  if (!topic->statistics) {
609  ao2_ref(topic, -1);
610  return NULL;
611  }
612 #endif
613  ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
614 
615  return topic;
616 }
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
static void topic_dtor(void *obj)
Definition: stasis.c:434
#define NULL
Definition: resample.c:96
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:452
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
Definition: stasis.c:501
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static const char name[]
Definition: cdr_mysql.c:74
static struct stasis_topic_statistics * stasis_topic_statistics_create(struct stasis_topic *topic)
Definition: stasis.c:472
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:302
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:623

◆ stasis_topic_detail()

const char* stasis_topic_detail ( const struct stasis_topic topic)

Return the detail of a topic.

Parameters
topicTopic.
Returns
Detail of the topic.
NULL if topic is NULL.
Since
12

Definition at line 636 of file stasis.c.

References NULL.

637 {
638  if (!topic) {
639  return NULL;
640  }
641  return topic->detail;
642 }
#define NULL
Definition: resample.c:96

◆ stasis_topic_get()

struct stasis_topic* stasis_topic_get ( const char *  name)

Get a topic of the given name.

Parameters
nameTopic's name.
Returns
Name of the topic.
NULL on error or not exist.
Note
This SHOULD NOT be used in normal operation for publishing messages.

Definition at line 623 of file stasis.c.

References ao2_weakproxy_find, and OBJ_SEARCH_KEY.

Referenced by link_topic_proxy(), stasis_show_topic(), and stasis_topic_create_with_detail().

624 {
626 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
struct ao2_container * topic_all
Definition: stasis.c:396
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object...
Definition: astobj2.h:1768
static const char name[]
Definition: cdr_mysql.c:74

◆ stasis_topic_name()

const char* stasis_topic_name ( const struct stasis_topic topic)

◆ stasis_topic_pool_create()

struct stasis_topic_pool* stasis_topic_pool_create ( struct stasis_topic pooled_topic)

Create a topic pool that routes messages from dynamically generated topics to the given topic.

Parameters
pooled_topicTopic to which messages will be routed
Returns
the new stasis_topic_pool
NULL on failure

Definition at line 1833 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_MUTEX, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_cleanup, ao2_container_alloc_hash, ao2_container_register(), ao2_ref, ast_alloca, NULL, pool, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, stasis_topic_name(), TOPIC_POOL_BUCKETS, topic_pool_dtor(), topic_pool_entry_cmp(), and topic_pool_entry_hash().

Referenced by app_init(), ast_stasis_bridging_init(), and devstate_init().

1834 {
1835  struct stasis_topic_pool *pool;
1836 
1838  if (!pool) {
1839  return NULL;
1840  }
1841 
1844  if (!pool->pool_container) {
1845  ao2_cleanup(pool);
1846  return NULL;
1847  }
1848 
1849 #ifdef AO2_DEBUG
1850  {
1851  char *container_name =
1852  ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1853  sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1854  ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1855  }
1856 #endif
1857 
1858  ao2_ref(pooled_topic, +1);
1859  pool->pool_topic = pooled_topic;
1860 
1861  return pool;
1862 }
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:305
static void topic_pool_dtor(void *obj)
Definition: stasis.c:1744
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis.c:1784
static pj_pool_t * pool
Global memory pool for configuration and timers.
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define NULL
Definition: resample.c:96
struct ao2_container * pool_container
Definition: stasis.c:1740
static int topic_pool_entry_hash(const void *obj, const int flags)
Definition: stasis.c:1763
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ao2_ref(o, delta)
Definition: astobj2.h:464
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:290
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
struct stasis_topic * pool_topic
Definition: stasis.c:1741
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ stasis_topic_pool_delete_topic()

void stasis_topic_pool_delete_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Delete a topic from the topic pool.

Parameters
poolPool from which to delete the topic
topic_nameName of the topic to delete in the form of <pool_topic_name>/<topic_name> or just <topic_name>
Since
13.24
15.6
16.1

Definition at line 1864 of file stasis.c.

References ao2_find, OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, and stasis_topic_name().

Referenced by bridge_topics_destroy().

1865 {
1866  /*
1867  * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1868  * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1869  * name and search only on <topic_name>.
1870  */
1871  const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1872  int pool_topic_name_len = strlen(pool_topic_name);
1873  const char *search_topic_name;
1874 
1875  if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1876  search_topic_name = topic_name + pool_topic_name_len + 1;
1877  } else {
1878  search_topic_name = topic_name;
1879  }
1880 
1881  ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1882 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
struct ao2_container * pool_container
Definition: stasis.c:1740
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
struct stasis_topic * pool_topic
Definition: stasis.c:1741

◆ stasis_topic_pool_get_topic()

struct stasis_topic* stasis_topic_pool_get_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Find or create a topic in the pool.

Parameters
poolPool for which to get the topic
topic_nameName of the topic to get
Returns
The already stored or newly allocated topic
NULL if the topic was not found and could not be allocated

Definition at line 1884 of file stasis.c.

References ao2_cleanup, ao2_find, ao2_link_flags, ast_asprintf, ast_free, topic_pool_entry::forward, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, stasis_topic_pool::pool_container, stasis_topic_pool::pool_topic, RAII_VAR, SCOPED_AO2LOCK, stasis_forward_all(), stasis_topic_create(), stasis_topic_name(), topic_pool_entry::topic, and topic_pool_entry_alloc().

Referenced by ast_device_state_topic(), ast_queue_topic(), and bridge_topics_init().

1885 {
1887  SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1888  char *new_topic_name;
1889  int ret;
1890 
1892  if (topic_pool_entry) {
1893  return topic_pool_entry->topic;
1894  }
1895 
1897  if (!topic_pool_entry) {
1898  return NULL;
1899  }
1900 
1901  /* To provide further detail and to ensure that the topic is unique within the scope of the
1902  * system we prefix it with the pooling topic name, which should itself already be unique.
1903  */
1904  ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1905  if (ret < 0) {
1906  return NULL;
1907  }
1908 
1909  topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1910  ast_free(new_topic_name);
1911  if (!topic_pool_entry->topic) {
1912  return NULL;
1913  }
1914 
1916  if (!topic_pool_entry->forward) {
1917  return NULL;
1918  }
1919 
1921  return NULL;
1922  }
1923 
1924  return topic_pool_entry->topic;
1925 }
Definition: stasis.c:1709
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
static struct topic_pool_entry * topic_pool_entry_alloc(const char *topic_name)
Definition: stasis.c:1724
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
#define NULL
Definition: resample.c:96
struct ao2_container * pool_container
Definition: stasis.c:1740
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
struct stasis_topic * topic
Definition: stasis.c:1711
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
#define ast_free(a)
Definition: astmm.h:182
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
struct stasis_topic * pool_topic
Definition: stasis.c:1741
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_forward * forward
Definition: stasis.c:1710
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578

◆ stasis_topic_pool_topic_exists()

int stasis_topic_pool_topic_exists ( const struct stasis_topic_pool pool,
const char *  topic_name 
)

Check if a topic exists in a pool.

Parameters
poolPool to check
topic_nameName of the topic to check
Return values
1exists
0does not exist
Since
13.23.0

Definition at line 1927 of file stasis.c.

References ao2_find, ao2_ref, OBJ_SEARCH_KEY, and stasis_topic_pool::pool_container.

Referenced by ast_publish_device_state_full().

1928 {
1930 
1931  topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1932  if (!topic_pool_entry) {
1933  return 0;
1934  }
1935 
1936  ao2_ref(topic_pool_entry, -1);
1937  return 1;
1938 }
Definition: stasis.c:1709
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
struct ao2_container * pool_container
Definition: stasis.c:1740
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756

◆ stasis_topic_statistics_create()

static struct stasis_topic_statistics* stasis_topic_statistics_create ( struct stasis_topic topic)
static

Definition at line 472 of file stasis.c.

References ao2_alloc, ao2_cleanup, ao2_global_obj_ref, ao2_link, ao2_ref, ast_str_container_alloc, NULL, RAII_VAR, statistics(), and topic_statistics_destroy().

Referenced by stasis_topic_create_with_detail().

473 {
474  struct stasis_topic_statistics *statistics;
475  RAII_VAR(struct ao2_container *, topic_stats, ao2_global_obj_ref(topic_statistics), ao2_cleanup);
476 
477  if (!topic_stats) {
478  return NULL;
479  }
480 
481  statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
482  if (!statistics) {
483  return NULL;
484  }
485 
486  statistics->subscribers = ast_str_container_alloc(1);
487  if (!statistics->subscribers) {
488  ao2_ref(statistics, -1);
489  return NULL;
490  }
491 
492  /* This is strictly used for the pointer address when showing the topic */
493  statistics->topic = topic;
494  strcpy(statistics->name, topic->name); /* SAFE */
495  ao2_link(topic_stats, statistics);
496 
497  return statistics;
498 }
static void statistics(void)
Definition: utils/frame.c:287
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1312
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
#define NULL
Definition: resample.c:96
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static void topic_statistics_destroy(void *obj)
Definition: stasis.c:465
Generic container type.
#define ao2_link(container, obj)
Definition: astobj2.h:1549

◆ stasis_topic_subscribers()

size_t stasis_topic_subscribers ( const struct stasis_topic topic)

Return the number of subscribers of a topic.

Parameters
topicTopic.
Returns
Number of subscribers of the topic.
Since
17.0.0

Definition at line 644 of file stasis.c.

References AST_VECTOR_SIZE.

Referenced by caching_topic_exec(), and publish_msg().

645 {
646  return AST_VECTOR_SIZE(&topic->subscribers);
647 }
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_unsubscribe()

struct stasis_subscription* stasis_unsubscribe ( struct stasis_subscription subscription)

Cancel a subscription.

Note that in an asynchronous system, there may still be messages queued or in transit to the subscription's callback. These will still be delivered. There will be a final 'SubscriptionCancelled' message, indicating the delivery of the final message.

Parameters
subscriptionSubscription to cancel.
Returns
NULL for convenience
Since
12

Definition at line 973 of file stasis.c.

References ao2_bump, ao2_cleanup, ast_log, ast_taskprocessor_push(), LOG_ERROR, stasis_subscription::mailbox, NULL, send_subscription_unsubscribe(), sub_cleanup(), stasis_subscription::topic, and topic_remove_subscription().

Referenced by AST_TEST_DEFINE(), cc_generic_agent_destructor(), destroy_cts(), generic_agent_devstate_cb(), generic_monitor_instance_list_destructor(), mwi_startup_event_cb(), park_and_announce_app_exec(), parked_subscription_datastore_destroy(), refer_progress_bridge(), refer_progress_destroy(), refer_progress_framehook_destroy(), startup_event_cb(), stasis_caching_unsubscribe(), stasis_message_router_unsubscribe(), stasis_state_unsubscribe(), stasis_unsubscribe_and_join(), subscription_persistence_event_cb(), unload_module(), and xmpp_init_event_distribution().

974 {
975  /* The subscription may be the last ref to this topic. Hold
976  * the topic ref open until after the unlock. */
977  struct stasis_topic *topic;
978 
979  if (!sub) {
980  return NULL;
981  }
982 
983  topic = ao2_bump(sub->topic);
984 
985  /* We have to remove the subscription first, to ensure the unsubscribe
986  * is the final message */
987  if (topic_remove_subscription(sub->topic, sub) != 0) {
989  "Internal error: subscription has invalid topic\n");
990  ao2_cleanup(topic);
991 
992  return NULL;
993  }
994 
995  /* Now let everyone know about the unsubscribe */
996  send_subscription_unsubscribe(topic, sub);
997 
998  /* When all that's done, remove the ref the mailbox has on the sub */
999  if (sub->mailbox) {
1000  if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
1001  /* Nothing we can do here, the conditional is just to keep
1002  * the compiler happy that we're not ignoring the result. */
1003  }
1004  }
1005 
1006  /* Unsubscribing unrefs the subscription */
1007  ao2_cleanup(sub);
1008  ao2_cleanup(topic);
1009 
1010  return NULL;
1011 }
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1231
struct stasis_topic * topic
Definition: stasis.c:685
static int sub_cleanup(void *data)
Definition: stasis.c:966
#define NULL
Definition: resample.c:96
#define ao2_bump(obj)
Definition: astobj2.h:491
#define ast_log
Definition: astobj2.c:42
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1676
#define LOG_ERROR
Definition: logger.h:285
struct ast_taskprocessor * mailbox
Definition: stasis.c:687
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ stasis_unsubscribe_and_join()

struct stasis_subscription* stasis_unsubscribe_and_join ( struct stasis_subscription subscription)

Cancel a subscription, blocking until the last message is processed.

While normally it's recommended to stasis_unsubscribe() and wait for stasis_subscription_final_message(), there are times (like during a module unload) where you have to wait for the final message (otherwise you'll call a function in a shared module that no longer exists).

Parameters
subscriptionSubscription to cancel.
Returns
NULL for convenience
Since
12

Definition at line 1136 of file stasis.c.

References ao2_cleanup, ao2_ref, NULL, stasis_subscription_join(), and stasis_unsubscribe().

Referenced by acl_change_event_stasis_unsubscribe(), acl_change_stasis_unsubscribe(), ast_res_pjsip_destroy_configuration(), AST_TEST_DEFINE(), ast_xmpp_client_disconnect(), asterisk_stop_devicestate_publishing(), asterisk_stop_mwi_publishing(), devstate_cleanup(), network_change_stasis_unsubscribe(), parking_manager_disable_stasis(), presence_change_common(), remove_device_state_subscription(), rtp_reload(), stasis_message_router_unsubscribe_and_join(), stasis_state_unsubscribe_and_join(), unload_module(), and unload_pbx().

1138 {
1139  if (!subscription) {
1140  return NULL;
1141  }
1142 
1143  /* Bump refcount to hold it past the unsubscribe */
1144  ao2_ref(subscription, +1);
1145  stasis_unsubscribe(subscription);
1146  stasis_subscription_join(subscription);
1147  /* Now decrement the refcount back */
1148  ao2_cleanup(subscription);
1149  return NULL;
1150 }
#define NULL
Definition: resample.c:96
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:973
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1107

◆ statistics_show_messages()

static char* statistics_show_messages ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2865 of file stasis.c.

References ast_cli_args::argc, ast_cli_entry::args, ast_cli(), ast_mutex_lock, ast_mutex_unlock, AST_VECTOR_GET_ADDR, AST_VECTOR_SIZE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, FMT_FIELDS, FMT_HEADERS, stasis_message_type_statistics::message_type, message_type_statistics_lock, NULL, stasis_message_type_statistics::published, stasis_message_type_name(), statistics(), stasis_message_type_statistics::unused, and ast_cli_entry::usage.

2866 {
2867  int i;
2868  int count = 0;
2869  int published = 0;
2870  int unused = 0;
2871 #define FMT_HEADERS "%-64s %10s %10s\n"
2872 #define FMT_FIELDS "%-64s %10d %10d\n"
2873 
2874  switch (cmd) {
2875  case CLI_INIT:
2876  e->command = "stasis statistics show messages";
2877  e->usage =
2878  "Usage: stasis statistics show messages\n"
2879  " Shows a list of message types and their general statistics\n";
2880  return NULL;
2881  case CLI_GENERATE:
2882  return NULL;
2883  }
2884 
2885  if (a->argc != e->args) {
2886  return CLI_SHOWUSAGE;
2887  }
2888 
2889  ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
2890 
2892  for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
2893  struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2894 
2895  if (!statistics->message_type) {
2896  continue;
2897  }
2898 
2899  ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
2900  statistics->unused);
2901  published += statistics->published;
2902  unused += statistics->unused;
2903  ++count;
2904  }
2906 
2907  ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
2908  ast_cli(a->fd, "\n%d seen message types\n\n", count);
2909 
2910 #undef FMT_HEADERS
2911 #undef FMT_FIELDS
2912 
2913  return CLI_SUCCESS;
2914 }
static void statistics(void)
Definition: utils/frame.c:287
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
#define FMT_FIELDS
const int argc
Definition: cli.h:160
Definition: cli.h:152
#define ast_mutex_lock(a)
Definition: lock.h:187
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
int unused
The number of messages of this that did not reach a subscriber.
Definition: stasis.c:341
int args
This gets set in ast_cli_register()
Definition: cli.h:185
static ast_mutex_t message_type_statistics_lock
Definition: stasis.c:347
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:670
const int fd
Definition: cli.h:159
#define CLI_SHOWUSAGE
Definition: cli.h:45
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
int published
The number of messages of this published.
Definition: stasis.c:339
#define FMT_HEADERS
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
#define ast_mutex_unlock(a)
Definition: lock.h:188
struct stasis_message_type * message_type
The stasis message type.
Definition: stasis.c:343

◆ statistics_show_subscription()

static char* statistics_show_subscription ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2604 of file stasis.c.

References ao2_container_count(), ao2_find, ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_lock, ao2_ref, AO2_STRING_FIELD_SORT_FN(), ao2_unlock, ast_cli_args::argc, ast_cli_args::argv, ast_cli(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, stasis_subscription_statistics::file, stasis_subscription_statistics::func, stasis_subscription_statistics::highest_time_invoked, stasis_subscription_statistics::highest_time_message_type, stasis_subscription_statistics::lineno, stasis_subscription_statistics::lowest_time_invoked, stasis_subscription_statistics::messages_dropped, stasis_subscription_statistics::messages_passed, ast_cli_args::n, name, NULL, OBJ_SEARCH_KEY, ast_cli_args::pos, S_OR, stasis_message_type_name(), statistics(), stasis_subscription_statistics::sub, subscription_statistics_complete_name(), stasis_subscription_statistics::topics, stasis_subscription_statistics::uniqueid, ast_cli_entry::usage, stasis_subscription_statistics::uses_mailbox, stasis_subscription_statistics::uses_threadpool, and ast_cli_args::word.

2605 {
2607  struct ao2_container *subscription_stats;
2608  struct ao2_iterator i;
2609  char *name;
2610 
2611  switch (cmd) {
2612  case CLI_INIT:
2613  e->command = "stasis statistics show subscription";
2614  e->usage =
2615  "Usage: stasis statistics show subscription <uniqueid>\n"
2616  " Show stasis subscription statistics.\n";
2617  return NULL;
2618  case CLI_GENERATE:
2619  if (a->pos == 4) {
2621  } else {
2622  return NULL;
2623  }
2624  }
2625 
2626  if (a->argc != 5) {
2627  return CLI_SHOWUSAGE;
2628  }
2629 
2630  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2631  if (!subscription_stats) {
2632  ast_cli(a->fd, "Could not fetch subcription_statistics container\n");
2633  return CLI_FAILURE;
2634  }
2635 
2636  statistics = ao2_find(subscription_stats, a->argv[4], OBJ_SEARCH_KEY);
2637  if (!statistics) {
2638  ao2_ref(subscription_stats, -1);
2639  ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2640  return CLI_FAILURE;
2641  }
2642 
2643  ao2_ref(subscription_stats, -1);
2644 
2645  ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2646  ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2647  ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2648  ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2649  ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2650  ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2651  ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2652  ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2653  ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
2654  ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2655  ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2656 
2657  ao2_lock(statistics);
2658  if (statistics->highest_time_message_type) {
2659  ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2660  }
2661  ao2_unlock(statistics);
2662 
2663  ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2664 
2665  ast_cli(a->fd, "Subscribed topics:\n");
2666  i = ao2_iterator_init(statistics->topics, 0);
2667  while ((name = ao2_iterator_next(&i))) {
2668  ast_cli(a->fd, "\t%s\n", name);
2669  ao2_ref(name, -1);
2670  }
2672 
2673  ao2_ref(statistics, -1);
2674 
2675  return CLI_SUCCESS;
2676 }
static void statistics(void)
Definition: utils/frame.c:287
static char * subscription_statistics_complete_name(const char *word, int state)
Definition: stasis.c:2570
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
const char * func
The function where the subscription originates.
Definition: stasis.c:654
const int argc
Definition: cli.h:160
Definition: cli.h:152
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
long highest_time_invoked
Highest time spent invoking a message.
Definition: stasis.c:660
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
int lineno
The line number where the subscription originates.
Definition: stasis.c:672
const int fd
Definition: cli.h:159
const int n
Definition: cli.h:165
const char * file
The filename where the subscription originates.
Definition: stasis.c:652
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
int messages_passed
The number of messages that passed filtering.
Definition: stasis.c:666
struct ao2_container * topics
Names of the topics we are subscribed to.
Definition: stasis.c:656
const char *const * argv
Definition: cli.h:161
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
int uses_mailbox
Using a mailbox to queue messages.
Definition: stasis.c:668
int uses_threadpool
Using stasis threadpool for handling messages.
Definition: stasis.c:670
#define CLI_FAILURE
Definition: cli.h:46
static const char name[]
Definition: cdr_mysql.c:74
char * command
Definition: cli.h:186
const char * word
Definition: cli.h:163
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
struct stasis_message_type * highest_time_message_type
The message type that currently took the longest to process.
Definition: stasis.c:658
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one...
Definition: strings.h:79
const int pos
Definition: cli.h:164
char uniqueid[0]
Unique ID of the subscription.
Definition: stasis.c:676
struct stasis_subscription * sub
Pointer to the subscription (NOT refcounted, and must NOT be accessed)
Definition: stasis.c:674
Generic container type.
long lowest_time_invoked
Lowest time spent invoking a message.
Definition: stasis.c:662
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
int messages_dropped
The number of messages that were filtered out.
Definition: stasis.c:664

◆ statistics_show_subscriptions()

static char* statistics_show_subscriptions ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2490 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_container_alloc_rbtree, ao2_container_dup(), ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_args::argc, ast_cli_entry::args, ast_cli(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, FMT_FIELDS, FMT_FIELDS2, FMT_HEADERS, stasis_subscription_statistics::highest_time_invoked, stasis_subscription_statistics::lowest_time_invoked, stasis_subscription_statistics::messages_dropped, stasis_subscription_statistics::messages_passed, NULL, statistics(), stasis_subscription_statistics::uniqueid, and ast_cli_entry::usage.

2491 {
2492  struct ao2_container *sorted_subscriptions;
2493  struct ao2_container *subscription_stats;
2494  struct ao2_iterator iter;
2496  int count = 0;
2497  int dropped = 0;
2498  int passed = 0;
2499 #define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2500 #define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2501 #define FMT_FIELDS2 "%-64s %10d %10d\n"
2502 
2503  switch (cmd) {
2504  case CLI_INIT:
2505  e->command = "stasis statistics show subscriptions";
2506  e->usage =
2507  "Usage: stasis statistics show subscriptions\n"
2508  " Shows a list of subscriptions and their general statistics\n";
2509  return NULL;
2510  case CLI_GENERATE:
2511  return NULL;
2512  }
2513 
2514  if (a->argc != e->args) {
2515  return CLI_SHOWUSAGE;
2516  }
2517 
2518  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2519  if (!subscription_stats) {
2520  ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2521  return CLI_FAILURE;
2522  }
2523 
2524  sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2525  stasis_subscription_statistics_sort_fn, NULL);
2526  if (!sorted_subscriptions) {
2527  ao2_ref(subscription_stats, -1);
2528  ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2529  return CLI_SUCCESS;
2530  }
2531 
2532  if (ao2_container_dup(sorted_subscriptions, subscription_stats, 0)) {
2533  ao2_ref(sorted_subscriptions, -1);
2534  ao2_ref(subscription_stats, -1);
2535  ast_cli(a->fd, "Could not sort subscription statistics\n");
2536  return CLI_SUCCESS;
2537  }
2538 
2539  ao2_ref(subscription_stats, -1);
2540 
2541  ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2542 
2543  iter = ao2_iterator_init(sorted_subscriptions, 0);
2544  while ((statistics = ao2_iterator_next(&iter))) {
2545  ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2546  statistics->lowest_time_invoked, statistics->highest_time_invoked);
2547  dropped += statistics->messages_dropped;
2548  passed += statistics->messages_passed;
2549  ao2_ref(statistics, -1);
2550  ++count;
2551  }
2552  ao2_iterator_destroy(&iter);
2553 
2554  ao2_ref(sorted_subscriptions, -1);
2555 
2556  ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2557  ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2558 
2559 #undef FMT_HEADERS
2560 #undef FMT_FIELDS
2561 #undef FMT_FIELDS2
2562 
2563  return CLI_SUCCESS;
2564 }
static void statistics(void)
Definition: utils/frame.c:287
#define FMT_FIELDS
const int argc
Definition: cli.h:160
Definition: cli.h:152
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
long highest_time_invoked
Highest time spent invoking a message.
Definition: stasis.c:660
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
int args
This gets set in ast_cli_register()
Definition: cli.h:185
const int fd
Definition: cli.h:159
#define ao2_ref(o, delta)
Definition: astobj2.h:464
int messages_passed
The number of messages that passed filtering.
Definition: stasis.c:666
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define FMT_FIELDS2
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
#define CLI_FAILURE
Definition: cli.h:46
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
char uniqueid[0]
Unique ID of the subscription.
Definition: stasis.c:676
#define FMT_HEADERS
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1358
Generic container type.
long lowest_time_invoked
Lowest time spent invoking a message.
Definition: stasis.c:662
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
int messages_dropped
The number of messages that were filtered out.
Definition: stasis.c:664

◆ statistics_show_topic()

static char* statistics_show_topic ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2799 of file stasis.c.

References ao2_container_count(), ao2_find, ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_args::argc, ast_cli_args::argv, ast_cli(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, ast_cli_args::n, NULL, OBJ_SEARCH_KEY, ast_cli_args::pos, statistics(), topic_statistics_complete_name(), ast_cli_entry::usage, and ast_cli_args::word.

2800 {
2801  struct stasis_topic_statistics *statistics;
2802  struct ao2_container *topic_stats;
2803  struct ao2_iterator i;
2804  char *uniqueid;
2805 
2806  switch (cmd) {
2807  case CLI_INIT:
2808  e->command = "stasis statistics show topic";
2809  e->usage =
2810  "Usage: stasis statistics show topic <name>\n"
2811  " Show stasis topic statistics.\n";
2812  return NULL;
2813  case CLI_GENERATE:
2814  if (a->pos == 4) {
2815  return topic_statistics_complete_name(a->word, a->n);
2816  } else {
2817  return NULL;
2818  }
2819  }
2820 
2821  if (a->argc != 5) {
2822  return CLI_SHOWUSAGE;
2823  }
2824 
2825  topic_stats = ao2_global_obj_ref(topic_statistics);
2826  if (!topic_stats) {
2827  ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2828  return CLI_FAILURE;
2829  }
2830 
2831  statistics = ao2_find(topic_stats, a->argv[4], OBJ_SEARCH_KEY);
2832  if (!statistics) {
2833  ao2_ref(topic_stats, -1);
2834  ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
2835  return CLI_FAILURE;
2836  }
2837 
2838  ao2_ref(topic_stats, -1);
2839 
2840  ast_cli(a->fd, "Topic: %s\n", statistics->name);
2841  ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
2842  ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2843  ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2844  ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2845  ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2846  ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
2847 
2848  ast_cli(a->fd, "Subscribers:\n");
2849  i = ao2_iterator_init(statistics->subscribers, 0);
2850  while ((uniqueid = ao2_iterator_next(&i))) {
2851  ast_cli(a->fd, "\t%s\n", uniqueid);
2852  ao2_ref(uniqueid, -1);
2853  }
2855 
2856  ao2_ref(statistics, -1);
2857 
2858  return CLI_SUCCESS;
2859 }
static void statistics(void)
Definition: utils/frame.c:287
static char * topic_statistics_complete_name(const char *word, int state)
Definition: stasis.c:2765
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
const int argc
Definition: cli.h:160
Definition: cli.h:152
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
const int fd
Definition: cli.h:159
const int n
Definition: cli.h:165
#define ao2_ref(o, delta)
Definition: astobj2.h:464
const char *const * argv
Definition: cli.h:161
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
#define CLI_FAILURE
Definition: cli.h:46
char * command
Definition: cli.h:186
const char * word
Definition: cli.h:163
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
const int pos
Definition: cli.h:164
Generic container type.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.

◆ statistics_show_topics()

static char* statistics_show_topics ( struct ast_cli_entry e,
int  cmd,
struct ast_cli_args a 
)
static

Definition at line 2684 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_container_alloc_rbtree, ao2_container_count(), ao2_container_dup(), ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_cli_args::argc, ast_cli_entry::args, ast_cli(), CLI_FAILURE, CLI_GENERATE, CLI_INIT, CLI_SHOWUSAGE, CLI_SUCCESS, ast_cli_entry::command, ast_cli_args::fd, FMT_FIELDS, FMT_FIELDS2, FMT_HEADERS, NULL, statistics(), and ast_cli_entry::usage.

2685 {
2686  struct ao2_container *sorted_topics;
2687  struct ao2_container *topic_stats;
2688  struct ao2_iterator iter;
2689  struct stasis_topic_statistics *statistics;
2690  int count = 0;
2691  int not_dispatched = 0;
2692  int dispatched = 0;
2693 #define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2694 #define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2695 #define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2696 
2697  switch (cmd) {
2698  case CLI_INIT:
2699  e->command = "stasis statistics show topics";
2700  e->usage =
2701  "Usage: stasis statistics show topics\n"
2702  " Shows a list of topics and their general statistics\n";
2703  return NULL;
2704  case CLI_GENERATE:
2705  return NULL;
2706  }
2707 
2708  if (a->argc != e->args) {
2709  return CLI_SHOWUSAGE;
2710  }
2711 
2712  topic_stats = ao2_global_obj_ref(topic_statistics);
2713  if (!topic_stats) {
2714  ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2715  return CLI_FAILURE;
2716  }
2717 
2719  stasis_topic_statistics_sort_fn, NULL);
2720  if (!sorted_topics) {
2721  ao2_ref(topic_stats, -1);
2722  ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2723  return CLI_SUCCESS;
2724  }
2725 
2726  if (ao2_container_dup(sorted_topics, topic_stats, 0)) {
2727  ao2_ref(sorted_topics, -1);
2728  ao2_ref(topic_stats, -1);
2729  ast_cli(a->fd, "Could not sort topic statistics\n");
2730  return CLI_SUCCESS;
2731  }
2732 
2733  ao2_ref(topic_stats, -1);
2734 
2735  ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2736 
2737  iter = ao2_iterator_init(sorted_topics, 0);
2738  while ((statistics = ao2_iterator_next(&iter))) {
2739  ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
2740  statistics->messages_not_dispatched, statistics->messages_dispatched,
2741  statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2742  not_dispatched += statistics->messages_not_dispatched;
2743  dispatched += statistics->messages_dispatched;
2744  ao2_ref(statistics, -1);
2745  ++count;
2746  }
2747  ao2_iterator_destroy(&iter);
2748 
2749  ao2_ref(sorted_topics, -1);
2750 
2751  ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2752  ast_cli(a->fd, "\n%d topics\n\n", count);
2753 
2754 #undef FMT_HEADERS
2755 #undef FMT_FIELDS
2756 #undef FMT_FIELDS2
2757 
2758  return CLI_SUCCESS;
2759 }
static void statistics(void)
Definition: utils/frame.c:287
#define FMT_FIELDS
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
const int argc
Definition: cli.h:160
Definition: cli.h:152
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define NULL
Definition: resample.c:96
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
int args
This gets set in ast_cli_register()
Definition: cli.h:185
const int fd
Definition: cli.h:159
#define ao2_ref(o, delta)
Definition: astobj2.h:464
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
#define CLI_SHOWUSAGE
Definition: cli.h:45
#define FMT_FIELDS2
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
#define CLI_FAILURE
Definition: cli.h:46
char * command
Definition: cli.h:186
const char * usage
Definition: cli.h:177
#define CLI_SUCCESS
Definition: cli.h:44
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
#define FMT_HEADERS
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1358
Generic container type.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.

◆ sub_cleanup()

static int sub_cleanup ( void *  data)
static

Definition at line 966 of file stasis.c.

References ao2_cleanup, and stasis_subscription::data.

Referenced by stasis_unsubscribe().

967 {
968  struct stasis_subscription *sub = data;
969  ao2_cleanup(sub);
970  return 0;
971 }
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_forward * sub
Definition: res_corosync.c:240

◆ subscription_change_alloc()

static struct stasis_subscription_change* subscription_change_alloc ( struct stasis_topic topic,
const char *  uniqueid,
const char *  description 
)
static

Definition at line 1627 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_ref, ast_copy_string(), stasis_subscription_change::description, NULL, subscription_change_dtor(), stasis_subscription_change::topic, and stasis_subscription_change::uniqueid.

Referenced by send_subscription_subscribe(), and send_subscription_unsubscribe().

1628 {
1629  size_t description_len = strlen(description) + 1;
1630  size_t uniqueid_len = strlen(uniqueid) + 1;
1631  struct stasis_subscription_change *change;
1632 
1633  change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1635  if (!change) {
1636  return NULL;
1637  }
1638 
1639  strcpy(change->description, description); /* SAFE */
1640  change->uniqueid = change->description + description_len;
1641  ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1642  ao2_ref(topic, +1);
1643  change->topic = topic;
1644 
1645  return change;
1646 }
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define NULL
Definition: resample.c:96
static void subscription_change_dtor(void *obj)
Definition: stasis.c:1620
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_topic * topic
Definition: stasis.h:893
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:401

◆ subscription_change_dtor()

static void subscription_change_dtor ( void *  obj)
static

Definition at line 1620 of file stasis.c.

References ao2_cleanup, and stasis_subscription_change::topic.

Referenced by subscription_change_alloc().

1621 {
1622  struct stasis_subscription_change *change = obj;
1623 
1624  ao2_cleanup(change->topic);
1625 }
struct stasis_topic * topic
Definition: stasis.h:893
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ subscription_dtor()

static void subscription_dtor ( void *  obj)
static

Definition at line 715 of file stasis.c.

References ao2_cleanup, ao2_global_obj_ref, ao2_ref, ao2_unlink, ast_assert, ast_cond_destroy, ast_free, ast_taskprocessor_unreference(), AST_VECTOR_FREE, stasis_subscription::join_cond, stasis_subscription::mailbox, NULL, stasis_subscription_is_done(), stasis_subscription_is_subscribed(), stasis_subscription::topic, and stasis_subscription::uniqueid.

Referenced by internal_stasis_subscribe().

716 {
717  struct stasis_subscription *sub = obj;
718 #ifdef AST_DEVMODE
719  struct ao2_container *subscription_stats;
720 #endif
721 
722  /* Subscriptions need to be manually unsubscribed before destruction
723  * b/c there's a cyclic reference between topics and subscriptions */
725  /* If there are any messages in flight to this subscription; that would
726  * be bad. */
728 
729  ast_free(sub->uniqueid);
730  ao2_cleanup(sub->topic);
731  sub->topic = NULL;
733  sub->mailbox = NULL;
735 
736  AST_VECTOR_FREE(&sub->accepted_message_types);
737 
738 #ifdef AST_DEVMODE
739  if (sub->statistics) {
740  subscription_stats = ao2_global_obj_ref(subscription_statistics);
741  if (subscription_stats) {
742  ao2_unlink(subscription_stats, sub->statistics);
743  ao2_ref(subscription_stats, -1);
744  }
745  ao2_ref(sub->statistics, -1);
746  }
747 #endif
748 }
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
struct stasis_topic * topic
Definition: stasis.c:685
ast_cond_t join_cond
Definition: stasis.c:694
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct ast_taskprocessor * mailbox
Definition: stasis.c:687
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define ast_free(a)
Definition: astmm.h:182
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
struct stasis_forward * sub
Definition: res_corosync.c:240
Generic container type.
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152

◆ subscription_invoke()

static void subscription_invoke ( struct stasis_subscription sub,
struct stasis_message message 
)
static

Invoke the subscription's callback.

Parameters
subSubscription to invoke.
topicTopic message was published to.
messageMessage to send.

Definition at line 756 of file stasis.c.

References ao2_lock, ao2_unlock, ast_cond_signal, ast_tvdiff_ms(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::final_message_processed, stasis_subscription::final_message_rxed, stasis_subscription::join_cond, message_type_id, send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_message_type(), stasis_message_type_id(), stasis_subscription_change_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_final_message(), and statistics().

Referenced by dispatch_exec_async(), dispatch_exec_sync(), and dispatch_message().

758 {
759  unsigned int final = stasis_subscription_final_message(sub, message);
761 #ifdef AST_DEVMODE
762  struct timeval start;
763  long elapsed;
764 
765  start = ast_tvnow();
766 #endif
767 
768  /* Notify that the final message has been received */
769  if (final) {
770  ao2_lock(sub);
771  sub->final_message_rxed = 1;
772  ast_cond_signal(&sub->join_cond);
773  ao2_unlock(sub);
774  }
775 
776  /*
777  * If filtering is turned on and this is a 'final' message, we only invoke the callback
778  * if the subscriber accepts subscription_change message types.
779  */
780  if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
781  (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
782  /* Since sub is mostly immutable, no need to lock sub */
783  sub->callback(sub->data, sub, message);
784  }
785 
786  /* Notify that the final message has been processed */
787  if (final) {
788  ao2_lock(sub);
789  sub->final_message_processed = 1;
790  ast_cond_signal(&sub->join_cond);
791  ao2_unlock(sub);
792  }
793 
794 #ifdef AST_DEVMODE
795  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
796  if (elapsed > sub->statistics->highest_time_invoked) {
797  sub->statistics->highest_time_invoked = elapsed;
798  ao2_lock(sub->statistics);
799  sub->statistics->highest_time_message_type = stasis_message_type(message);
800  ao2_unlock(sub->statistics);
801  }
802  if (elapsed < sub->statistics->lowest_time_invoked) {
803  sub->statistics->lowest_time_invoked = elapsed;
804  }
805 #endif
806 }
static void statistics(void)
Definition: utils/frame.c:287
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
static int message_type_id
ast_cond_t join_cond
Definition: stasis.c:694
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ao2_unlock(a)
Definition: astobj2.h:730
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:98
#define ast_cond_signal(cond)
Definition: lock.h:201
int final_message_processed
Definition: stasis.c:700
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
#define ao2_lock(a)
Definition: astobj2.h:718
int final_message_rxed
Definition: stasis.c:697
stasis_subscription_cb callback
Definition: stasis.c:689
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ subscription_statistics_cmp()

static int subscription_statistics_cmp ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 2945 of file stasis.c.

References ast_assert, CMP_MATCH, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, OBJ_SEARCH_PARTIAL_KEY, and stasis_subscription_statistics::uniqueid.

Referenced by stasis_init().

2946 {
2947  const struct stasis_subscription_statistics *object_left = obj;
2948  const struct stasis_subscription_statistics *object_right = arg;
2949  const char *right_key = arg;
2950  int cmp;
2951 
2952  switch (flags & OBJ_SEARCH_MASK) {
2953  case OBJ_SEARCH_OBJECT:
2954  right_key = object_right->uniqueid;
2955  /* Fall through */
2956  case OBJ_SEARCH_KEY:
2957  cmp = strcasecmp(object_left->uniqueid, right_key);
2958  break;
2960  /* Not supported by container */
2961  ast_assert(0);
2962  cmp = -1;
2963  break;
2964  default:
2965  /*
2966  * What arg points to is specific to this traversal callback
2967  * and has no special meaning to astobj2.
2968  */
2969  cmp = 0;
2970  break;
2971  }
2972  if (cmp) {
2973  return 0;
2974  }
2975  /*
2976  * At this point the traversal callback is identical to a sorted
2977  * container.
2978  */
2979  return CMP_MATCH;
2980 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
#define ast_assert(a)
Definition: utils.h:695
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1120
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
char uniqueid[0]
Unique ID of the subscription.
Definition: stasis.c:676
Search option field mask.
Definition: astobj2.h:1076

◆ subscription_statistics_complete_name()

static char* subscription_statistics_complete_name ( const char *  word,
int  state 
)
static

Definition at line 2570 of file stasis.c.

References ao2_global_obj_ref, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, ast_strdup, NULL, result, statistics(), and stasis_subscription_statistics::uniqueid.

Referenced by statistics_show_subscription().

2571 {
2573  struct ao2_container *subscription_stats;
2574  struct ao2_iterator it_statistics;
2575  int wordlen = strlen(word);
2576  int which = 0;
2577  char *result = NULL;
2578 
2579  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2580  if (!subscription_stats) {
2581  return result;
2582  }
2583 
2584  it_statistics = ao2_iterator_init(subscription_stats, 0);
2585  while ((statistics = ao2_iterator_next(&it_statistics))) {
2586  if (!strncasecmp(word, statistics->uniqueid, wordlen)
2587  && ++which > state) {
2588  result = ast_strdup(statistics->uniqueid);
2589  }
2590  ao2_ref(statistics, -1);
2591  if (result) {
2592  break;
2593  }
2594  }
2595  ao2_iterator_destroy(&it_statistics);
2596  ao2_ref(subscription_stats, -1);
2597  return result;
2598 }
static void statistics(void)
Definition: utils/frame.c:287
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
#define NULL
Definition: resample.c:96
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
static PGresult * result
Definition: cel_pgsql.c:88
char uniqueid[0]
Unique ID of the subscription.
Definition: stasis.c:676
Generic container type.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
short word

◆ subscription_statistics_destroy()

static void subscription_statistics_destroy ( void *  obj)
static

Definition at line 816 of file stasis.c.

References ao2_cleanup, statistics(), and stasis_subscription_statistics::topics.

Referenced by stasis_subscription_statistics_create().

817 {
819 
820  ao2_cleanup(statistics->topics);
821 }
static void statistics(void)
Definition: utils/frame.c:287
struct ao2_container * topics
Names of the topics we are subscribed to.
Definition: stasis.c:656
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ subscription_statistics_hash()

static int subscription_statistics_hash ( const void *  obj,
const int  flags 
)
static

Definition at line 2924 of file stasis.c.

References ast_assert, ast_str_case_hash(), OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.

Referenced by stasis_init().

2925 {
2926  const struct stasis_subscription_statistics *object;
2927  const char *key;
2928 
2929  switch (flags & OBJ_SEARCH_MASK) {
2930  case OBJ_SEARCH_KEY:
2931  key = obj;
2932  break;
2933  case OBJ_SEARCH_OBJECT:
2934  object = obj;
2935  key = object->uniqueid;
2936  break;
2937  default:
2938  /* Hash can only work on something with a full key. */
2939  ast_assert(0);
2940  return 0;
2941  }
2942  return ast_str_case_hash(key);
2943 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
#define ast_assert(a)
Definition: utils.h:695
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
Search option field mask.
Definition: astobj2.h:1076
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1250

◆ topic_add_subscription()

static int topic_add_subscription ( struct stasis_topic topic,
struct stasis_subscription sub 
)
static

Add a subscriber to a topic.

Parameters
topicTopic
subSubscriber
Returns
0 on success
Non-zero on error

Definition at line 1203 of file stasis.c.

References ao2_lock, ao2_unlock, ast_str_container_add(), AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription_uniqueid(), and stasis_topic_name().

Referenced by internal_stasis_subscribe(), proxy_dtor(), and stasis_forward_all().

1204 {
1205  size_t idx;
1206 
1207  ao2_lock(topic);
1208  /* The reference from the topic to the subscription is shared with
1209  * the owner of the subscription, which will explicitly unsubscribe
1210  * to release it.
1211  *
1212  * If we bumped the refcount here, the owner would have to unsubscribe
1213  * and cleanup, which is a bit awkward. */
1214  AST_VECTOR_APPEND(&topic->subscribers, sub);
1215 
1216  for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1218  AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1219  }
1220 
1221 #ifdef AST_DEVMODE
1222  ast_str_container_add(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
1223  ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1224 #endif
1225 
1226  ao2_unlock(topic);
1227 
1228  return 0;
1229 }
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1171
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define ao2_unlock(a)
Definition: