Asterisk - The Open Source Telephony Project  18.5.0
stasis_state.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2019, Sangoma Technologies Corporation
5  *
6  * Kevin Harwell <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*** MODULEINFO
20  <support_level>core</support_level>
21  ***/
22 
23 #include "asterisk.h"
24 
25 #include "asterisk/stasis_state.h"
26 
27 /*!
28  * \internal
29  * \brief Used to link a stasis_state to its manager
30  */
32  AO2_WEAKPROXY();
33  /*! The manager that owns and handles this state */
35  /*! A unique id for this state object. */
36  char id[0];
37 };
38 
39 /*!
40  * \internal
41  * \brief Associates a stasis topic to its last known published message
42  *
43  * This object's lifetime is tracked by the number of publishers and subscribers to it.
44  * Once all publishers and subscribers have been removed this object is removed from the
45  * manager's collection and destroyed. While a single object type (namely this one) could
46  * be utilized for both publishers, and subscribers this implementation purposely keeps
47  * them separated. This was done to maintain readability, make debugging easier, and allow
48  * for better logging and future enhancements.
49  */
50 struct stasis_state {
51  /*! The number of state subscribers */
52  unsigned int num_subscribers;
53  /*!
54  * \brief The manager that owns and handles this state
55  * \note This reference is owned by stasis_state_proxy
56  */
58  /*! Forwarding information, i.e. this topic to manager's topic */
60  /*! The managed topic */
62  /*! The actual state data */
64  /*!
65  * A container of eids. It's assumed that there is only a single publisher per
66  * eid per topic. Thus the publisher is tracked by the system's eid.
67  */
68  AST_VECTOR(, struct ast_eid) eids;
69  /*! A unique id for this state object. */
70  char *id;
71 };
72 
75 
76 /*! The number of buckets to use for managed states */
77 #define STATE_BUCKETS 57
78 
80  /*! Holds all state objects handled by this manager */
82  /*! The manager's topic. All state topics are forwarded to this one */
84  /*! A collection of manager event handlers */
86 };
87 
88 /*!
89  * \internal
90  * \brief Retrieve a state's topic name without the manager topic.
91  *
92  * State topics have names that consist of the manager's topic name
93  * combined with a unique id separated by a slash. For instance:
94  *
95  * manager topic's name/unique id
96  *
97  * This method retrieves the unique id part from the state's topic name.
98  *
99  * \param manager_topic The manager's topic
100  * \param state_topic A state topic
101  *
102  * \return The state's topic unique id
103  */
104 static const char *state_id_by_topic(struct stasis_topic *manager_topic,
105  const struct stasis_topic *state_topic)
106 {
107  const char *id;
108 
109  /* This topic should always belong to the manager */
111  stasis_topic_name(state_topic)));
112 
113  id = strchr(stasis_topic_name(state_topic), '/');
114 
115  /* The state's unique id should always exist */
116  ast_assert(id != NULL && (id + 1) != NULL);
117 
118  return (id + 1);
119 }
120 
121 static void state_dtor(void *obj)
122 {
123  struct stasis_state *state = obj;
124 
125  state->forward = stasis_forward_cancel(state->forward);
126  ao2_cleanup(state->topic);
127  state->topic = NULL;
128  ao2_cleanup(state->msg);
129  state->msg = NULL;
130 
131  /* All eids should have been removed */
132  ast_assert(AST_VECTOR_SIZE(&state->eids) == 0);
133  AST_VECTOR_FREE(&state->eids);
134 }
135 
136 static void state_proxy_dtor(void *obj) {
137  struct stasis_state_proxy *proxy = obj;
138 
139  ao2_cleanup(proxy->manager);
140 }
141 
142 static void state_proxy_sub_cb(void *obj, void *data)
143 {
144  struct stasis_state_proxy *proxy = obj;
145 
146  ao2_unlink(proxy->manager->states, proxy);
147 }
148 
149 /*!
150  * \internal
151  * \brief Allocate a stasis state object and add it to the manager.
152  *
153  * Create and initialize a state structure. It's required that either a state
154  * topic, or an id is specified. If a state topic is not given then one will be
155  * created using the given id.
156  *
157  * \param manager The owning manager
158  * \param state_topic A state topic to be managed
159  * \param id The unique id for the state
160  *
161  * \return A stasis_state object or NULL
162  * \return NULL on error
163  *
164  * \pre manager->states must be locked.
165  * \pre manager->states does not contain an object matching key \a id.
166  */
168  struct stasis_topic *state_topic, const char *id,
169  const char *file, int line, const char *func)
170 {
171  struct stasis_state_proxy *proxy = NULL;
172  struct stasis_state *state = NULL;
173 
174  if (!id) {
175  /* If not given an id, then a state topic is required */
176  ast_assert(state_topic != NULL);
177 
178  /* Get the id we'll key off of from the state topic */
179  id = state_id_by_topic(manager->all_topic, state_topic);
180  }
181 
182  state = __ao2_alloc(sizeof(*state), state_dtor, AO2_ALLOC_OPT_LOCK_MUTEX, id, file, line, func);
183  if (!state) {
184  goto error_return;
185  }
186 
187  if (!state_topic) {
188  char *name;
189 
190  /*
191  * To provide further detail and to ensure that the topic is unique within the
192  * scope of the system we prefix it with the manager's topic name, which should
193  * itself already be unique.
194  */
195  if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
196  goto error_return;
197  }
198 
199  state->topic = stasis_topic_create(name);
200 
201  ast_free(name);
202  if (!state->topic) {
203  goto error_return;
204  }
205  } else {
206  /*
207  * Since the state topic was passed in, go ahead and bump its reference.
208  * By doing this here first, it allows us to consistently decrease the reference on
209  * state allocation error.
210  */
211  ao2_ref(state_topic, +1);
212  state->topic = state_topic;
213  }
214 
215  proxy = ao2_t_weakproxy_alloc(sizeof(*proxy) + strlen(id) + 1, state_proxy_dtor, id);
216  if (!proxy) {
217  goto error_return;
218  }
219 
220  strcpy(proxy->id, id); /* Safe */
221 
222  state->id = proxy->id;
223  proxy->manager = ao2_bump(manager);
224  state->manager = proxy->manager; /* state->manager is owned by the proxy */
225 
226  state->forward = stasis_forward_all(state->topic, manager->all_topic);
227  if (!state->forward) {
228  goto error_return;
229  }
230 
231  if (AST_VECTOR_INIT(&state->eids, 2)) {
232  goto error_return;
233  }
234 
235  if (ao2_t_weakproxy_set_object(proxy, state, OBJ_NOLOCK, "weakproxy link")) {
236  goto error_return;
237  }
238 
240  goto error_return;
241  }
242 
243  if (!ao2_link_flags(manager->states, proxy, OBJ_NOLOCK)) {
244  goto error_return;
245  }
246 
247  ao2_ref(proxy, -1);
248 
249  return state;
250 
251 error_return:
252  ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
253  id, stasis_topic_name(manager->all_topic));
254  ao2_cleanup(state);
255  ao2_cleanup(proxy);
256  return NULL;
257 }
258 
259 /*!
260  * \internal
261  * \brief Find a state by id, or create one if not found and add it to the manager.
262  *
263  * \param manager The manager to be added to
264  * \param state_topic A state topic to be managed (if NULL id is required)
265  * \param id The unique id for the state (if NULL state_topic is required)
266  *
267  * \return The added state object
268  * \return NULL on error
269  */
270 #define state_find_or_add(mgr, top, id) __state_find_or_add(mgr, top, id, __FILE__, __LINE__, __PRETTY_FUNCTION__)
272  struct stasis_topic *state_topic, const char *id,
273  const char *file, int line, const char *func)
274 {
275  struct stasis_state *state;
276 
277  ao2_lock(manager->states);
278  if (ast_strlen_zero(id)) {
279  id = state_id_by_topic(manager->all_topic, state_topic);
280  }
281 
282  state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
283  if (!state) {
284  state = state_alloc(manager, state_topic, id, file, line, func);
285  }
286 
287  ao2_unlock(manager->states);
288 
289  return state;
290 }
291 
292 static void state_manager_dtor(void *obj)
293 {
294  struct stasis_state_manager *manager = obj;
295 
296 #ifdef AO2_DEBUG
297  {
298  char *container_name =
299  ast_alloca(strlen(stasis_topic_name(manager->all_topic)) + strlen("-manager") + 1);
300  sprintf(container_name, "%s-manager", stasis_topic_name(manager->all_topic));
301  ao2_container_unregister(container_name);
302  }
303 #endif
304 
305  ao2_cleanup(manager->states);
306  manager->states = NULL;
307  ao2_cleanup(manager->all_topic);
308  manager->all_topic = NULL;
309  AST_VECTOR_RW_FREE(&manager->observers);
310 }
311 
312 #ifdef AO2_DEBUG
313 static void state_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
314 {
315  struct stasis_state *state = v_obj;
316 
317  if (!state) {
318  return;
319  }
320  prnt(where, "%s", stasis_topic_name(state->topic));
321 }
322 #endif
323 
324 struct stasis_state_manager *stasis_state_manager_create(const char *topic_name)
325 {
327 
328  manager = ao2_alloc_options(sizeof(*manager), state_manager_dtor,
330  if (!manager) {
331  return NULL;
332  }
333 
335  STATE_BUCKETS, stasis_state_proxy_hash_fn, NULL, stasis_state_proxy_cmp_fn);
336  if (!manager->states) {
337  ao2_ref(manager, -1);
338  return NULL;
339  }
340 
341  manager->all_topic = stasis_topic_create(topic_name);
342  if (!manager->all_topic) {
343  ao2_ref(manager, -1);
344  return NULL;
345  }
346 
347  if (AST_VECTOR_RW_INIT(&manager->observers, 2) != 0) {
348  ao2_ref(manager, -1);
349  return NULL;
350  }
351 
352 #ifdef AO2_DEBUG
353  {
354  char *container_name =
355  ast_alloca(strlen(stasis_topic_name(manager->all_topic)) + strlen("-manager") + 1);
356  sprintf(container_name, "%s-manager", stasis_topic_name(manager->all_topic));
357  ao2_container_register(container_name, manager->states, state_prnt_obj);
358  }
359 #endif
360 
361  return manager;
362 }
363 
365 {
366  return manager->all_topic;
367 }
368 
370 {
371  struct stasis_topic *topic;
372  struct stasis_state *state;
373 
374  state = state_find_or_add(manager, NULL, id);
375  if (!state) {
376  return NULL;
377  }
378 
379  topic = state->topic;
380  ao2_ref(state, -1);
381  return topic;
382 }
383 
385  /*! The stasis state subscribed to */
387  /*! The stasis subscription. */
389 };
390 
391 static void subscriber_dtor(void *obj)
392 {
393  size_t i;
394  struct stasis_state_subscriber *sub = obj;
395  struct stasis_state_manager *manager = sub->state->manager;
396 
397  AST_VECTOR_RW_RDLOCK(&manager->observers);
398  for (i = 0; i < AST_VECTOR_SIZE(&manager->observers); ++i) {
399  if (AST_VECTOR_GET(&manager->observers, i)->on_unsubscribe) {
400  AST_VECTOR_GET(&manager->observers, i)->on_unsubscribe(sub->state->id, sub);
401  }
402  }
403  AST_VECTOR_RW_UNLOCK(&manager->observers);
404 
405  ao2_lock(sub->state);
406  --sub->state->num_subscribers;
407  ao2_unlock(sub->state);
408 
409  ao2_ref(sub->state, -1);
410 }
411 
413  struct stasis_state_manager *manager, const char *id)
414 {
415  size_t i;
418 
419  if (!sub) {
420  ast_log(LOG_ERROR, "Unable to create subscriber to %s/%s\n",
421  stasis_topic_name(manager->all_topic), id);
422  return NULL;
423  }
424 
425  sub->state = state_find_or_add(manager, NULL, id);
426  if (!sub->state) {
427  ao2_ref(sub, -1);
428  return NULL;
429  }
430 
431  ao2_lock(sub->state);
432  ++sub->state->num_subscribers;
433  ao2_unlock(sub->state);
434 
435  AST_VECTOR_RW_RDLOCK(&manager->observers);
436  for (i = 0; i < AST_VECTOR_SIZE(&manager->observers); ++i) {
437  if (AST_VECTOR_GET(&manager->observers, i)->on_subscribe) {
438  AST_VECTOR_GET(&manager->observers, i)->on_subscribe(id, sub);
439  }
440  }
441  AST_VECTOR_RW_UNLOCK(&manager->observers);
442 
443  return sub;
444 }
445 
447  const char *id, stasis_subscription_cb callback, void *data)
448 {
449  struct stasis_topic *topic;
451 
452  if (!sub) {
453  return NULL;
454  }
455 
456  topic = sub->state->topic;
457  ast_debug(3, "Creating stasis state subscription to id '%s'. Topic: '%s':%p %d\n",
458  id, stasis_topic_name(topic), topic, (int)ao2_ref(topic, 0));
459 
460  sub->stasis_sub = stasis_subscribe_pool(topic, callback, data);
461 
462  if (!sub->stasis_sub) {
463  ao2_ref(sub, -1);
464  return NULL;
465  }
466 
467  return sub;
468 }
469 
471 {
473  ao2_ref(sub, -1);
474  return NULL;
475 }
476 
478 {
479  if (sub) {
481  ao2_ref(sub, -1);
482  }
483 
484  return NULL;
485 }
486 
488 {
489  return sub->state->id;
490 }
491 
493 {
494  return sub->state->topic;
495 }
496 
498 {
499  void *res;
500 
501  /*
502  * The data's reference needs to be bumped before returning so it doesn't disappear
503  * for the caller. Lock state, so the underlying message data is not replaced while
504  * retrieving.
505  */
506  ao2_lock(sub->state);
507  res = ao2_bump(stasis_message_data(sub->state->msg));
508  ao2_unlock(sub->state);
509 
510  return res;
511 }
512 
515 {
516  return sub->stasis_sub;
517 }
518 
520  /*! The stasis state to publish to */
522 };
523 
524 static void publisher_dtor(void *obj)
525 {
526  struct stasis_state_publisher *pub = obj;
527 
528  ao2_ref(pub->state, -1);
529 }
530 
532  struct stasis_state_manager *manager, const char *id)
533 {
536 
537  if (!pub) {
538  ast_log(LOG_ERROR, "Unable to create publisher to %s/%s\n",
539  stasis_topic_name(manager->all_topic), id);
540  return NULL;
541  }
542 
543  pub->state = state_find_or_add(manager, NULL, id);
544  if (!pub->state) {
545  ao2_ref(pub, -1);
546  return NULL;
547  }
548 
549  return pub;
550 }
551 
552 const char *stasis_state_publisher_id(const struct stasis_state_publisher *pub)
553 {
554  return pub->state->id;
555 }
556 
558 {
559  return pub->state->topic;
560 }
561 
563 {
564  ao2_lock(pub->state);
565  ao2_replace(pub->state->msg, msg);
566  ao2_unlock(pub->state);
567 
568  stasis_publish(pub->state->topic, msg);
569 }
570 
571 /*!
572  * \internal
573  * \brief Find, or add the given eid to the state object
574  *
575  * Publishers can be tracked implicitly using eids. This allows us to add, and subsequently
576  * remove state objects from the managed states container in a deterministic way. Using the
577  * eids in this way is possible because it's guaranteed that there will only ever be a single
578  * publisher for a uniquely named topic (topics tracked by this module) on a system.
579  *
580  * \note The vector does not use locking. Instead we use the state object for that, so it
581  * needs to be locked prior to calling this method.
582  *
583  * \param state The state object
584  * \param eid The system id to add to the state object
585  */
586 static void state_find_or_add_eid(struct stasis_state *state, const struct ast_eid *eid)
587 {
588  size_t i;
589 
590  if (!eid) {
591  eid = &ast_eid_default;
592  }
593 
594  for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
595  if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
596  break;
597  }
598  }
599 
600  if (i == AST_VECTOR_SIZE(&state->eids)) {
601  if (!AST_VECTOR_APPEND(&state->eids, *eid)) {
602  /* This ensures state cannot be freed if it has any eids */
603  ao2_ref(state, +1);
604  }
605  }
606 }
607 
608 /*!
609  * \internal
610  * \brief Find, and remove the given eid from the state object
611  *
612  * Used to remove an eid from an implicit publisher.
613  *
614  * \note The vector does not use locking. Instead we use the state object for that, so it
615  * needs to be locked prior to calling this method.
616  *
617  * \param state The state object
618  * \param eid The system id to remove from the state object
619  */
620 static void state_find_and_remove_eid(struct stasis_state *state, const struct ast_eid *eid)
621 {
622  size_t i;
623 
624  if (!eid) {
625  eid = &ast_eid_default;
626  }
627 
628  for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
629  if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
630  AST_VECTOR_REMOVE_UNORDERED(&state->eids, i);
631  /* Balance the reference from state_find_or_add_eid */
632  ao2_ref(state, -1);
633  return;
634  }
635  }
636 }
637 
639  const struct ast_eid *eid, struct stasis_message *msg)
640 {
641  struct stasis_state *state;
642 
643  state = state_find_or_add(manager, NULL, id);
644  if (!state) {
645  return;
646  }
647 
648  ao2_lock(state);
649  state_find_or_add_eid(state, eid);
650  ao2_replace(state->msg, msg);
651  ao2_unlock(state);
652 
653  stasis_publish(state->topic, msg);
654 
655  ao2_ref(state, -1);
656 }
657 
659  const char *id, const struct ast_eid *eid, struct stasis_message *msg)
660 {
661  struct stasis_state *state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY, "");
662 
663  if (!state) {
664  /*
665  * In most circumstances state should already exist here. However, if there is no
666  * state then it can mean one of a few things:
667  *
668  * 1. This function was called prior to an implicit publish for the same given
669  * manager, and id.
670  * 2. This function was called more than once for the same manager, and id.
671  * 3. There is a ref count problem with the explicit subscribers and publishers.
672  */
673  ast_debug(5, "Attempted to remove state for id '%s', but state not found\n", id);
674  return;
675  }
676 
677  if (msg) {
678  stasis_publish(state->topic, msg);
679  }
680 
681  ao2_lock(state);
682  state_find_and_remove_eid(state, eid);
683  ao2_unlock(state);
684 
685  ao2_ref(state, -1);
686 }
687 
690 {
691  int res;
692 
693  AST_VECTOR_RW_WRLOCK(&manager->observers);
694  res = AST_VECTOR_APPEND(&manager->observers, observer);
695  AST_VECTOR_RW_UNLOCK(&manager->observers);
696 
697  return res;
698 }
699 
702 {
703  AST_VECTOR_RW_WRLOCK(&manager->observers);
704  AST_VECTOR_REMOVE_ELEM_UNORDERED(&manager->observers, observer, AST_VECTOR_ELEM_CLEANUP_NOOP);
705  AST_VECTOR_RW_UNLOCK(&manager->observers);
706 }
707 
709 {
710  struct stasis_message *msg;
711  int res;
712 
713  /*
714  * State needs to be locked here while we retrieve and bump the reference on its message
715  * object. Doing so guarantees the message object will live throughout its handling.
716  */
717  ao2_lock(state);
718  msg = ao2_bump(state->msg);
719  ao2_unlock(state);
720 
721  res = handler(state->id, msg, data);
722  ao2_cleanup(msg);
723  return res;
724 }
725 
/*!
 * \internal
 * \brief Container callback that runs a state handler on a proxy's state.
 *
 * \param obj The stasis_state_proxy from the container
 * \param arg The handler, passed through to handle_stasis_state()
 * \param data User data, passed through to handle_stasis_state()
 * \param flags Unused
 *
 * \return The result of handle_stasis_state()
 * \return 0 if the proxy yields no state object
 */
static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
{
	struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
	int res;

	if (!state) {
		return 0;
	}

	res = handle_stasis_state(state, arg, data);
	ao2_ref(state, -1);

	return res;
}
739 
741  void *data)
742 {
743  ast_assert(handler != NULL);
744 
746  handle_stasis_state_proxy, handler, data);
747 }
748 
749 static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
750 {
751  struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
752  int res = 0;
753 
754  if (state && state->num_subscribers) {
755  res = handle_stasis_state(state, arg, data);
756  }
757 
758  ao2_cleanup(state);
759 
760  return res;
761 }
762 
764  void *data)
765 {
766  ast_assert(handler != NULL);
767 
769  handle_stasis_state_subscribed, handler, data);
770 }
Managed stasis state event interface.
Definition: stasis_state.h:463
struct stasis_forward * forward
Definition: stasis_state.c:59
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
struct stasis_state * state
Definition: stasis_state.c:386
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:158
enum sip_cc_notify_state state
Definition: chan_sip.c:959
static int handle_stasis_state(struct stasis_state *state, on_stasis_state handler, void *data)
Definition: stasis_state.c:708
#define state_find_or_add(mgr, top, id)
Definition: stasis_state.c:270
static void state_dtor(void *obj)
Definition: stasis_state.c:121
Asterisk main include file. File version handling, generic pbx functions.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:585
struct stasis_topic * topic
Definition: stasis_state.c:61
static struct stasis_state * __state_find_or_add(struct stasis_state_manager *manager, struct stasis_topic *state_topic, const char *id, const char *file, int line, const char *func)
Definition: stasis_state.c:271
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
#define AST_VECTOR_RW(name, type)
Define a vector structure with a read/write lock.
Definition: vector.h:93
struct ao2_container * observers
Registered global observers.
Definition: sorcery.c:281
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:900
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed topic named by id, and remove an implicit publisher.
Definition: stasis_state.c:658
void stasis_state_remove_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Remove an observer (will no longer receive managed state related events).
Definition: stasis_state.c:700
static void state_proxy_dtor(void *obj)
Definition: stasis_state.c:136
struct stasis_topic * stasis_state_subscriber_topic(struct stasis_state_subscriber *sub)
Retrieve the subscriber's topic.
Definition: stasis_state.c:492
struct stasis_state_manager * manager
The manager that owns and handles this state.
Definition: stasis_state.c:57
struct stasis_state_manager * stasis_state_manager_create(const char *topic_name)
Create a stasis state manager.
Definition: stasis_state.c:324
struct stasis_state_subscriber * stasis_state_subscribe_pool(struct stasis_state_manager *manager, const char *id, stasis_subscription_cb callback, void *data)
Add a subscriber, and subscribe to its underlying stasis topic.
Definition: stasis_state.c:446
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
void * __ao2_alloc(size_t data_size, ao2_destructor_fn destructor_fn, unsigned int options, const char *tag, const char *file, int line, const char *func) attribute_warn_unused_result
Definition: astobj2.c:765
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed state call the given handler.
Definition: stasis_state.c:740
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
Definition: vector.h:438
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:880
int(* on_stasis_state)(const char *id, struct stasis_message *msg, void *user_data)
The delegate called for each managed state.
Definition: stasis_state.h:521
void * stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
Retrieve the last known state stasis message payload for the subscriber.
Definition: stasis_state.c:497
struct stasis_message * msg
Definition: stasis_state.c:63
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define ast_assert(a)
Definition: utils.h:695
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:931
int stasis_state_add_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Add an observer to receive managed state related events.
Definition: stasis_state.c:688
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
static struct stasis_topic * manager_topic
A stasis_topic that all topics AMI cares about will be forwarded to.
Definition: manager.c:1490
static void state_proxy_sub_cb(void *obj, void *data)
Definition: stasis_state.c:142
struct stasis_state_subscriber * stasis_state_add_subscriber(struct stasis_state_manager *manager, const char *id)
Add a subscriber to the managed stasis state for the given id.
Definition: stasis_state.c:412
const char * stasis_state_subscriber_id(const struct stasis_state_subscriber *sub)
Retrieve the underlying subscribed-to state's unique id.
Definition: stasis_state.c:487
void * stasis_state_unsubscribe(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic and stasis state.
Definition: stasis_state.c:470
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2842
static const char * state_id_by_topic(struct stasis_topic *manager_topic, const struct stasis_topic *state_topic)
Definition: stasis_state.c:104
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1442
#define ast_strlen_zero(foo)
Definition: strings.h:52
void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
Publish to a managed state (topic) using a publisher.
Definition: stasis_state.c:562
struct stasis_topic * stasis_state_topic(struct stasis_state_manager *manager, const char *id)
Retrieve a managed topic creating one if not currently managed.
Definition: stasis_state.c:369
struct stasis_topic * all_topic
Definition: stasis_state.c:83
#define ao2_bump(obj)
Definition: astobj2.h:491
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
const char * stasis_state_publisher_id(const struct stasis_state_publisher *pub)
Retrieve the publisher's underlying state's unique id.
Definition: stasis_state.c:552
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:452
#define ast_log
Definition: astobj2.c:42
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:670
void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed, and explicitly subscribed state call the given handler.
Definition: stasis_state.c:763
static struct stasis_state * state_alloc(struct stasis_state_manager *manager, struct stasis_topic *state_topic, const char *id, const char *file, int line, const char *func)
Definition: stasis_state.c:167
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:573
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:202
#define ao2_weakproxy_get_object(weakproxy, flags)
Definition: astobj2.h:625
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_lock(a)
Definition: astobj2.h:718
void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed topic named by id, and add an implicit publisher.
Definition: stasis_state.c:638
struct stasis_topic * stasis_state_all_topic(struct stasis_state_manager *manager)
Retrieve the manager's topic (the topic that all state topics get forwarded to)
Definition: stasis_state.c:364
struct stasis_state_manager * manager
Definition: stasis_state.c:34
static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
Definition: stasis_state.c:749
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
Stasis State API.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:290
static void publisher_dtor(void *obj)
Definition: stasis_state.c:524
#define LOG_ERROR
Definition: logger.h:285
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
static void subscriber_dtor(void *obj)
Definition: stasis_state.c:391
struct stasis_subscription * stasis_state_subscriber_subscription(struct stasis_state_subscriber *sub)
Retrieve the stasis topic subscription if available.
Definition: stasis_state.c:513
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
Definition: astobj2.h:586
#define ao2_callback_data(container, flags, cb_fn, arg, data)
Definition: astobj2.h:1743
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
#define stasis_subscribe_pool(topic, callback, data)
Definition: stasis.h:682
struct stasis_subscription * stasis_sub
Definition: stasis_state.c:388
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:973
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:615
struct stasis_topic * stasis_state_publisher_topic(struct stasis_state_publisher *pub)
Retrieve the publisher's topic.
Definition: stasis_state.c:557
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object...
Definition: astobj2.h:1768
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
AO2_STRING_FIELD_HASH_FN(stasis_state_proxy, id)
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:890
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1136
#define STATE_BUCKETS
Definition: stasis_state.c:77
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
static void state_find_and_remove_eid(struct stasis_state *state, const struct ast_eid *eid)
Definition: stasis_state.c:620
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
static void state_manager_dtor(void *obj)
Definition: stasis_state.c:292
#define ao2_replace(dst, src)
Definition: astobj2.h:517
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
Definition: stasis_state.c:726
struct stasis_state * state
Definition: stasis_state.c:521
static void handler(const char *name, int response_code, struct ast_variable *get_params, struct ast_variable *path_vars, struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
Definition: test_ari.c:59
struct ast_sorcery_instance_observer observer
struct stasis_forward * sub
Definition: res_corosync.c:240
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Definition: strings.h:94
unsigned int num_subscribers
Definition: stasis_state.c:52
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
Definition: astobj2.h:557
Forwarding information.
Definition: stasis.c:1531
struct ao2_container * states
Definition: stasis_state.c:81
Generic container type.
struct stasis_state_publisher * stasis_state_add_publisher(struct stasis_state_manager *manager, const char *id)
Add a publisher to the managed state for the given id.
Definition: stasis_state.c:531
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
void * stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic, block until the final message is received, and then unsubscribe fr...
Definition: stasis_state.c:477
AO2_STRING_FIELD_CMP_FN(stasis_state_proxy, id)
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
static void state_find_or_add_eid(struct stasis_state *state, const struct ast_eid *eid)
Definition: stasis_state.c:586