Asterisk - The Open Source Telephony Project  18.5.0
Data Structures | Macros | Functions
stasis_message_router.c File Reference

Stasis message router implementation. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/vector.h"
Include dependency graph for stasis_message_router.c:

Go to the source code of this file.

Data Structures

struct  stasis_message_route
 
struct  stasis_message_router
 

Macros

#define ROUTE_TABLE_ELEM_CLEANUP(elem)   ao2_cleanup((elem).message_type)
 route_table vector element cleanup. More...
 
#define ROUTE_TABLE_ELEM_CMP(elem, value)   ((elem).message_type == (value))
 route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED() More...
 

Functions

struct stasis_message_router * __stasis_message_router_create (struct stasis_topic *topic, const char *file, int lineno, const char *func)
 Create a new message router object. More...
 
struct stasis_message_router * __stasis_message_router_create_pool (struct stasis_topic *topic, const char *file, int lineno, const char *func)
 Create a new message router object. More...
 
 AST_VECTOR (route_table, struct stasis_message_route)
 
static int find_route (struct stasis_message_router *router, struct stasis_message *message, struct stasis_message_route *route_out)
 
static int route_table_add (struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
 
static void route_table_dtor (struct route_table *table)
 
static struct stasis_message_route * route_table_find (struct route_table *table, struct stasis_message_type *message_type)
 
static int route_table_remove (struct route_table *table, struct stasis_message_type *message_type)
 
static void router_dispatch (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 
static void router_dtor (void *obj)
 
void stasis_message_router_accept_formatters (struct stasis_message_router *router, enum stasis_subscription_message_formatters formatters)
 Indicate to a message router that we are interested in messages with one or more formatters. More...
 
int stasis_message_router_add (struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
 Add a route to a message router. More...
 
int stasis_message_router_add_cache_update (struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
 Add a route for stasis_cache_update messages to a message router. More...
 
static struct stasis_message_router * stasis_message_router_create_internal (struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno, const char *func)
 
int stasis_message_router_is_done (struct stasis_message_router *router)
 Returns whether router has received its final message. More...
 
void stasis_message_router_publish_sync (struct stasis_message_router *router, struct stasis_message *message)
 Publish a message to a message router's subscription synchronously. More...
 
void stasis_message_router_remove (struct stasis_message_router *router, struct stasis_message_type *message_type)
 Remove a route from a message router. More...
 
void stasis_message_router_remove_cache_update (struct stasis_message_router *router, struct stasis_message_type *message_type)
 Remove a cache route from a message router. More...
 
int stasis_message_router_set_congestion_limits (struct stasis_message_router *router, long low_water, long high_water)
 Set the high and low alert water marks of the stasis message router. More...
 
int stasis_message_router_set_default (struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
 Sets the default route of a router. More...
 
void stasis_message_router_set_formatters_default (struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
 Sets the default route of a router with formatters. More...
 
void stasis_message_router_unsubscribe (struct stasis_message_router *router)
 Unsubscribe the router from the upstream topic. More...
 
void stasis_message_router_unsubscribe_and_join (struct stasis_message_router *router)
 Unsubscribe the router from the upstream topic, blocking until the final message has been processed. More...
 

Detailed Description

Stasis message router implementation.

Author
David M. Lee, II <dlee@digium.com>

Definition in file stasis_message_router.c.

Macro Definition Documentation

◆ ROUTE_TABLE_ELEM_CLEANUP

#define ROUTE_TABLE_ELEM_CLEANUP (   elem)    ao2_cleanup((elem).message_type)

route_table vector element cleanup.

Parameters
elem: Element to clean up
Returns
Nothing

Definition at line 88 of file stasis_message_router.c.

Referenced by route_table_add(), route_table_dtor(), and route_table_remove().

◆ ROUTE_TABLE_ELEM_CMP

#define ROUTE_TABLE_ELEM_CMP (   elem,
  value 
)    ((elem).message_type == (value))

route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()

Parameters
elem: Element to compare against
value: Value to compare with the vector element.
Returns
0 if element does not match.
Non-zero if element matches.

Definition at line 79 of file stasis_message_router.c.

Referenced by route_table_remove().

Function Documentation

◆ __stasis_message_router_create()

struct stasis_message_router* __stasis_message_router_create ( struct stasis_topic *  topic,
const char *  file,
int  lineno,
const char *  func 
)

Create a new message router object.

Parameters
topic: Topic to subscribe route to.
Returns
New stasis_message_router.
NULL on error.
Since
12

Definition at line 246 of file stasis_message_router.c.

References stasis_message_router_create_internal().

248 {
249  return stasis_message_router_create_internal(topic, 0, file, lineno, func);
250 }
static struct stasis_message_router * stasis_message_router_create_internal(struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno, const char *func)

◆ __stasis_message_router_create_pool()

struct stasis_message_router* __stasis_message_router_create_pool ( struct stasis_topic *  topic,
const char *  file,
int  lineno,
const char *  func 
)

Create a new message router object.

The subscription created for this message router will dispatch callbacks on a thread pool.

Parameters
topic: Topic to subscribe route to.
Returns
New stasis_message_router.
NULL on error.
Since
12.8.0

Definition at line 252 of file stasis_message_router.c.

References stasis_message_router_create_internal().

254 {
255  return stasis_message_router_create_internal(topic, 1, file, lineno, func);
256 }
static struct stasis_message_router * stasis_message_router_create_internal(struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno, const char *func)

◆ AST_VECTOR()

AST_VECTOR ( route_table  ,
struct stasis_message_route   
)

◆ find_route()

static int find_route ( struct stasis_message_router *  router,
struct stasis_message *  message,
struct stasis_message_route *  route_out 
)
static

Definition at line 155 of file stasis_message_router.c.

References ast_assert, stasis_message_router::cache_routes, stasis_message_route::callback, stasis_message_router::default_route, lock, NULL, route_table_find(), stasis_message_router::routes, SCOPED_AO2LOCK, stasis_cache_update_type(), stasis_message_data(), stasis_message_type(), type, stasis_cache_update::type, and update().

Referenced by router_dispatch().

159 {
160  struct stasis_message_route *route = NULL;
161  struct stasis_message_type *type = stasis_message_type(message);
162  SCOPED_AO2LOCK(lock, router);
163 
164  ast_assert(route_out != NULL);
165 
166  if (type == stasis_cache_update_type()) {
167  /* Find a cache route */
168  struct stasis_cache_update *update =
169  stasis_message_data(message);
170  route = route_table_find(&router->cache_routes, update->type);
171  }
172 
173  if (route == NULL) {
174  /* Find a regular route */
175  route = route_table_find(&router->routes, type);
176  }
177 
178  if (route == NULL && router->default_route.callback) {
179  /* Maybe the default route, then? */
180  route = &router->default_route;
181  }
182 
183  if (!route) {
184  return -1;
185  }
186 
187  *route_out = *route;
188  return 0;
189 }
static const char type[]
Definition: chan_ooh323.c:109
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
struct route_table cache_routes
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
ast_mutex_t lock
Definition: app_meetme.c:1091
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
Cache update message.
Definition: stasis.h:967
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message_route default_route
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:969
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
static struct stasis_message_route * route_table_find(struct route_table *table, struct stasis_message_type *message_type)
stasis_subscription_cb callback

◆ route_table_add()

static int route_table_add ( struct route_table *  table,
struct stasis_message_type *  message_type,
stasis_subscription_cb  callback,
void *  data 
)
static

Definition at line 97 of file stasis_message_router.c.

References ao2_bump, ast_assert, AST_VECTOR_APPEND, stasis_message_route::callback, stasis_message_route::data, stasis_message_route::message_type, NULL, ROUTE_TABLE_ELEM_CLEANUP, and route_table_find().

Referenced by stasis_message_router_add(), and stasis_message_router_add_cache_update().

100 {
101  struct stasis_message_route route;
102  int res;
103 
105  ast_assert(route_table_find(table, message_type) == NULL);
106 
107  route.message_type = ao2_bump(message_type);
108  route.callback = callback;
109  route.data = data;
110 
111  res = AST_VECTOR_APPEND(table, route);
112  if (res) {
114  }
115  return res;
116 }
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
#define ao2_bump(obj)
Definition: astobj2.h:491
static char * table
Definition: cdr_odbc.c:58
#define ROUTE_TABLE_ELEM_CLEANUP(elem)
route_table vector element cleanup.
static struct stasis_message_route * route_table_find(struct route_table *table, struct stasis_message_type *message_type)
stasis_subscription_cb callback

◆ route_table_dtor()

static void route_table_dtor ( struct route_table *  table)
static

Definition at line 118 of file stasis_message_router.c.

References AST_VECTOR_FREE, AST_VECTOR_GET_ADDR, AST_VECTOR_SIZE, and ROUTE_TABLE_ELEM_CLEANUP.

Referenced by router_dtor().

119 {
120  size_t idx;
121  struct stasis_message_route *route;
122 
123  for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
124  route = AST_VECTOR_GET_ADDR(table, idx);
125  ROUTE_TABLE_ELEM_CLEANUP(*route);
126  }
128 }
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
static char * table
Definition: cdr_odbc.c:58
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:670
#define ROUTE_TABLE_ELEM_CLEANUP(elem)
route_table vector element cleanup.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ route_table_find()

static struct stasis_message_route* route_table_find ( struct route_table *  table,
struct stasis_message_type *  message_type 
)
static

Definition at line 48 of file stasis_message_router.c.

References AST_VECTOR_GET_ADDR, AST_VECTOR_SIZE, stasis_message_route::message_type, and NULL.

Referenced by find_route(), and route_table_add().

50 {
51  size_t idx;
52  struct stasis_message_route *route;
53 
54  /* While a linear search for routes may seem very inefficient, most
55  * route tables have six routes or less. For such small data, it's
56  * hard to beat a linear search. If we start having larger route
57  * tables, then we can look into containers with more efficient
58  * lookups.
59  */
60  for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
61  route = AST_VECTOR_GET_ADDR(table, idx);
62  if (route->message_type == message_type) {
63  return route;
64  }
65  }
66 
67  return NULL;
68 }
#define NULL
Definition: resample.c:96
static char * table
Definition: cdr_odbc.c:58
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:670
struct stasis_message_type * message_type
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ route_table_remove()

static int route_table_remove ( struct route_table *  table,
struct stasis_message_type *  message_type 
)
static

Definition at line 90 of file stasis_message_router.c.

References AST_VECTOR_REMOVE_CMP_UNORDERED, ROUTE_TABLE_ELEM_CLEANUP, and ROUTE_TABLE_ELEM_CMP.

Referenced by stasis_message_router_remove(), and stasis_message_router_remove_cache_update().

92 {
95 }
#define AST_VECTOR_REMOVE_CMP_UNORDERED(vec, value, cmp, cleanup)
Remove an element from a vector that matches the given comparison.
Definition: vector.h:488
#define ROUTE_TABLE_ELEM_CMP(elem, value)
route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
static char * table
Definition: cdr_odbc.c:58
#define ROUTE_TABLE_ELEM_CLEANUP(elem)
route_table vector element cleanup.

◆ router_dispatch()

static void router_dispatch ( void *  data,
struct stasis_subscription *  sub,
struct stasis_message *  message 
)
static

Definition at line 191 of file stasis_message_router.c.

References ao2_cleanup, stasis_message_route::callback, stasis_message_route::data, find_route(), router, and stasis_subscription_final_message().

Referenced by stasis_message_router_create_internal().

194 {
195  struct stasis_message_router *router = data;
196  struct stasis_message_route route;
197 
198  if (find_route(router, message, &route) == 0) {
199  route.callback(route.data, sub, message);
200  }
201 
202  if (stasis_subscription_final_message(sub, message)) {
203  ao2_cleanup(router);
204  }
205 }
static int find_route(struct stasis_message_router *router, struct stasis_message *message, struct stasis_message_route *route_out)
static struct stasis_message_router * router
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
stasis_subscription_cb callback

◆ router_dtor()

static void router_dtor ( void *  obj)
static

Definition at line 142 of file stasis_message_router.c.

References ast_assert, stasis_message_router::cache_routes, NULL, route_table_dtor(), router, stasis_message_router::routes, stasis_subscription_is_done(), stasis_subscription_is_subscribed(), and stasis_message_router::subscription.

Referenced by stasis_message_router_create_internal().

143 {
144  struct stasis_message_router *router = obj;
145 
148 
149  router->subscription = NULL;
150 
151  route_table_dtor(&router->routes);
152  route_table_dtor(&router->cache_routes);
153 }
static struct stasis_message_router * router
struct route_table cache_routes
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
static void route_table_dtor(struct route_table *table)
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
struct stasis_subscription * subscription

◆ stasis_message_router_accept_formatters()

void stasis_message_router_accept_formatters ( struct stasis_message_router *  router,
enum stasis_subscription_message_formatters  formatters 
)

Indicate to a message router that we are interested in messages with one or more formatters.

The formatters are passed on to the underlying subscription.

Warning
With direct subscriptions, adding a formatter filter is an OR operation with any message type filters. In the current implementation of message router, however, it's an AND operation. Even when setting a default route, the callback will only get messages that have the formatters provided in this call.
Parameters
router: Router to set the formatters of.
formatters: A bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.25.0
16.2.0

Definition at line 420 of file stasis_message_router.c.

References ast_assert, NULL, stasis_subscription_accept_formatters(), and stasis_message_router::subscription.

422 {
423  ast_assert(router != NULL);
424 
426 
427  return;
428 }
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1095
struct stasis_subscription * subscription

◆ stasis_message_router_add()

int stasis_message_router_add ( struct stasis_message_router *  router,
struct stasis_message_type *  message_type,
stasis_subscription_cb  callback,
void *  data 
)

Add a route to a message router.

A particular message_type may have at most one route per router. If you route stasis_cache_update messages, the callback will only receive updates for types not handled by routes added with stasis_message_router_add_cache_update().

Adding multiple routes for the same message type results in undefined behavior.

Parameters
router: Router to add the route to.
message_type: Type of message to route.
callback: Callback to forward messages of message_type to.
data: Data pointer to pass to callback.
Return values
0: on success
-1: on failure
Since
12

Definition at line 310 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, ast_assert, NULL, route_table_add(), stasis_message_router::routes, stasis_subscription_accept_message_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), AST_TEST_DEFINE(), create_routes(), endpoint_internal_create(), forwards_create_endpoint(), load_general_config(), load_module(), manager_bridging_init(), manager_channels_init(), manager_confbridge_init(), manager_endpoints_init(), manager_mwi_init(), manager_subscriptions_init(), meetme_stasis_init(), pjsip_outbound_registration_metrics_init(), and setup_stasis_subs().

313 {
314  int res;
315 
316  ast_assert(router != NULL);
317 
318  if (!message_type) {
319  /* Cannot route to NULL type. */
320  return -1;
321  }
322  ao2_lock(router);
323  res = route_table_add(&router->routes, message_type, callback, data);
324  if (!res) {
326  /* Until a specific message type was added we would already drop the message, so being
327  * selective now doesn't harm us. If we have a default route then we are already forced
328  * to filter nothing and messages will come in regardless.
329  */
331  }
332  ao2_unlock(router);
333  return res;
334 }
static int route_table_add(struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
struct stasis_subscription * subscription

◆ stasis_message_router_add_cache_update()

int stasis_message_router_add_cache_update ( struct stasis_message_router *  router,
struct stasis_message_type *  message_type,
stasis_subscription_cb  callback,
void *  data 
)

Add a route for stasis_cache_update messages to a message router.

A particular message_type may have at most one cache route per router. These are distinct from regular routes, so one could have both a regular route and a cache route for the same message_type.

Adding multiple routes for the same message type results in undefined behavior.

Parameters
router: Router to add the route to.
message_type: Subtype of cache update to route.
callback: Callback to forward messages of message_type to.
data: Data pointer to pass to callback.
Return values
0: on success
-1: on failure
Since
12

Definition at line 336 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, ast_assert, stasis_message_router::cache_routes, NULL, route_table_add(), stasis_cache_update_type(), stasis_subscription_accept_message_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), and AST_TEST_DEFINE().

339 {
340  int res;
341 
342  ast_assert(router != NULL);
343 
344  if (!message_type) {
345  /* Cannot cache a route to NULL type. */
346  return -1;
347  }
348  ao2_lock(router);
349  res = route_table_add(&router->cache_routes, message_type, callback, data);
350  if (!res) {
353  }
354  ao2_unlock(router);
355  return res;
356 }
static int route_table_add(struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
struct route_table cache_routes
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
#define ao2_lock(a)
Definition: astobj2.h:718
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
struct stasis_subscription * subscription

◆ stasis_message_router_create_internal()

static struct stasis_message_router* stasis_message_router_create_internal ( struct stasis_topic *  topic,
int  use_thread_pool,
const char *  file,
int  lineno,
const char *  func 
)
static

Definition at line 207 of file stasis_message_router.c.

References __stasis_subscribe(), __stasis_subscribe_pool(), ao2_ref, ao2_t_alloc, AST_VECTOR_INIT, stasis_message_router::cache_routes, NULL, router, router_dispatch(), router_dtor(), stasis_message_router::routes, stasis_subscription_accept_message_type(), stasis_subscription_change_type(), stasis_topic_name(), and stasis_message_router::subscription.

Referenced by __stasis_message_router_create(), and __stasis_message_router_create_pool().

210 {
211  int res;
213 
214  router = ao2_t_alloc(sizeof(*router), router_dtor, stasis_topic_name(topic));
215  if (!router) {
216  return NULL;
217  }
218 
219  res = 0;
220  res |= AST_VECTOR_INIT(&router->routes, 0);
221  res |= AST_VECTOR_INIT(&router->cache_routes, 0);
222  if (res) {
223  ao2_ref(router, -1);
224 
225  return NULL;
226  }
227 
228  if (use_thread_pool) {
229  router->subscription = __stasis_subscribe_pool(topic, router_dispatch, router, file, lineno, func);
230  } else {
231  router->subscription = __stasis_subscribe(topic, router_dispatch, router, file, lineno, func);
232  }
233 
234  if (!router->subscription) {
235  ao2_ref(router, -1);
236 
237  return NULL;
238  }
239 
240  /* We need to receive subscription change messages so we know when our subscription goes away */
242 
243  return router;
244 }
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
static struct stasis_message_router * router
struct route_table cache_routes
#define NULL
Definition: resample.c:96
static void router_dtor(void *obj)
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static void router_dispatch(void *data, struct stasis_subscription *sub, struct stasis_message *message)
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:955
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:944
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
struct stasis_subscription * subscription
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.

◆ stasis_message_router_is_done()

int stasis_message_router_is_done ( struct stasis_message_router *  router)

Returns whether router has received its final message.

Parameters
router: Router.
Returns
True (non-zero) if stasis_subscription_final_message() has been received.
False (zero) if waiting for the end.

Definition at line 278 of file stasis_message_router.c.

References stasis_subscription_is_done(), and stasis_message_router::subscription.

Referenced by endpoint_dtor().

279 {
280  if (!router) {
281  /* Null router is about as done as you can get */
282  return 1;
283  }
284 
286 }
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
struct stasis_subscription * subscription

◆ stasis_message_router_publish_sync()

void stasis_message_router_publish_sync ( struct stasis_message_router *  router,
struct stasis_message *  message 
)

Publish a message to a message router's subscription synchronously.

Parameters
router: Router
message: The Stasis Message Bus API message

This should be used when a message needs to be published synchronously to the underlying subscription created by a message router. This is analogous to stasis_publish_sync.

Note that the caller will be blocked until the thread servicing the message on the message router's subscription completes handling of the message.

Since
12.1.0

Definition at line 288 of file stasis_message_router.c.

References ao2_bump, ao2_cleanup, ast_assert, NULL, stasis_publish_sync(), and stasis_message_router::subscription.

Referenced by ast_cdr_engine_term(), cdr_prop_write(), cdr_read(), cdr_write(), forkcdr_exec(), and publish_app_cdr_message().

290 {
291  ast_assert(router != NULL);
292 
293  ao2_bump(router);
294  stasis_publish_sync(router->subscription, message);
295  ao2_cleanup(router);
296 }
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
#define ao2_bump(obj)
Definition: astobj2.h:491
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_subscription * subscription

◆ stasis_message_router_remove()

void stasis_message_router_remove ( struct stasis_message_router *  router,
struct stasis_message_type *  message_type 
)

Remove a route from a message router.

If a route is removed from another thread, there is no notification that all messages using this route have been processed. This typically means that the associated data pointer for this route must be kept until the route itself is disposed of.

Parameters
router: Router to remove the route from.
message_type: Type of message to route.
Since
12

Definition at line 358 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, ast_assert, NULL, route_table_remove(), and stasis_message_router::routes.

Referenced by cleanup_module(), load_general_config(), and unload_module().

360 {
361  ast_assert(router != NULL);
362 
363  if (!message_type) {
364  /* Cannot remove a NULL type. */
365  return;
366  }
367  ao2_lock(router);
368  route_table_remove(&router->routes, message_type);
369  ao2_unlock(router);
370 }
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
static int route_table_remove(struct route_table *table, struct stasis_message_type *message_type)
#define ao2_lock(a)
Definition: astobj2.h:718

◆ stasis_message_router_remove_cache_update()

void stasis_message_router_remove_cache_update ( struct stasis_message_router *  router,
struct stasis_message_type *  message_type 
)

Remove a cache route from a message router.

If a route is removed from another thread, there is no notification that all messages using this route have been processed. This typically means that the associated data pointer for this route must be kept until the route itself is disposed of.

Parameters
router: Router to remove the route from.
message_type: Type of message to route.
Since
12

Definition at line 372 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, ast_assert, stasis_message_router::cache_routes, NULL, and route_table_remove().

375 {
376  ast_assert(router != NULL);
377 
378  if (!message_type) {
379  /* Cannot remove a NULL type. */
380  return;
381  }
382  ao2_lock(router);
383  route_table_remove(&router->cache_routes, message_type);
384  ao2_unlock(router);
385 }
struct route_table cache_routes
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
static int route_table_remove(struct route_table *table, struct stasis_message_type *message_type)
#define ao2_lock(a)
Definition: astobj2.h:718

◆ stasis_message_router_set_congestion_limits()

int stasis_message_router_set_congestion_limits ( struct stasis_message_router *  router,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the stasis message router.

Since
13.10.0
Parameters
router: Pointer to a stasis message router
low_water: New queue low water mark. (-1 to set as 90% of high_water)
high_water: New queue high water mark.
Return values
0: on success.
-1: on error (water marks not changed).

Definition at line 298 of file stasis_message_router.c.

References stasis_subscription_set_congestion_limits(), and stasis_message_router::subscription.

Referenced by create_routes(), load_module(), and manager_subscriptions_init().

300 {
301  int res = -1;
302 
303  if (router) {
305  low_water, high_water);
306  }
307  return res;
308 }
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1013
struct stasis_subscription * subscription

◆ stasis_message_router_set_default()

int stasis_message_router_set_default ( struct stasis_message_router *  router,
stasis_subscription_cb  callback,
void *  data 
)

Sets the default route of a router.

Parameters
router — Router to set the default route of.
callback — Callback to forward messages which otherwise have no home.
data — Data pointer to pass to callback.
Return values
0 — on success
-1 — on failure
Since
12
Note
Setting a default callback will automatically cause the underlying subscription to receive all messages and not be filtered. If filtering is desired then a specific route for each message type should be provided.

Definition at line 387 of file stasis_message_router.c.

References stasis_message_router_set_formatters_default(), and STASIS_SUBSCRIPTION_FORMATTER_NONE.

Referenced by AST_TEST_DEFINE(), load_module(), and setup_stasis_subs().

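390 {
391  stasis_message_router_set_formatters_default(router, callback, data, STASIS_SUBSCRIPTION_FORMATTER_NONE);
392 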
393  /* While this implementation can never fail, it used to be able to */
394  return 0;
395 }
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.

◆ stasis_message_router_set_formatters_default()

void stasis_message_router_set_formatters_default ( struct stasis_message_router *  router,
stasis_subscription_cb  callback,
void *  data,
enum stasis_subscription_message_formatters  formatters 
)

Sets the default route of a router with formatters.

Parameters
router — Router to set the default route of.
callback — Callback to forward messages which otherwise have no home.
data — Data pointer to pass to callback.
formatters — A bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.26.0
16.3.0
Note
If formatters are specified then the message router will remain in a selective filtering state. Any explicit routes will receive messages of their message type and the default callback will only receive messages that have one of the given formatters. Explicit routes will not be filtered according to the given formatters.

Definition at line 397 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, ast_assert, stasis_message_route::callback, stasis_message_route::data, stasis_message_router::default_route, NULL, stasis_subscription_accept_formatters(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, STASIS_SUBSCRIPTION_FORMATTER_NONE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), manager_subscriptions_init(), and stasis_message_router_set_default().

401 {
402  ast_assert(router != NULL);
403  ast_assert(callback != NULL);
404 
405  stasis_subscription_accept_formatters(router->subscription, formatters);
406 
407  ao2_lock(router);
408  router->default_route.callback = callback;
409  router->default_route.data = data;
410  ao2_unlock(router);
411 
412  if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
413  /* Formatters govern what messages the default callback get, so it is only if none is
414  * specified that we accept all messages regardless.
415  */
416  stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
417  }
418 }
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_lock(a)
Definition: astobj2.h:718
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1095
struct stasis_message_route default_route
stasis_subscription_cb callback
struct stasis_subscription * subscription

◆ stasis_message_router_unsubscribe()

void stasis_message_router_unsubscribe ( struct stasis_message_router *  router)

Unsubscribe the router from the upstream topic.

Parameters
router — Router to unsubscribe.
Since
12

Definition at line 258 of file stasis_message_router.c.

References ao2_lock, ao2_unlock, stasis_unsubscribe(), and stasis_message_router::subscription.

Referenced by app_shutdown(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), manager_confbridge_shutdown(), meetme_stasis_cleanup(), remove_stasis_subscriptions(), and setup_stasis_subs().

259 {
260  if (!router) {
261  return;
262  }
263 
264  ao2_lock(router);
265  router->subscription = stasis_unsubscribe(router->subscription);
266  ao2_unlock(router);
267 }
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ao2_lock(a)
Definition: astobj2.h:718
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:973
struct stasis_subscription * subscription

◆ stasis_message_router_unsubscribe_and_join()

void stasis_message_router_unsubscribe_and_join ( struct stasis_message_router *  router)

Unsubscribe the router from the upstream topic, blocking until the final message has been processed.

See stasis_unsubscribe_and_join() for info on when to use this vs. stasis_message_router_unsubscribe().

Parameters
router — Router to unsubscribe.
Since
12

Definition at line 269 of file stasis_message_router.c.

References stasis_unsubscribe_and_join(), and stasis_message_router::subscription.

Referenced by AST_TEST_DEFINE(), cdr_engine_shutdown(), cleanup_module(), destroy_routes(), manager_endpoints_shutdown(), manager_shutdown(), pjsip_outbound_registration_metrics_unload_cb(), and unload_module().

271 {
272  if (!router) {
273  return;
274  }
275  stasis_unsubscribe_and_join(router->subscription);
276 }
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1136
struct stasis_subscription * subscription