Asterisk - The Open Source Telephony Project  18.5.0
stasis_message_router.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Stasis message router implementation.
22  *
23  * \author David M. Lee, II <[email protected]>
24  */
25 
26 /*** MODULEINFO
27  <support_level>core</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 #include "asterisk/astobj2.h"
34 #include "asterisk/vector.h"
35 
36 /*! \internal */
38  /*! Message type handle by this route. */
40  /*! Callback function for incoming message processing. */
42  /*! Data pointer to be handed to the callback. */
43  void *data;
44 };
45 
46 AST_VECTOR(route_table, struct stasis_message_route);
47 
48 static struct stasis_message_route *route_table_find(struct route_table *table,
50 {
51  size_t idx;
52  struct stasis_message_route *route;
53 
54  /* While a linear search for routes may seem very inefficient, most
55  * route tables have six routes or less. For such small data, it's
56  * hard to beat a linear search. If we start having larger route
57  * tables, then we can look into containers with more efficient
58  * lookups.
59  */
60  for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
61  route = AST_VECTOR_GET_ADDR(table, idx);
62  if (route->message_type == message_type) {
63  return route;
64  }
65  }
66 
67  return NULL;
68 }
69 
70 /*!
71  * \brief route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
72  *
73  * \param elem Element to compare against
74  * \param value Value to compare with the vector element.
75  *
76  * \return 0 if element does not match.
77  * \return Non-zero if element matches.
78  */
79 #define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
80 
81 /*!
82  * \brief route_table vector element cleanup.
83  *
84  * \param elem Element to cleanup
85  *
86  * \return Nothing
87  */
88 #define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type)
89 
90 static int route_table_remove(struct route_table *table,
92 {
93  return AST_VECTOR_REMOVE_CMP_UNORDERED(table, message_type, ROUTE_TABLE_ELEM_CMP,
95 }
96 
97 static int route_table_add(struct route_table *table,
100 {
101  struct stasis_message_route route;
102  int res;
103 
104  ast_assert(callback != NULL);
105  ast_assert(route_table_find(table, message_type) == NULL);
106 
107  route.message_type = ao2_bump(message_type);
108  route.callback = callback;
109  route.data = data;
110 
111  res = AST_VECTOR_APPEND(table, route);
112  if (res) {
114  }
115  return res;
116 }
117 
118 static void route_table_dtor(struct route_table *table)
119 {
120  size_t idx;
121  struct stasis_message_route *route;
122 
123  for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
124  route = AST_VECTOR_GET_ADDR(table, idx);
125  ROUTE_TABLE_ELEM_CLEANUP(*route);
126  }
127  AST_VECTOR_FREE(table);
128 }
129 
130 /*! \internal */
132  /*! Subscription to the upstream topic */
134  /*! Subscribed routes */
135  struct route_table routes;
136  /*! Subscribed routes for \ref stasis_cache_update messages */
137  struct route_table cache_routes;
138  /*! Route of last resort */
140 };
141 
142 static void router_dtor(void *obj)
143 {
144  struct stasis_message_router *router = obj;
145 
148 
149  router->subscription = NULL;
150 
151  route_table_dtor(&router->routes);
152  route_table_dtor(&router->cache_routes);
153 }
154 
155 static int find_route(
157  struct stasis_message *message,
158  struct stasis_message_route *route_out)
159 {
160  struct stasis_message_route *route = NULL;
161  struct stasis_message_type *type = stasis_message_type(message);
162  SCOPED_AO2LOCK(lock, router);
163 
164  ast_assert(route_out != NULL);
165 
166  if (type == stasis_cache_update_type()) {
167  /* Find a cache route */
168  struct stasis_cache_update *update =
169  stasis_message_data(message);
170  route = route_table_find(&router->cache_routes, update->type);
171  }
172 
173  if (route == NULL) {
174  /* Find a regular route */
175  route = route_table_find(&router->routes, type);
176  }
177 
178  if (route == NULL && router->default_route.callback) {
179  /* Maybe the default route, then? */
180  route = &router->default_route;
181  }
182 
183  if (!route) {
184  return -1;
185  }
186 
187  *route_out = *route;
188  return 0;
189 }
190 
191 static void router_dispatch(void *data,
192  struct stasis_subscription *sub,
193  struct stasis_message *message)
194 {
196  struct stasis_message_route route;
197 
198  if (find_route(router, message, &route) == 0) {
199  route.callback(route.data, sub, message);
200  }
201 
202  if (stasis_subscription_final_message(sub, message)) {
203  ao2_cleanup(router);
204  }
205 }
206 
208  struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno,
209  const char *func)
210 {
211  int res;
213 
214  router = ao2_t_alloc(sizeof(*router), router_dtor, stasis_topic_name(topic));
215  if (!router) {
216  return NULL;
217  }
218 
219  res = 0;
220  res |= AST_VECTOR_INIT(&router->routes, 0);
221  res |= AST_VECTOR_INIT(&router->cache_routes, 0);
222  if (res) {
223  ao2_ref(router, -1);
224 
225  return NULL;
226  }
227 
228  if (use_thread_pool) {
229  router->subscription = __stasis_subscribe_pool(topic, router_dispatch, router, file, lineno, func);
230  } else {
231  router->subscription = __stasis_subscribe(topic, router_dispatch, router, file, lineno, func);
232  }
233 
234  if (!router->subscription) {
235  ao2_ref(router, -1);
236 
237  return NULL;
238  }
239 
240  /* We need to receive subscription change messages so we know when our subscription goes away */
242 
243  return router;
244 }
245 
247  struct stasis_topic *topic, const char *file, int lineno, const char *func)
248 {
249  return stasis_message_router_create_internal(topic, 0, file, lineno, func);
250 }
251 
253  struct stasis_topic *topic, const char *file, int lineno, const char *func)
254 {
255  return stasis_message_router_create_internal(topic, 1, file, lineno, func);
256 }
257 
259 {
260  if (!router) {
261  return;
262  }
263 
264  ao2_lock(router);
265  router->subscription = stasis_unsubscribe(router->subscription);
266  ao2_unlock(router);
267 }
268 
271 {
272  if (!router) {
273  return;
274  }
276 }
277 
279 {
280  if (!router) {
281  /* Null router is about as done as you can get */
282  return 1;
283  }
284 
286 }
287 
289  struct stasis_message *message)
290 {
291  ast_assert(router != NULL);
292 
293  ao2_bump(router);
294  stasis_publish_sync(router->subscription, message);
295  ao2_cleanup(router);
296 }
297 
299  long low_water, long high_water)
300 {
301  int res = -1;
302 
303  if (router) {
305  low_water, high_water);
306  }
307  return res;
308 }
309 
313 {
314  int res;
315 
316  ast_assert(router != NULL);
317 
318  if (!message_type) {
319  /* Cannot route to NULL type. */
320  return -1;
321  }
322  ao2_lock(router);
323  res = route_table_add(&router->routes, message_type, callback, data);
324  if (!res) {
326  /* Until a specific message type was added we would already drop the message, so being
327  * selective now doesn't harm us. If we have a default route then we are already forced
328  * to filter nothing and messages will come in regardless.
329  */
331  }
332  ao2_unlock(router);
333  return res;
334 }
335 
339 {
340  int res;
341 
342  ast_assert(router != NULL);
343 
344  if (!message_type) {
345  /* Cannot cache a route to NULL type. */
346  return -1;
347  }
348  ao2_lock(router);
349  res = route_table_add(&router->cache_routes, message_type, callback, data);
350  if (!res) {
353  }
354  ao2_unlock(router);
355  return res;
356 }
357 
360 {
361  ast_assert(router != NULL);
362 
363  if (!message_type) {
364  /* Cannot remove a NULL type. */
365  return;
366  }
367  ao2_lock(router);
368  route_table_remove(&router->routes, message_type);
369  ao2_unlock(router);
370 }
371 
375 {
376  ast_assert(router != NULL);
377 
378  if (!message_type) {
379  /* Cannot remove a NULL type. */
380  return;
381  }
382  ao2_lock(router);
383  route_table_remove(&router->cache_routes, message_type);
384  ao2_unlock(router);
385 }
386 
389  void *data)
390 {
392 
393  /* While this implementation can never fail, it used to be able to */
394  return 0;
395 }
396 
399  void *data,
401 {
402  ast_assert(router != NULL);
403  ast_assert(callback != NULL);
404 
406 
407  ao2_lock(router);
408  router->default_route.callback = callback;
409  router->default_route.data = data;
410  ao2_unlock(router);
411 
412  if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
413  /* Formatters govern what messages the default callback get, so it is only if none is
414  * specified that we accept all messages regardless.
415  */
417  }
418 }
419 
422 {
423  ast_assert(router != NULL);
424 
426 
427  return;
428 }
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
static const char type[]
Definition: chan_ooh323.c:109
static int find_route(struct stasis_message_router *router, struct stasis_message *message, struct stasis_message_route *route_out)
static int route_table_add(struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Asterisk main include file. File version handling, generic pbx functions.
AST_VECTOR(route_table, struct stasis_message_route)
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
#define AST_VECTOR_REMOVE_CMP_UNORDERED(vec, value, cmp, cleanup)
Remove an element from a vector that matches the given comparison.
Definition: vector.h:488
static struct stasis_message_router * router
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
struct route_table cache_routes
#define ROUTE_TABLE_ELEM_CMP(elem, value)
route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
void stasis_message_router_accept_formatters(struct stasis_message_router *router, enum stasis_subscription_message_formatters formatters)
Indicate to a message router that we are interested in messages with one or more formatters.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
#define NULL
Definition: resample.c:96
static int route_table_remove(struct route_table *table, struct stasis_message_type *message_type)
static void router_dtor(void *obj)
int stasis_message_router_is_done(struct stasis_message_router *router)
Returns whether router has received its final message.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed...
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1013
#define ao2_bump(obj)
Definition: astobj2.h:491
static char * table
Definition: cdr_odbc.c:58
static void default_route(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Router callback for any message that doesn&#39;t otherwise have a route.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:670
ast_mutex_t lock
Definition: app_meetme.c:1091
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
#define ao2_lock(a)
Definition: astobj2.h:718
Cache update message.
Definition: stasis.h:967
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1095
static void route_table_dtor(struct route_table *table)
static void router_dispatch(void *data, struct stasis_subscription *sub, struct stasis_message *message)
#define ROUTE_TABLE_ELEM_CLEANUP(elem)
route_table vector element cleanup.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message_route default_route
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:969
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:973
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:615
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1136
Vector container support.
static struct stasis_message_route * route_table_find(struct route_table *table, struct stasis_message_type *message_type)
struct stasis_message_router * __stasis_message_router_create_pool(struct stasis_topic *topic, const char *file, int lineno, const char *func)
Create a new message router object.
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:955
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic&#39;s subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic.
void stasis_message_router_remove_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a cache route from a message router.
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
void stasis_message_router_publish_sync(struct stasis_message_router *router, struct stasis_message *message)
Publish a message to a message router&#39;s subscription synchronously.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:944
stasis_subscription_cb callback
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
struct stasis_forward * sub
Definition: res_corosync.c:240
struct stasis_subscription * subscription
int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
Sets the default route of a router.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_message_type * message_type
struct stasis_message_router * __stasis_message_router_create(struct stasis_topic *topic, const char *file, int lineno, const char *func)
Create a new message router object.
static struct stasis_message_router * stasis_message_router_create_internal(struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno, const char *func)
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:311
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611