Asterisk - The Open Source Telephony Project  18.5.0
Functions
stasis_internal.h File Reference

Internal Stasis APIs. More...

#include "asterisk/stasis.h"
Include dependency graph for stasis_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

struct stasis_subscription * internal_stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
 Create a subscription. More...
 

Detailed Description

Internal Stasis APIs.

This header file is used to define functions that are shared between the files that make up the Stasis Message Bus API. Functions declared here should not be used by any module outside of Stasis.

If you find yourself needing to call one of these functions directly, something has probably gone horribly wrong.

Author
Matt Jordan mjord.nosp@m.an@d.nosp@m.igium.nosp@m..com

Definition in file stasis_internal.h.

Function Documentation

◆ internal_stasis_subscribe()

struct stasis_subscription* internal_stasis_subscribe ( struct stasis_topic *  topic,
stasis_subscription_cb  callback,
void *  data,
int  needs_mailbox,
int  use_thread_pool,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Note: modules outside of Stasis should use stasis_subscribe.

Parameters
topic: Topic to subscribe to.
callback: Callback function for subscription messages.
data: Data to be passed to the callback, in addition to the message.
needs_mailbox: Determines whether or not the subscription requires a mailbox. Subscriptions with mailboxes will be delivered on some non-publisher thread; subscriptions without mailboxes will be delivered on the publisher thread.
use_thread_pool: Use the thread pool for the subscription. This is only relevant if needs_mailbox is non-zero.
Returns
New stasis_subscription object.
NULL on error.
Since
12

Definition at line 858 of file stasis.c.

References ao2_ref, ao2_t_alloc, ast_asprintf, ast_atomic_fetchadd_int(), ast_cond_init, ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_set_local(), ast_threadpool_serializer(), AST_VECTOR_INIT, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::join_cond, stasis_subscription::mailbox, NULL, send_subscription_subscribe(), STASIS_SUBSCRIPTION_FILTER_NONE, STASIS_SUBSCRIPTION_FORMATTER_NONE, stasis_subscription_statistics_create(), stasis_topic_name(), statistics(), sub, subscription_dtor(), stasis_subscription::topic, topic_add_subscription(), TPS_REF_DEFAULT, and stasis_subscription::uniqueid.

Referenced by __stasis_subscribe(), __stasis_subscribe_pool(), and stasis_caching_topic_create().

867 {
868  struct stasis_subscription *sub;
869  int ret;
870 
871  if (!topic) {
872  return NULL;
873  }
874 
875  /* The ao2 lock is used for join_cond. */
876  sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
877  if (!sub) {
878  return NULL;
879  }
880 
881 #ifdef AST_DEVMODE
882  ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
883  sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
884  if (ret < 0 || !sub->statistics) {
885  ao2_ref(sub, -1);
886  return NULL;
887  }
888 #else
889  ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
890  if (ret < 0) {
891  ao2_ref(sub, -1);
892  return NULL;
893  }
894 #endif
895 
896  if (needs_mailbox) {
897  char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
898 
899  /* Create name with seq number appended. */
900  ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
901  use_thread_pool ? 'p' : 'm',
902  stasis_topic_name(topic));
903 
904  /*
905  * With a small number of subscribers, a thread-per-sub is
906  * acceptable. For a large number of subscribers, a thread
907  * pool should be used.
908  */
909  if (use_thread_pool) {
910  sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
911  } else {
912  sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
913  }
914  if (!sub->mailbox) {
915  ao2_ref(sub, -1);
916 
917  return NULL;
918  }
919  ast_taskprocessor_set_local(sub->mailbox, sub);
920  /* Taskprocessor has a reference */
921  ao2_ref(sub, +1);
922  }
923 
924  ao2_ref(topic, +1);
925  sub->topic = topic;
926  sub->callback = callback;
927  sub->data = data;
928  ast_cond_init(&sub->join_cond, NULL);
929  sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
930  AST_VECTOR_INIT(&sub->accepted_message_types, 0);
931  sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
932 
933  if (topic_add_subscription(topic, sub) != 0) {
934  ao2_ref(sub, -1);
935  ao2_ref(topic, -1);
936 
937  return NULL;
938  }
939  send_subscription_subscribe(topic, sub);
940 
941  return sub;
942 }
static void statistics(void)
Definition: utils/frame.c:287
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct stasis_topic * topic
Definition: stasis.c:685
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
struct ast_taskprocessor * ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1432
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
ast_cond_t join_cond
Definition: stasis.c:694
TPS_REF_DEFAULT
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:75
#define ast_cond_init(cond, attr)
Definition: lock.h:199
#define NULL
Definition: resample.c:96
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:60
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
Definition: stasis.c:1648
#define ao2_ref(o, delta)
Definition: astobj2.h:464
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
struct ast_taskprocessor * mailbox
Definition: stasis.c:687
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1203
stasis_subscription_cb callback
Definition: stasis.c:689
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
static struct stasis_subscription_statistics * stasis_subscription_statistics_create(struct stasis_subscription *sub, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Definition: stasis.c:823
struct stasis_forward * sub
Definition: res_corosync.c:240
static void subscription_dtor(void *obj)
Definition: stasis.c:715
static struct ast_threadpool * threadpool
Definition: stasis.c:308