Asterisk - The Open Source Telephony Project  18.5.0
stasis.h
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 #ifndef _ASTERISK_STASIS_H
20 #define _ASTERISK_STASIS_H
21 
22 /*! \file
23  *
24  * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
25  * detailed documentation.
26  *
27  * \author David M. Lee, II <[email protected]>
28  * \since 12
29  *
30  * \page stasis Stasis Message Bus API
31  *
32  * \par Intro
33  *
34  * The Stasis Message Bus is a loosely typed mechanism for distributing messages
35  * within Asterisk. It is designed to be:
36  * - Loosely coupled; new message types can be added in separate modules.
37  * - Easy to use; publishing and subscribing are straightforward operations.
38  *
39  * There are three main concepts for using the Stasis Message Bus:
40  * - \ref stasis_message
41  * - \ref stasis_topic
42  * - \ref stasis_subscription
43  *
44  * \par stasis_message
45  *
46  * Central to the Stasis Message Bus is the \ref stasis_message, the messages
47  * that are sent on the bus. These messages have:
48  * - a type (as defined by a \ref stasis_message_type)
49  * - a value - a \c void pointer to an AO2 object
50  * - a timestamp when it was created
51  *
52  * Once a \ref stasis_message has been created, it is immutable and cannot
53  * change. The same goes for the value of the message (although this cannot be
54  * enforced in code). Messages themselves are reference-counted, AO2 objects,
55  * along with their values. By being both reference counted and immutable,
56  * messages can be shared throughout the system without any concerns for
57  * threading.
58  *
59  * The type of a message is defined by an instance of \ref stasis_message_type,
60  * which can be created by calling stasis_message_type_create(). Message types
61  * are named, which is useful in debugging. It is recommended that the string
62  * name for a message type match the name of the struct that's stored in the
63  * message. For example, the name for \ref stasis_cache_update's message type is
64  * \c "stasis_cache_update".
65  *
66  * \par stasis_topic
67  *
68  * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be
69  * subscribed, and \ref stasis_message's may be published. Any message published
70  * to the topic is dispatched to all of its subscribers. The topic itself may be
71  * named, which is useful in debugging.
72  *
73  * Topics themselves are reference counted objects. Since topics are referred to
74  * by their subscribers, they will not be freed until all of their subscribers
75  * have unsubscribed. Topics are also thread safe, so publishing, subscribing,
76  * and unsubscribing may happen concurrently from multiple threads. Topics are
77  * also designed to handle the case of unsubscribing from a topic from within
78  * the subscription handler.
79  *
80  * \par Forwarding
81  *
82  * There is one special case of topics that's worth noting: forwarding
83  * messages. It's a fairly common use case to want to forward all the messages
84  * published on one topic to another one (for example, an aggregator topic that
85  * publishes all the events from a set of other topics). This can be
86  * accomplished easily using stasis_forward_all(). This sets up the forwarding
87  * between the two topics, and returns a \ref stasis_subscription, which can be
88  * unsubscribed to stop the forwarding.
89  *
90  * \par Caching
91  *
92  * Another common use case is to want to cache certain messages that are
93  * published on the bus. Usually these events are snapshots of the current state
94  * in the system, and it's desirable to query that state from the cache without
95  * locking the original object. It's also desirable for subscribers of the
96  * caching topic to receive messages that have both the old cache value and the
97  * new value being put into the cache. For this, we have stasis_cache_create()
98  * and stasis_caching_topic_create(), providing them with the topic which
99  * publishes the messages that you wish to cache, and a function that can
100  * identify cacheable messages.
101  *
102  * The \ref stasis_cache is designed so that it may be shared amongst several
103  * \ref stasis_caching_topic objects. This allows you to have individual caching
104  * topics per-object (i.e. so you can subscribe to updates for a single object),
105  * and still have a single cache to query for the state of all objects. While a
106  * cache may be shared amongst different message types, such a usage is probably
107  * not a good idea.
108  *
109  * The \ref stasis_cache can only be written to by \ref stasis_caching_topics.
110  * It's a thread safe container, so freely use the stasis_cache_get() and
111  * stasis_cache_dump() to query the cache.
112  *
113  * The \ref stasis_caching_topic discards non-cacheable messages. A cacheable
114  * message is wrapped in a \ref stasis_cache_update message which provides the
115  * old snapshot (or \c NULL if this is a new cache entry), and the new snapshot
116  * (or \c NULL if the entry was removed from the cache). A
117  * stasis_cache_clear_create() message must be sent to the topic in order to
118  * remove entries from the cache.
119  *
120  * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
121  * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
122  * stasis_caching_topic will not be freed until after it has been unsubscribed,
123  * and all other ao2_ref()'s have been cleaned up.
124  *
125  * The \ref stasis_cache object is a normal AO2 managed object, which can be
126  * released with ao2_cleanup().
127  *
128  * \par stasis_subscriber
129  *
130  * Any topic may be subscribed to by simply providing stasis_subscribe() the
131  * \ref stasis_topic to subscribe to, a handler function, and a \c void pointer to
132  * data that is passed back to the handler. Invocations on the subscription's
133  * handler are serialized, but different invocations may occur on different
134  * threads (this usually isn't important unless you use thread locals or
135  * something similar).
136  *
137  * In order to stop receiving messages, call stasis_unsubscribe() with your \ref
138  * stasis_subscription. Due to cyclic references, the \ref
139  * stasis_subscription will not be freed until after it has been unsubscribed,
140  * and all other ao2_ref()'s have been cleaned up.
141  *
142  * \par Shutdown
143  *
144  * Subscriptions have two options for unsubscribing, depending upon the context
145  * in which you need to unsubscribe.
146  *
147  * If your subscription is owned by a module, and you must unsubscribe from the
148  * module_unload() function, then you'll want to use the
149  * stasis_unsubscribe_and_join() function. This will block until the final
150  * message has been received on the subscription. Otherwise, there's the danger
151  * of invoking the callback function after it has been unloaded.
152  *
153  * If your subscription is owned by an object, then your object should have an
154  * explicit shutdown() function, which calls stasis_unsubscribe(). In your
155  * subscription handler, when the stasis_subscription_final_message() has been
156  * received, decrement the refcount on your object. In your object's destructor,
157  * you may assert that stasis_subscription_is_done() to validate that the
158  * subscription's callback will no longer be invoked.
159  *
160  * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
161  * an object's destructor. While code that does this may work most of the time,
162  * it's got one big downside. There's a general assumption that object
163  * destruction is non-blocking. If you block the destruction waiting for the
164  * subscription to complete, there's the danger that the subscription may
165  * process a message which will bump the refcount up by one. Then it does
166  * whatever it does, decrements the refcount, which then proceeds to re-destroy
167  * the object. Now you've got hard to reproduce bugs that only show up under
168  * certain loads.
169  */
170 
171 #include "asterisk/json.h"
172 #include "asterisk/manager.h"
173 #include "asterisk/utils.h"
174 #include "asterisk/event.h"
175 
176 /*!
177  * \brief Metadata about a \ref stasis_message.
178  * \since 12
179  */
180 struct stasis_message_type;
181 
182 /*!
183  * \brief Opaque type for a Stasis message.
184  * \since 12
185  */
186 struct stasis_message;
187 
188 /*!
189  * \brief Opaque type for a Stasis subscription.
190  * \since 12
191  */
192 struct stasis_subscription;
193 
194 /*!
195  * \brief Structure containing callbacks for Stasis message sanitization
196  *
197  * \note If either callback is implemented, both should be implemented since
198  * not all callers may have access to the full snapshot.
199  */
200 struct stasis_message_sanitizer {
201  /*!
202  * \brief Callback which determines whether a channel should be sanitized from
203  * a message based on the channel's unique ID
204  *
205  * \param channel_id The unique ID of the channel
206  *
207  * \retval non-zero if the channel should be left out of the message
208  * \retval zero if the channel should remain in the message
209  */
210  int (*channel_id)(const char *channel_id);
211 
212  /*!
213  * \brief Callback which determines whether a channel should be sanitized from
214  * a message based on the channel's snapshot
215  *
216  * \param snapshot A snapshot generated from the channel
217  *
218  * \retval non-zero if the channel should be left out of the message
219  * \retval zero if the channel should remain in the message
220  */
221  int (*channel_snapshot)(const struct ast_channel_snapshot *snapshot);
222 
223  /*!
224  * \brief Callback which determines whether a channel should be sanitized from
225  * a message based on the channel
226  *
227  * \param chan The channel to be checked
228  *
229  * \retval non-zero if the channel should be left out of the message
230  * \retval zero if the channel should remain in the message
231  */
232  int (*channel)(const struct ast_channel *chan);
233 };
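
/*
 * Illustrative sketch, not part of the Asterisk sources: how a caller might
 * consult a sanitizer-style callback table. The names demo_sanitizer,
 * demo_hide_internal and demo_should_omit, and the "internal-" prefix policy,
 * are hypothetical. Only the convention (non-zero means "leave the channel
 * out") is taken from the comments above.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy stand-in for a sanitizer that only has the channel_id callback. */
struct demo_sanitizer {
	int (*channel_id)(const char *channel_id);
};

/* Hypothetical policy: hide channels whose unique ID starts with "internal-". */
static int demo_hide_internal(const char *channel_id)
{
	return strncmp(channel_id, "internal-", 9) == 0;
}

/*
 * Returns non-zero if the channel should be left out of a message.
 * A NULL sanitizer, or a NULL callback, keeps every channel (an assumption).
 */
static int demo_should_omit(const struct demo_sanitizer *sanitize, const char *channel_id)
{
	if (!sanitize || !sanitize->channel_id) {
		return 0;
	}
	return sanitize->channel_id(channel_id) != 0;
}
```

/*
 * A JSON formatter written in this style would call demo_should_omit() once
 * per channel before adding that channel to its output.
 */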
234 
235 /*!
236  * \brief Virtual table providing methods for messages.
237  * \since 12
238  */
239 struct stasis_message_vtable {
240  /*!
241  * \brief Build the JSON representation of the message.
242  *
243  * May be \c NULL, or may return \c NULL, to indicate no representation.
244  * The returned object should be ast_json_unref()'ed.
245  *
246  * \param message Message to convert to JSON string.
247  * \param sanitize Snapshot sanitization callback.
248  *
249  * \return Newly allocated JSON message.
250  * \return \c NULL on error.
251  * \return \c NULL if JSON format is not supported.
252  */
253  struct ast_json *(*to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize);
254 
255  /*!
256  * \brief Build the AMI representation of the message.
257  *
258  * May be \c NULL, or may return \c NULL, to indicate no representation.
259  * The returned object should be ao2_cleanup()'ed.
260  *
261  * \param message Message to convert to AMI string.
262  * \return Newly allocated \ref ast_manager_event_blob.
263  * \return \c NULL on error.
264  * \return \c NULL if AMI format is not supported.
265  */
266  struct ast_manager_event_blob *(*to_ami)(
267  struct stasis_message *message);
268 
269  /*!
270  * \since 12.3.0
271  * \brief Build the \ref ast_event representation of the message.
272  *
273  * May be \c NULL, or may return \c NULL, to indicate no representation.
274  * The returned object should be free'd.
275  *
276  * \param message Message to convert to an \ref ast_event.
277  * \return Newly allocated \ref ast_event.
278  * \return \c NULL on error.
279  * \return \c NULL if the \ref ast_event format is not supported.
280  */
281  struct ast_event *(*to_event)(
282  struct stasis_message *message);
283 };
284 
285 /*!
286  * \brief Return code for Stasis message type creation attempts
287  */
288 enum stasis_message_type_result {
289  STASIS_MESSAGE_TYPE_ERROR = -1, /*!< Message type was not created due to allocation failure */
290  STASIS_MESSAGE_TYPE_SUCCESS, /*!< Message type was created successfully */
291  STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */
292 };
293 
294 /*!
295  * \brief Stasis subscription message filters
296  */
297 enum stasis_subscription_message_filter {
298  STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */
299  STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */
300  STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */
301 };
302 
303 /*!
304  * \brief Stasis subscription formatter filters
305  *
306  * There should be an entry here for each member of \ref stasis_message_vtable
307  *
308  * \since 13.25.0
309  * \since 16.2.0
310  */
311 enum stasis_subscription_message_formatters {
312  STASIS_SUBSCRIPTION_FORMATTER_NONE = 0,
313  STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0, /*!< Allow messages with a to_json formatter */
314  STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1, /*!< Allow messages with a to_ami formatter */
315  STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2, /*!< Allow messages with a to_event formatter */
316 };
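
/*
 * Illustrative sketch, not part of the Asterisk sources: the formatter values
 * above are single-bit flags, so they can be OR-ed into a bitmap and tested
 * with a bitwise AND. The demo_* names are hypothetical, and the "match if
 * any wanted formatter is available" rule is an assumption about how such a
 * bitmap could be used, not a statement of what Stasis does internally.
 */

```c
#include <assert.h>

/* Local copy of the flag values listed above, for illustration only. */
enum demo_formatters {
	DEMO_FORMATTER_JSON  = 1 << 0,
	DEMO_FORMATTER_AMI   = 1 << 1,
	DEMO_FORMATTER_EVENT = 1 << 2,
};

/* Returns non-zero when at least one wanted formatter is available. */
static int demo_formatters_match(unsigned int available, unsigned int wanted)
{
	return (available & wanted) != 0;
}
```

/*
 * For example, a message type offering JSON and AMI output matches a
 * subscriber that asked for AMI, but not one that asked only for EVENT.
 */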
317 
318 /*!
319  * \brief Create a new message type.
320  *
321  * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
322  * with it.
323  *
324  * \param name Name of the new type.
325  * \param vtable Virtual table of message methods. May be \c NULL.
326  * \param[out] result The location where the new message type will be placed
327  *
328  * \note Stasis message type creation may be declined if the message type is disabled
329  *
330  * \returns A stasis_message_type_result enum
331  * \since 12
332  */
333 enum stasis_message_type_result stasis_message_type_create(const char *name,
334  struct stasis_message_vtable *vtable, struct stasis_message_type **result);
335 
336 /*!
337  * \brief Gets the name of a given message type
338  * \param type The type to get.
339  * \return Name of the type.
340  * \return \c NULL if \a type is \c NULL.
341  * \since 12
342  */
343 const char *stasis_message_type_name(const struct stasis_message_type *type);
344 
345 /*!
346  * \brief Gets the hash of a given message type
347  * \param type The type to get the hash of.
348  * \return The hash
349  * \since 13.24.0
350  */
351 unsigned int stasis_message_type_hash(const struct stasis_message_type *type);
352 
353 /*!
354  * \brief Gets the id of a given message type
355  * \param type The type to get the id of.
356  * \return The id
357  * \since 17.0.0
358  */
359 int stasis_message_type_id(const struct stasis_message_type *type);
360 
361 /*!
362  * \brief Check whether a message type is declined
363  *
364  * \param name The name of the message type to check
365  *
366  * \retval zero The message type is not declined
367  * \retval non-zero The message type is declined
368  */
369 int stasis_message_type_declined(const char *name);
370 
371 /*!
372  * \brief Create a new message.
373  *
374  * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
375  * with it. Messages are also immutable, and must not be modified after they
376  * are initialized. Especially the \a data in the message.
377  *
378  * \param type Type of the message
379  * \param data Immutable data that is the actual contents of the message
380  *
381  * \return New message
382  * \return \c NULL on error
383  *
384  * \since 12
385  */
386 struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
387 
388 /*!
389  * \brief Create a new message for an entity.
390  *
391  * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
392  * with it. Messages are also immutable, and must not be modified after they
393  * are initialized. Especially the \a data in the message.
394  *
395  * \param type Type of the message
396  * \param data Immutable data that is the actual contents of the message
397  * \param eid What entity originated this message. (NULL for aggregate)
398  *
399  * \note An aggregate message is a combined representation of the local
400  * and remote entities publishing the message data. e.g., An aggregate
401  * device state represents the combined device state from the local and
402  * any remote entities publishing state for a device. e.g., An aggregate
403  * MWI message is the old/new MWI counts accumulated from the local and
404  * any remote entities publishing to a mailbox.
405  *
406  * \retval New message
407  * \retval \c NULL on error
408  *
409  * \since 12.2.0
410  */
411 struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
412 
413 /*!
414  * \brief Get the entity id for a \ref stasis_message.
415  * \since 12.2.0
416  *
417  * \param msg Message to get eid.
418  *
419  * \retval Entity id of \a msg
420  * \retval \c NULL if \a msg is an aggregate or \a msg is \c NULL.
421  */
422 const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
423 
424 /*!
425  * \brief Get the message type for a \ref stasis_message.
426  * \param msg Message to get the type of.
427  * \return Type of \a msg
428  * \return \c NULL if \a msg is \c NULL.
429  * \since 12
430  */
431 struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
432 
433 /*!
434  * \brief Get the data contained in a message.
435  * \param msg Message.
436  * \return Immutable data pointer
437  * \return \c NULL if msg is \c NULL.
438  * \since 12
439  */
440 void *stasis_message_data(const struct stasis_message *msg);
441 
442 /*!
443  * \brief Get the time when a message was created.
444  * \param msg Message.
445  * \return Pointer to the \a timeval when the message was created.
446  * \return \c NULL if msg is \c NULL.
447  * \since 12
448  */
449 const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
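
/*
 * Illustrative sketch, not part of the Asterisk sources: a toy message that
 * carries a type name, an opaque payload and a creation time, with accessors
 * that return NULL for a NULL message as the functions above are documented
 * to do. All demo_* names are hypothetical. Real Stasis messages are
 * reference-counted AO2 objects; this toy uses plain malloc()/free() and
 * does no reference counting.
 */

```c
#include <assert.h>
#include <stdlib.h>
#include <time.h>

struct demo_message {
	const char *type_name; /* stands in for the message type */
	const void *data;      /* payload; treated as immutable after creation */
	time_t created;        /* creation time */
};

/* Allocates a message; returns NULL on allocation failure. */
static struct demo_message *demo_message_create(const char *type_name, const void *data)
{
	struct demo_message *msg = malloc(sizeof(*msg));

	if (!msg) {
		return NULL;
	}
	msg->type_name = type_name;
	msg->data = data;
	msg->created = time(NULL);
	return msg;
}

/* Accessors are NULL-tolerant, so callers can chain them safely. */
static const void *demo_message_data(const struct demo_message *msg)
{
	return msg ? msg->data : NULL;
}

static const char *demo_message_type_name(const struct demo_message *msg)
{
	return msg ? msg->type_name : NULL;
}
```

/*
 * The const-qualified fields make the "do not modify after creation" rule
 * visible to the compiler, which the real API cannot fully enforce.
 */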
450 
451 /*!
452  * \brief Build the JSON representation of the message.
453  *
454  * May return \c NULL, to indicate no representation. The returned object should
455  * be ast_json_unref()'ed.
456  *
457  * \param msg Message to convert to JSON string.
458  * \param sanitize Snapshot sanitization callback.
459  *
460  * \return Newly allocated string with JSON message.
461  * \return \c NULL on error.
462  * \return \c NULL if JSON format is not supported.
463  */
464 struct ast_json *stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize);
465 
466 /*!
467  * \brief Build the AMI representation of the message.
468  *
469  * May return \c NULL, to indicate no representation. The returned object should
470  * be ao2_cleanup()'ed.
471  *
472  * \param msg Message to convert to AMI.
473  * \return \c NULL on error.
474  * \return \c NULL if AMI format is not supported.
475  */
476 struct ast_manager_event_blob *stasis_message_to_ami(struct stasis_message *msg);
477 
478 /*!
479  * \brief Determine if the given message can be converted to AMI.
480  *
481  * \param msg Message to see if can be converted to AMI.
482  *
483  * \retval 0 Cannot be converted
484  * \retval non-zero Can be converted
485  */
486 int stasis_message_can_be_ami(struct stasis_message *msg);
487 
488 /*!
489  * \brief Build the \ref AstGenericEvents representation of the message.
490  *
491  * May return \c NULL, to indicate no representation. The returned object should
492  * be disposed of via \ref ast_event_destroy.
493  *
494  * \param msg Message to convert to an \ref ast_event.
495  * \return \c NULL on error.
496  * \return \c NULL if the \ref ast_event format is not supported.
497  */
498 struct ast_event *stasis_message_to_event(struct stasis_message *msg);
499 
500 /*!
501  * \brief A topic to which messages may be posted, and subscribers, well, subscribe
502  * \since 12
503  */
504 struct stasis_topic;
505 
506 /*!
507  * \brief Create a new topic.
508  * \param name Name of the new topic.
509  * \return New topic instance.
510  * \return \c NULL on error.
511  * \since 12
512  *
513  * \note There is no explicit ability to unsubscribe all subscribers
514  * from a topic and destroy it. As a result the topic can persist until
515  * the last subscriber unsubscribes itself even if there is no
516  * publisher.
517  *
518  * \note Topic names should be in the form of <subsystem>:<functionality>[/<object>]
519  */
520 struct stasis_topic *stasis_topic_create(const char *name);
521 
522 /*!
523  * \brief Create a new topic with given detail.
524  * \param name Name of the new topic.
525  * \param detail Detail description of the new topic. e.g. "Queue main topic for subscribing every queue event"
526  * \return New topic instance.
527  * \return \c NULL on error.
528  *
529  * \note There is no explicit ability to unsubscribe all subscribers
530  * from a topic and destroy it. As a result the topic can persist until
531  * the last subscriber unsubscribes itself even if there is no
532  * publisher.
533  */
534 struct stasis_topic *stasis_topic_create_with_detail(
535  const char *name, const char *detail);
536 
537 /*!
538  * \brief Get a topic of the given name.
539  * \param name Topic's name.
540  * \return The topic with the given name.
541  * \return \c NULL on error or if no such topic exists.
542  *
543  * \note This SHOULD NOT be used in normal operation for publishing messages.
544  */
545 struct stasis_topic *stasis_topic_get(const char *name);
546 
547 /*!
548  * \brief Return the uniqueid of a topic.
549  * \param topic Topic.
550  * \return Uniqueid of the topic.
551  * \return \c NULL if topic is \c NULL.
552  */
553 const char *stasis_topic_uniqueid(const struct stasis_topic *topic);
554 
555 /*!
556  * \brief Return the name of a topic.
557  * \param topic Topic.
558  * \return Name of the topic.
559  * \return \c NULL if topic is \c NULL.
560  */
561 const char *stasis_topic_name(const struct stasis_topic *topic);
562 
563 /*!
564  * \brief Return the detail of a topic.
565  * \param topic Topic.
566  * \return Detail of the topic.
567  * \return \c NULL if topic is \c NULL.
568  * \since 12
569  */
570 const char *stasis_topic_detail(const struct stasis_topic *topic);
571 
572 /*!
573  * \brief Return the number of subscribers of a topic.
574  * \param topic Topic.
575  * \return Number of subscribers of the topic.
576  * \since 17.0.0
577  */
578 size_t stasis_topic_subscribers(const struct stasis_topic *topic);
579 
580 /*!
581  * \brief Publish a message to a topic's subscribers.
582  * \param topic Topic.
583  * \param message Message to publish.
584  *
585  * This call is asynchronous and will return immediately upon queueing
586  * the message for delivery to the topic's subscribers.
587  *
588  * \since 12
589  */
590 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
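
/*
 * Illustrative sketch, not part of the Asterisk sources: a toy topic that
 * dispatches each published payload to every registered callback. All
 * demo_* names are hypothetical. Unlike stasis_publish(), which queues the
 * message and returns immediately, this toy delivers synchronously to keep
 * the example short.
 */

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_MAX_SUBS 8

/* Toy callback type: user data plus the published payload. */
typedef void (*demo_cb)(void *data, const char *payload);

struct demo_topic {
	demo_cb callbacks[DEMO_MAX_SUBS];
	void *data[DEMO_MAX_SUBS];
	size_t count;
};

/* Registers a callback; returns 0 on success, -1 if the topic is full. */
static int demo_subscribe(struct demo_topic *topic, demo_cb cb, void *data)
{
	if (topic->count == DEMO_MAX_SUBS) {
		return -1;
	}
	topic->callbacks[topic->count] = cb;
	topic->data[topic->count] = data;
	topic->count++;
	return 0;
}

/* Delivers the payload to every subscriber, in registration order. */
static void demo_publish(struct demo_topic *topic, const char *payload)
{
	for (size_t i = 0; i < topic->count; i++) {
		topic->callbacks[i](topic->data[i], payload);
	}
}

/* Sample subscriber: counts deliveries in an int supplied as user data. */
static void demo_count(void *data, const char *payload)
{
	(void) payload;
	++*(int *) data;
}
```

/*
 * Two subscribers registered with demo_count() each see every publish, which
 * is the fan-out behaviour described for topics in the overview.
 */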
591 
592 /*!
593  * \brief Publish a message to a topic's subscribers, synchronizing
594  * on the specified subscriber
595  * \param sub Subscription to synchronize on.
596  * \param message Message to publish.
597  *
598  * The caller of stasis_publish_sync will block until the specified
599  * subscriber completes handling of the message.
600  *
601  * All other subscribers to the topic the \ref stasis_subscription
602  * is subscribed to are also delivered the message; this delivery however
603  * happens asynchronously.
604  *
605  * \since 12.1.0
606  */
607 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message);
608 
609 /*!
610  * \brief Callback function type for Stasis subscriptions.
611  * \param data Data field provided with subscription.
612  * \param message Published message.
613  * \since 12
614  */
615 typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
616 
617 /*!
618  * \brief Stasis subscription callback function that does nothing.
619  *
620  * \note This callback should be used for events that are not directly processed, but need
621  * to be generated so data can be retrieved from cache later. Subscriptions with this
622  * callback can be released with \ref stasis_unsubscribe, even during module unload.
623  *
624  * \since 13.5
625  */
626 void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message);
627 
628 /*!
629  * \brief Create a subscription.
630  *
631  * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
632  * up this reference), the subscription must be explicitly unsubscribed from its
633  * topic using stasis_unsubscribe().
634  *
635  * The invocations of the callback are serialized, but may not always occur on
636  * the same thread. The invocation order of different subscriptions is
637  * unspecified.
638  *
639  * \param topic Topic to subscribe to.
640  * \param callback Callback function for subscription messages.
641  * \param data Data to be passed to the callback, in addition to the message.
642  * \return New \ref stasis_subscription object.
643  * \return \c NULL on error.
644  * \since 12
645  *
646  * \note The callback will be invoked with a message indicating that the
647  * subscription has been established. This occurs immediately, before accepted
648  * message types can be set, and the callback must expect to receive it.
649  */
650 struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic,
651  stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
652 #define stasis_subscribe(topic, callback, data) __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
653 
654 /*!
655  * \brief Create a subscription whose callbacks occur on a thread pool
656  *
657  * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
658  * up this reference), the subscription must be explicitly unsubscribed from its
659  * topic using stasis_unsubscribe().
660  *
661  * The invocations of the callback are serialized, but will almost certainly not
662  * always happen on the same thread. The invocation order of different subscriptions
663  * is unspecified.
664  *
665  * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to
666  * dispatch items to its \c callback. This form of subscription should be used
667  * when many subscriptions may be made to the specified \c topic.
668  *
669  * \param topic Topic to subscribe to.
670  * \param callback Callback function for subscription messages.
671  * \param data Data to be passed to the callback, in addition to the message.
672  * \return New \ref stasis_subscription object.
673  * \return \c NULL on error.
674  * \since 12.8.0
675  *
676  * \note The callback will be invoked with a message indicating that the
677  * subscription has been established. This occurs immediately, before accepted
678  * message types can be set, and the callback must expect to receive it.
679  */
680 struct stasis_subscription *__stasis_subscribe_pool(struct stasis_topic *topic,
681  stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
682 #define stasis_subscribe_pool(topic, callback, data) __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
683 
684 /*!
685  * \brief Indicate to a subscription that we are interested in a message type.
686  *
687  * This will cause the subscription to allow the given message type to be
688  * raised to our subscription callback. This enables internal filtering in
689  * the stasis message bus to reduce messages.
690  *
691  * \param subscription Subscription to add message type to.
692  * \param type The message type we wish to receive.
693  * \retval 0 on success
694  * \retval -1 failure
695  *
696  * \since 17.0.0
697  *
698  * \note If you want to use stasis_subscription_final_message() you will need to accept
699  * \ref stasis_subscription_change_type as a message type.
700  *
701  * \note Until the subscription is set to selective filtering it is possible for it
702  * to receive messages of message types that would not normally be accepted.
703  */
704 int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
705  const struct stasis_message_type *type);
706 
707 /*!
708  * \brief Indicate to a subscription that we are not interested in a message type.
709  *
710  * \param subscription Subscription to remove message type from.
711  * \param type The message type we don't wish to receive.
712  * \retval 0 on success
713  * \retval -1 failure
714  *
715  * \since 17.0.0
716  */
717 int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
718  const struct stasis_message_type *type);
719 
720 /*!
721  * \brief Set the message type filtering level on a subscription
722  *
723  * This will cause the subscription to filter messages according to the
724  * provided filter level. For example if selective is used then only
725  * messages matching those provided to \ref stasis_subscription_accept_message_type
726  * will be raised to the subscription callback.
727  *
728  * \param subscription Subscription to set the filter on.
729  * \param filter What filter to use
730  * \retval 0 on success
731  * \retval -1 failure
732  *
733  * \since 17.0.0
734  */
735 int stasis_subscription_set_filter(struct stasis_subscription *subscription,
736  enum stasis_subscription_message_filter filter);
737 
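
/*
 * Illustrative sketch, not part of the Asterisk sources: a toy model of
 * selective filtering. Accepted type names are recorded first, but they only
 * take effect once the filter is switched to selective, which mirrors the
 * note above that a subscription may still receive other message types until
 * then. All demo_* names, and the use of strings for message types, are
 * hypothetical.
 */

```c
#include <assert.h>
#include <string.h>

#define DEMO_MAX_TYPES 4

enum demo_filter {
	DEMO_FILTER_NONE = 0,   /* everything is delivered */
	DEMO_FILTER_SELECTIVE,  /* only accepted types are delivered */
};

struct demo_subscription {
	enum demo_filter filter;
	const char *accepted[DEMO_MAX_TYPES];
	int accepted_count;
};

/* Records an accepted type; returns 0 on success, -1 if the list is full. */
static int demo_accept_type(struct demo_subscription *sub, const char *type_name)
{
	if (sub->accepted_count == DEMO_MAX_TYPES) {
		return -1;
	}
	sub->accepted[sub->accepted_count++] = type_name;
	return 0;
}

/* Returns non-zero if a message of the given type would reach the callback. */
static int demo_would_deliver(const struct demo_subscription *sub, const char *type_name)
{
	if (sub->filter != DEMO_FILTER_SELECTIVE) {
		return 1;
	}
	for (int i = 0; i < sub->accepted_count; i++) {
		if (strcmp(sub->accepted[i], type_name) == 0) {
			return 1;
		}
	}
	return 0;
}
```

/*
 * In this model the order of calls matters: accept the wanted types first,
 * then switch the filter, so no wanted message is dropped in between.
 */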
738 /*!
739  * \brief Indicate to a subscription that we are interested in messages with one or more formatters.
740  *
741  * \param subscription Subscription to alter.
742  * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
743  *
744  * \since 13.25.0
745  * \since 16.2.0
746  */
747 void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
748  enum stasis_subscription_message_formatters formatters);
749 
750 /*!
751  * \brief Get a bitmap of available formatters for a message type
752  *
753  * \param message_type Message type
754  * \return A bitmap of \ref stasis_subscription_message_formatters
755  *
756  * \since 13.25.0
757  * \since 16.2.0
758  */
759 enum stasis_subscription_message_formatters stasis_message_type_available_formatters(
760  const struct stasis_message_type *message_type);
761 
762 /*!
763  * \brief Cancel a subscription.
764  *
765  * Note that in an asynchronous system, there may still be messages queued or
766  * in transit to the subscription's callback. These will still be delivered.
767  * There will be a final 'SubscriptionCancelled' message, indicating the
768  * delivery of the final message.
769  *
770  * \param subscription Subscription to cancel.
771  * \return \c NULL for convenience
772  * \since 12
773  */
774 struct stasis_subscription *stasis_unsubscribe(
775  struct stasis_subscription *subscription);
776 
777 /*!
778  * \brief Set the high and low alert water marks of the stasis subscription.
779  * \since 13.10.0
780  *
781  * \param subscription Pointer to a stasis subscription
782  * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
783  * \param high_water New queue high water mark.
784  *
785  * \retval 0 on success.
786  * \retval -1 on error (water marks not changed).
787  */
788 int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
789  long low_water, long high_water);
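
/*
 * Illustrative sketch, not part of the Asterisk sources: how the documented
 * "-1 means 90% of high_water" rule could be resolved into concrete limits.
 * The demo_* helpers are hypothetical, and the validity check is an
 * assumption; the real function's validation rules are not shown in this
 * header.
 */

```c
#include <assert.h>

/* Resolves the low water mark: -1 requests 90% of the high water mark. */
static long demo_effective_low_water(long low_water, long high_water)
{
	if (low_water == -1) {
		return (high_water * 9) / 10;
	}
	return low_water;
}

/* Assumed sanity rule: 0 <= effective low water <= high water, high water > 0. */
static int demo_limits_valid(long low_water, long high_water)
{
	long low = demo_effective_low_water(low_water, high_water);

	return high_water > 0 && low >= 0 && low <= high_water;
}
```

/*
 * With a high water mark of 1000, passing -1 yields a low water mark of 900,
 * while an explicit value such as 250 is used unchanged.
 */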
790 
791 /*!
792  * \brief Block until the last message is processed on a subscription.
793  *
794  * This function will not return until the \a subscription's callback for the
795  * stasis_subscription_final_message() completes. This allows cleanup routines
796  * to run before unblocking the joining thread.
797  *
798  * \param subscription Subscription to block on.
799  * \since 12
800  */
801 void stasis_subscription_join(struct stasis_subscription *subscription);
802 
803 /*!
804  * \brief Returns whether \a subscription has received its final message.
805  *
806  * Note that a subscription is considered done even while the
807  * stasis_subscription_final_message() is being processed. This allows cleanup
808  * routines to check the status of the subscription.
809  *
810  * \param subscription Subscription.
811  * \return True (non-zero) if stasis_subscription_final_message() has been
812  * received.
813  * \return False (zero) if waiting for the end.
814  */
815 int stasis_subscription_is_done(struct stasis_subscription *subscription);
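
/*
 * Illustrative sketch, not part of the Asterisk sources: a toy model of the
 * object-owned subscription pattern from the Shutdown section. The
 * subscription holds one reference on its owner, the handler drops that
 * reference when it sees the final message, and the destructor asserts that
 * the subscription is done. All demo_* names are hypothetical, and the
 * is_final flag stands in for a check such as
 * stasis_subscription_final_message().
 */

```c
#include <assert.h>

struct demo_object {
	int refcount;  /* references currently held on the object */
	int sub_done;  /* set once the final message has been handled */
	int destroyed; /* set by the toy destructor */
};

/* Drops one reference; "destroys" the object when the count reaches zero. */
static void demo_unref(struct demo_object *obj)
{
	assert(obj->refcount > 0);
	if (--obj->refcount == 0) {
		/* By the time we get here the subscription must be finished. */
		assert(obj->sub_done);
		obj->destroyed = 1;
	}
}

/* Toy subscription handler; ordinary messages are ignored in this model. */
static void demo_handler(struct demo_object *obj, int is_final)
{
	if (is_final) {
		obj->sub_done = 1;
		demo_unref(obj); /* release the reference held for the subscription */
	}
}
```

/*
 * Because the subscription's reference is only released on the final
 * message, the destructor never has to block waiting for the subscription.
 */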
816 
817 /*!
818  * \brief Cancel a subscription, blocking until the last message is processed.
819  *
820  * While normally it's recommended to stasis_unsubscribe() and wait for
821  * stasis_subscription_final_message(), there are times (like during a module
822  * unload) where you have to wait for the final message (otherwise you'll call
823  * a function in a shared module that no longer exists).
824  *
825  * \param subscription Subscription to cancel.
826  * \return \c NULL for convenience
827  * \since 12
828  */
829 struct stasis_subscription *stasis_unsubscribe_and_join(
830  struct stasis_subscription *subscription);
831 
832 struct stasis_forward;
833 
834 /*!
835  * \brief Create a subscription which forwards all messages from one topic to
836  * another.
837  *
838  * Note that the \a topic parameter of the invoked callback will be the
839  * \a topic the message was sent to, not the topic the subscriber subscribed to.
840  *
841  * \param from_topic Topic to forward.
842  * \param to_topic Destination topic of forwarded messages.
843  * \return New forwarding subscription.
844  * \return \c NULL on error.
845  * \since 12
846  */
847 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
848  struct stasis_topic *to_topic);
849 
850 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
851 
852 /*!
853  * \brief Get the unique ID for the subscription.
854  *
855  * \param sub Subscription for which to get the unique ID.
856  * \return Unique ID for the subscription.
857  * \since 12
858  */
859 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
860 
861 /*!
862  * \brief Returns whether a subscription is currently subscribed.
863  *
864  * Note that there may still be messages queued up to be dispatched to this
865  * subscription, but the stasis_subscription_final_message() has been enqueued.
866  *
867  * \param sub Subscription to check
868  * \return False (zero) if subscription is not subscribed.
869  * \return True (non-zero) if still subscribed.
870  */
871 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);
872
873 /*!
874  * \brief Determine whether a message is the final message to be received on a subscription.
875  *
876  * \param sub Subscription on which the message was received.
877  * \param msg Message to check.
878  * \return zero if the provided message is not the final message.
879  * \return non-zero if the provided message is the final message.
880  * \since 12
881  */
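882 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
883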
884 /*! \addtogroup StasisTopicsAndMessages
885  * @{
886  */
887 
888 /*!
889  * \brief Holds details about changes to subscriptions for the specified topic
890  * \since 12
891  */
892 struct stasis_subscription_change {
893  struct stasis_topic *topic; /*!< The topic the subscription is/was subscribing to */
894  char *uniqueid; /*!< The unique ID associated with this subscription */
895  char description[0]; /*!< The description of the change to the subscription associated with the uniqueid */
896 };
897 
898 /*!
899  * \brief Gets the message type for subscription change notices
900  * \return The stasis_message_type for subscription change notices
901  * \since 12
902  */
903 struct stasis_message_type *stasis_subscription_change_type(void);
904
905 /*! @} */
906 
907 /*!
908  * \brief Pool for topic aggregation
909  */
910 struct stasis_topic_pool;
911 
912 /*!
913  * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
914  * \param pooled_topic Topic to which messages will be routed
915  * \return the new stasis_topic_pool
916  * \return \c NULL on failure
917  */
918 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
919 
920 /*!
921  * \brief Find or create a topic in the pool
922  * \param pool Pool for which to get the topic
923  * \param topic_name Name of the topic to get
924  * \return The already stored or newly allocated topic
925  * \return \c NULL if the topic was not found and could not be allocated
926  */
927 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
928 
929 /*!
930  * \brief Delete a topic from the topic pool
931  *
932  * \param pool Pool from which to delete the topic
933  * \param topic_name Name of the topic to delete in the form of
934  * <pool_topic_name>/<topic_name> or just <topic_name>
935  *
936  * \since 13.24
937  * \since 15.6
938  * \since 16.1
939  */
940 void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name);
941 
942 /*!
943  * \brief Check if a topic exists in a pool
944  * \param pool Pool to check
945  * \param topic_name Name of the topic to check
946  * \retval 1 exists
947  * \retval 0 does not exist
948  * \since 13.23.0
949  */
950 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name);
951 
952 /*! \addtogroup StasisTopicsAndMessages
953  * @{
954  */
955 
956 /*!
957  * \brief Message type for cache update messages.
958  * \return Message type for cache update messages.
959  * \since 12
960  */
961 struct stasis_message_type *stasis_cache_update_type(void);
962
963 /*!
964  * \brief Cache update message
965  * \since 12
966  */
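967 struct stasis_cache_update {
968  /*! \brief Convenience reference to snapshot type */
969  struct stasis_message_type *type;
970  /*! \brief Old value from the cache */
971  struct stasis_message *old_snapshot;
972  /*! \brief New value */
973  struct stasis_message *new_snapshot;
974 };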
975 
976 /*!
977  * \brief Message type for clearing a message from a stasis cache.
978  * \since 12
979  */
980 struct stasis_message_type *stasis_cache_clear_type(void);
981
982 /*! @} */
983 
984 /*!
985  * \brief A message cache, for use with \ref stasis_caching_topic.
986  * \since 12
987  */
988 struct stasis_cache;
989 
990 /*! Cache entry used for calculating the aggregate snapshot. */
991 struct stasis_cache_entry;
992 
993 /*!
994  * \brief A topic wrapper, which caches certain messages.
995  * \since 12
996  */
997 struct stasis_caching_topic;
998 
999 
1000 /*!
1001  * \brief Callback to extract a unique identity from a snapshot message.
1002  *
1003  * This identity is unique to the underlying object of the snapshot, such as the
1004  * UniqueId field of a channel.
1005  *
1006  * \param message Message to extract id from.
1007  * \return String representing the snapshot's id.
1008  * \return \c NULL if the message_type of the message isn't a handled snapshot.
1009  * \since 12
1010  */
1011 typedef const char *(*snapshot_get_id)(struct stasis_message *message);
1012 
1013 /*!
1014  * \brief Callback to calculate the aggregate cache entry.
1015  * \since 12.2.0
1016  *
1017  * \param entry Cache entry to calculate a new aggregate snapshot.
1018  * \param new_snapshot The snapshot that is being updated.
1019  *
1020  * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
1021  * if a new aggregate could not be calculated because of error.
1022  *
1023  * \note An aggregate message is a combined representation of the local
1024  * and remote entities publishing the message data. e.g., An aggregate
1025  * device state represents the combined device state from the local and
1026  * any remote entities publishing state for a device. e.g., An aggregate
1027  * MWI message is the old/new MWI counts accumulated from the local and
1028  * any remote entities publishing to a mailbox.
1029  *
1030  * \return New aggregate-snapshot calculated on success.
1031  * Caller has a reference on return.
1032  */
1033 typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
1034 
1035 /*!
1036  * \brief Callback to publish the aggregate cache entry message.
1037  * \since 12.2.0
1038  *
1039  * \details
1040  * Once an aggregate message is calculated, this callback publishes the
1041  * message so subscribers will know the new value of an aggregated state.
1042  *
1043  * \param topic The aggregate message may be published to this topic.
1044  * It is the topic to which the cache itself is subscribed.
1045  * \param aggregate The aggregate snapshot message to publish.
1046  *
1047  * \note It is up to the function to determine if there is a better topic
1048  * the aggregate message should be published over.
1049  *
1050  * \note An aggregate message is a combined representation of the local
1051  * and remote entities publishing the message data. e.g., An aggregate
1052  * device state represents the combined device state from the local and
1053  * any remote entities publishing state for a device. e.g., An aggregate
1054  * MWI message is the old/new MWI counts accumulated from the local and
1055  * any remote entities publishing to a mailbox.
1056  *
1057  * \return Nothing
1058  */
1059 typedef void (*cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate);
1060 
1061 /*!
1062  * \brief Get the aggregate cache entry snapshot.
1063  * \since 12.2.0
1064  *
1065  * \param entry Cache entry to get the aggregate snapshot.
1066  *
1067  * \note A reference is not given to the returned pointer so don't unref it.
1068  *
1069  * \note An aggregate message is a combined representation of the local
1070  * and remote entities publishing the message data. e.g., An aggregate
1071  * device state represents the combined device state from the local and
1072  * any remote entities publishing state for a device. e.g., An aggregate
1073  * MWI message is the old/new MWI counts accumulated from the local and
1074  * any remote entities publishing to a mailbox.
1075  *
1076  * \retval Aggregate-snapshot in cache.
1077  * \retval NULL if not present.
1078  */
1079 struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
1080
1081 /*!
1082  * \brief Get the local entity's cache entry snapshot.
1083  * \since 12.2.0
1084  *
1085  * \param entry Cache entry to get the local entity's snapshot.
1086  *
1087  * \note A reference is not given to the returned pointer so don't unref it.
1088  *
1089  * \retval Internal-snapshot in cache.
1090  * \retval NULL if not present.
1091  */
1092 struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry);
1093
1094 /*!
1095  * \brief Get a remote entity's cache entry snapshot by index.
1096  * \since 12.2.0
1097  *
1098  * \param entry Cache entry to get a remote entity's snapshot.
1099  * \param idx Which remote entity's snapshot to get.
1100  *
1101  * \note A reference is not given to the returned pointer so don't unref it.
1102  *
1103  * \retval Remote-entity-snapshot in cache.
1104  * \retval NULL if not present.
1105  */
1106 struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx);
1107 
1108 /*!
1109  * \brief Create a cache.
1110  *
1111  * This is the backend store for a \ref stasis_caching_topic. The cache is
1112  * thread safe, allowing concurrent reads and writes.
1113  *
1114  * The returned object is AO2 managed, so ao2_cleanup() when you're done.
1115  *
1116  * \param id_fn Callback to extract the id from a snapshot message.
1117  *
1118  * \retval New cache indexed by \a id_fn.
1119  * \retval \c NULL on error
1120  *
1121  * \since 12
1122  */
1123 struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
1124
1125 /*!
1126  * \brief Create a cache.
1127  *
1128  * This is the backend store for a \ref stasis_caching_topic. The cache is
1129  * thread safe, allowing concurrent reads and writes.
1130  *
1131  * The returned object is AO2 managed, so ao2_cleanup() when you're done.
1132  *
1133  * \param id_fn Callback to extract the id from a snapshot message.
1134  * \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
1135  * \param aggregate_publish_fn Callback to publish the aggregate cache entry.
1136  *
1137  * \note An aggregate message is a combined representation of the local
1138  * and remote entities publishing the message data. e.g., An aggregate
1139  * device state represents the combined device state from the local and
1140  * any remote entities publishing state for a device. e.g., An aggregate
1141  * MWI message is the old/new MWI counts accumulated from the local and
1142  * any remote entities publishing to a mailbox.
1143  *
1144  * \retval New cache indexed by \a id_fn.
1145  * \retval \c NULL on error
1146  *
1147  * \since 12.2.0
1148  */
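1149 struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn);
1150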
1151 /*!
1152  * \brief Create a topic which monitors and caches messages from another topic.
1153  *
1154  * The idea is that some topics publish 'snapshots' of some other object's state
1155  * that should be cached. When these snapshot messages are received, the cache
1156  * is updated, and a stasis_cache_update() message is forwarded, which has both
1157  * the original snapshot message and the new message.
1158  *
1159  * The returned object is AO2 managed, so ao2_cleanup() when done with it.
1160  *
1161  * \param original_topic Topic publishing snapshot messages.
1162  * \param cache Backend cache in which to keep snapshots.
1163  * \return New topic which changes snapshot messages to stasis_cache_update()
1164  * messages, and forwards all other messages from the original topic.
1165  * \return \c NULL on error
1166  * \since 12
1167  */
1168 struct stasis_caching_topic *stasis_caching_topic_create(
1169  struct stasis_topic *original_topic, struct stasis_cache *cache);
1170 
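The caching behavior described for stasis_caching_topic_create() pairs each incoming snapshot with whatever was cached under the same id. The sketch below models only that pairing with stand-in types. Everything prefixed demo_ is hypothetical, the fixed-size array replaces the real AO2-backed cache, and no reference counting or message forwarding is shown.

```c
#include <stddef.h>
#include <string.h>

#define DEMO_CACHE_SLOTS 8

/* Stand-in for a snapshot message: an id plus some state. */
struct demo_snapshot {
	const char *id;
	int state;
};

/* Stand-in for struct stasis_cache_update: old and new values together. */
struct demo_cache_update {
	const struct demo_snapshot *old_snapshot; /* NULL if nothing was cached */
	const struct demo_snapshot *new_snapshot;
};

struct demo_cache {
	const struct demo_snapshot *slots[DEMO_CACHE_SLOTS];
};

/* Plays the role of a snapshot_get_id callback. */
static const char *demo_get_id(const struct demo_snapshot *snapshot)
{
	return snapshot ? snapshot->id : NULL;
}

/* Stores the snapshot and fills in the update record.
 * Returns 0 on success, -1 if the snapshot has no id or the cache is full. */
static int demo_cache_put(struct demo_cache *cache,
	const struct demo_snapshot *snapshot, struct demo_cache_update *update)
{
	const char *id = demo_get_id(snapshot);
	int free_slot = -1;
	int i;

	if (id == NULL) {
		return -1;
	}
	for (i = 0; i < DEMO_CACHE_SLOTS; i++) {
		const struct demo_snapshot *cached = cache->slots[i];

		if (cached == NULL) {
			if (free_slot < 0) {
				free_slot = i; /* remember the first empty slot */
			}
		} else if (strcmp(demo_get_id(cached), id) == 0) {
			/* Same id already cached: report old and new together. */
			update->old_snapshot = cached;
			update->new_snapshot = snapshot;
			cache->slots[i] = snapshot;
			return 0;
		}
	}
	if (free_slot < 0) {
		return -1;
	}
	update->old_snapshot = NULL;
	update->new_snapshot = snapshot;
	cache->slots[free_slot] = snapshot;
	return 0;
}
```

In this model the first snapshot for an id produces an update whose old value is NULL, and a later snapshot with the same id produces an update that carries the previous snapshot as the old value.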
1171 /*!
1172  * \brief Unsubscribes a caching topic from its upstream topic.
1173  *
1174  * This function returns immediately, so be sure to cleanup when
1175  * stasis_subscription_final_message() is received.
1176  *
1177  * \param caching_topic Caching topic to unsubscribe
1178  * \return \c NULL for convenience
1179  * \since 12
1180  */
1181 struct stasis_caching_topic *stasis_caching_unsubscribe(
1182  struct stasis_caching_topic *caching_topic);
1183 
1184 /*!
1185  * \brief Unsubscribes a caching topic from its upstream topic, blocking until
1186  * all messages have been forwarded.
1187  *
1188  * See stasis_unsubscribe_and_join() for more info on when to use this as
1189  * opposed to stasis_caching_unsubscribe().
1190  *
1191  * \param caching_topic Caching topic to unsubscribe
1192  * \return \c NULL for convenience
1193  * \since 12
1194  */
1195 struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
1196  struct stasis_caching_topic *caching_topic);
1197 
1198 /*!
1199  * \brief Returns the topic of cached events from a caching topic.
1200  * \param caching_topic The caching topic.
1201  * \return The topic that publishes cache update events, along with passthrough
1202  * events from the underlying topic.
1203  * \return \c NULL if \a caching_topic is \c NULL.
1204  * \since 12
1205  */
1206 struct stasis_topic *stasis_caching_get_topic(
1207  struct stasis_caching_topic *caching_topic);
1208 
1209 /*!
1210  * \brief Indicate to a caching topic that we are interested in a message type.
1211  *
1212  * This will cause the caching topic to receive messages of the given message
1213  * type. This enables internal filtering in the stasis message bus to reduce
1214  * messages.
1215  *
1216  * \param caching_topic The caching topic.
1217  * \param type The message type we wish to receive.
1218  * \retval 0 on success
1219  * \retval -1 failure
1220  *
1221  * \since 17.0.0
1222  */
1223 int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
1224  struct stasis_message_type *type);
1225 
1226 /*!
1227  * \brief Set the message type filtering level on a cache
1228  *
1229  * This will cause the underlying subscription to filter messages according to the
1230  * provided filter level. For example if selective is used then only
1231  * messages matching those provided to \ref stasis_subscription_accept_message_type
1232  * will be raised to the subscription callback.
1233  *
1234  * \param caching_topic The caching topic.
1235  * \param filter What filter to use
1236  * \retval 0 on success
1237  * \retval -1 failure
1238  *
1239  * \since 17.0.0
1240  */
1241 int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
1242  enum stasis_subscription_message_filter filter);
1243
1244 /*!
1245  * \brief A message which instructs the caching topic to remove an entry from
1246  * its cache.
1247  *
1248  * \param message Message representative of the cache entry that should be
1249  * cleared. This will become the data held in the
1250  * stasis_cache_clear message.
1251  *
1252  * \return Message which, when sent to a \ref stasis_caching_topic, will clear
1253  * the item from the cache.
1254  * \return \c NULL on error.
1255  * \since 12
1256  */
1257 struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
1258
1259 /*!
1260  * \brief Retrieve an item from the cache for the ast_eid_default entity.
1261  *
1262  * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
1263  *
1264  * \param cache The cache to query.
1265  * \param type Type of message to retrieve.
1266  * \param id Identity of the snapshot to retrieve.
1267  *
1268  * \retval Message from the cache.
1269  * \retval \c NULL if message is not found.
1270  *
1271  * \since 12
1272  */
1273 struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
1274 
1275 /*!
1276  * \brief Retrieve an item from the cache for a specific entity.
1277  *
1278  * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
1279  *
1280  * \param cache The cache to query.
1281  * \param type Type of message to retrieve.
1282  * \param id Identity of the snapshot to retrieve.
1283  * \param eid Specific entity id to retrieve. NULL for aggregate.
1284  *
1285  * \note An aggregate message is a combined representation of the local
1286  * and remote entities publishing the message data. e.g., An aggregate
1287  * device state represents the combined device state from the local and
1288  * any remote entities publishing state for a device. e.g., An aggregate
1289  * MWI message is the old/new MWI counts accumulated from the local and
1290  * any remote entities publishing to a mailbox.
1291  *
1292  * \retval Message from the cache.
1293  * \retval \c NULL if message is not found.
1294  *
1295  * \since 12.2.0
1296  */
1297 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
1298 
1299 /*!
1300  * \brief Retrieve all matching entity items from the cache.
1301  * \since 12.2.0
1302  *
1303  * \param cache The cache to query.
1304  * \param type Type of message to retrieve.
1305  * \param id Identity of the snapshot to retrieve.
1306  *
1307  * \retval Container of matching items found.
1308  * \retval \c NULL if error.
1309  */
1310 struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
1311 
1312 /*!
1313  * \brief Dump cached items to a subscription for the ast_eid_default entity.
1314  *
1315  * \param cache The cache to query.
1316  * \param type Type of message to dump (any type if \c NULL).
1317  *
1318  * \retval ao2_container containing all matches (must be unreffed by caller)
1319  * \retval \c NULL on allocation error
1320  *
1321  * \since 12
1322  */
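1323 struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type);
1324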
1325 /*!
1326  * \brief Dump cached items to a subscription for a specific entity.
1327  * \since 12.2.0
1328  *
1329  * \param cache The cache to query.
1330  * \param type Type of message to dump (any type if \c NULL).
1331  * \param eid Specific entity id to retrieve. NULL for aggregate.
1332  *
1333  * \retval ao2_container containing all matches (must be unreffed by caller)
1334  * \retval \c NULL on allocation error
1335  */
1336 struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
1337 
1338 /*!
1339  * \brief Dump all entity items from the cache to a subscription.
1340  * \since 12.2.0
1341  *
1342  * \param cache The cache to query.
1343  * \param type Type of message to dump (any type if \c NULL).
1344  *
1345  * \retval ao2_container containing all matches (must be unreffed by caller)
1346  * \retval \c NULL on allocation error
1347  */
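1348 struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type);
1349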
1350 /*! \addtogroup StasisTopicsAndMessages
1351  * @{
1352  */
1353 
1354 /*!
1355  * \brief Object type code for multi user object snapshots
1356  */
1357 enum stasis_user_multi_object_snapshot_type {
1358  STASIS_UMOS_CHANNEL = 0, /*!< Channel Snapshots */
1359  STASIS_UMOS_BRIDGE, /*!< Bridge Snapshots */
1360  STASIS_UMOS_ENDPOINT, /*!< Endpoint Snapshots */
1361 };
1362 
1363 /*! \brief Number of snapshot types */
1364 #define STASIS_UMOS_MAX (STASIS_UMOS_ENDPOINT + 1)
1365 
1366 /*!
1367  * \brief Message type for custom user defined events with multi object blobs
1368  * \return The stasis_message_type for user event
1369  * \since 12.3.0
1370  */
1371 struct stasis_message_type *ast_multi_user_event_type(void);
1372
1373 /*!
1374  * \brief Create a stasis multi object blob
1375  * \since 12.3.0
1376  *
1377  * \details
1378  * Multi object blob can store a combination of arbitrary json values
1379  * (the blob) and also snapshots of various other system objects (such
1380  * as channels, bridges, etc) for delivery through a stasis message.
1381  * The multi object blob is first created, then optionally objects
1382  * are added to it, before being attached to a message and delivered
1383  * to a stasis topic.
1384  *
1385  * \param blob Json blob
1386  *
1387  * \note When used for an ast_multi_user_event_type message, the
1388  * json blob should contain at minimum {eventname: name}.
1389  *
1390  * \retval ast_multi_object_blob* if succeeded
1391  * \retval NULL if creation failed
1392  */
1393 struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob);
1394
1395 /*!
1396  * \brief Add an object to a multi object blob previously created
1397  * \since 12.3.0
1398  *
1399  * \param multi The multi object blob previously created
1400  * \param type Type code for the object such as channel, bridge, etc.
1401  * \param object Snapshot object of the type given by \a type
1402  *
1403  * \return Nothing
1404  */
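1405 void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object);
1406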
1407 /*!
1408  * \brief Create and publish a stasis message blob on a channel with its snapshot
1409  * \since 12.3.0
1410  *
1411  * \details
1412  * For compatibility with app_userevent, this creates a multi object
1413  * blob message, attaches the channel snapshot to it, and publishes it
1414  * to the channel's topic.
1415  *
1416  * \param chan The channel to snapshot and publish event to
1417  * \param type The message type
1418  * \param blob A json blob to publish with the snapshot
1419  *
1420  * \return Nothing
1421  */
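1422 void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob);
1423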
1424 
1425 /*! @} */
1426 
1427 /*!
1428  * \internal
1429  * \brief Log a message about invalid attempt to access a type.
1430  */
1431 void stasis_log_bad_type_access(const char *name);
1432 
1433 /*!
1434  * \brief Boiler-plate messaging macro for defining public message types.
1435  *
1436  * \code
1437  * STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
1438  * .to_ami = foo_to_ami,
1439  * .to_json = foo_to_json,
1440  * .to_event = foo_to_event,
1441  * );
1442  * \endcode
1443  *
1444  * \param name Name of message type.
1445  * \param ... Virtual table methods for messages of this type.
1446  * \since 12
1447  */
1448 #define STASIS_MESSAGE_TYPE_DEFN(name, ...) \
1449  static struct stasis_message_vtable _priv_ ## name ## _v = { \
1450  __VA_ARGS__ \
1451  }; \
1452  static struct stasis_message_type *_priv_ ## name; \
1453  struct stasis_message_type *name(void) { \
1454  if (_priv_ ## name == NULL) { \
1455  stasis_log_bad_type_access(#name); \
1456  } \
1457  return _priv_ ## name; \
1458  }
1459 
1460 /*!
1461  * \brief Boiler-plate messaging macro for defining local message types.
1462  *
1463  * \code
1464  * STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
1465  * .to_ami = foo_to_ami,
1466  * .to_json = foo_to_json,
1467  * .to_event = foo_to_event,
1468  * );
1469  * \endcode
1470  *
1471  * \param name Name of message type.
1472  * \param ... Virtual table methods for messages of this type.
1473  * \since 12
1474  */
1475 #define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name, ...) \
1476  static struct stasis_message_vtable _priv_ ## name ## _v = { \
1477  __VA_ARGS__ \
1478  }; \
1479  static struct stasis_message_type *_priv_ ## name; \
1480  static struct stasis_message_type *name(void) { \
1481  if (_priv_ ## name == NULL) { \
1482  stasis_log_bad_type_access(#name); \
1483  } \
1484  return _priv_ ## name; \
1485  }
1486 
1487 /*!
1488  * \brief Boiler-plate messaging macro for initializing message types.
1489  *
1490  * \code
1491  * if (STASIS_MESSAGE_TYPE_INIT(ast_foo_type) != 0) {
1492  * return -1;
1493  * }
1494  * \endcode
1495  *
1496  * \param name Name of message type.
1497  * \return 0 if initialization is successful.
1498  * \return Non-zero on failure.
1499  * \since 12
1500  */
1501 #define STASIS_MESSAGE_TYPE_INIT(name) \
1502  ({ \
1503  ast_assert(_priv_ ## name == NULL); \
1504  stasis_message_type_create(#name, \
1505  &_priv_ ## name ## _v, &_priv_ ## name) == STASIS_MESSAGE_TYPE_ERROR ? 1 : 0; \
1506  })
1507 
1508 /*!
1509  * \brief Boiler-plate messaging macro for cleaning up message types.
1510  *
1511  * Note that if your type is defined in core instead of a loadable module, you
1512  * should call message type cleanup from an ast_register_cleanup() handler
1513  * instead of an ast_register_atexit() handler.
1514  *
1515  * The reason is that during an immediate shutdown, loadable modules (which may
1516  * refer to core message types) are not unloaded. While the atexit handlers are
1517  * run, there's a window of time where a module subscription might reference a
1518  * core message type after it's been cleaned up. Which is bad.
1519  *
1520  * \param name Name of message type.
1521  * \since 12
1522  */
1523 #define STASIS_MESSAGE_TYPE_CLEANUP(name) \
1524  ({ \
1525  ao2_cleanup(_priv_ ## name); \
1526  _priv_ ## name = NULL; \
1527  })
1528 
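The define, init, and cleanup macros above share one private pointer per message type, and the generated accessor logs when that pointer is still NULL. The sketch below reproduces that lifecycle with stand-in types so it can be compiled on its own. All demo_ and DEMO_ names are hypothetical, malloc()/free() replace the AO2 allocation, and the vtable argument is omitted.

```c
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

/* Stand-in for struct stasis_message_type; illustration only. */
struct demo_type {
	const char *name;
};

/* Counts bad accesses so the behavior can be observed. */
static int demo_bad_access_count;

/* Stand-in for stasis_log_bad_type_access(). */
static void demo_log_bad_type_access(const char *name)
{
	demo_bad_access_count++;
	fprintf(stderr, "message type %s accessed while not initialized\n", name);
}

/* Stand-in for stasis_message_type_create(): 0 on success, 1 on failure. */
static int demo_type_create(const char *name, struct demo_type **result)
{
	struct demo_type *type = malloc(sizeof(*type));

	if (type == NULL) {
		return 1;
	}
	type->name = name;
	*result = type;
	return 0;
}

/* Same shape as STASIS_MESSAGE_TYPE_DEFN_LOCAL: private pointer plus accessor. */
#define DEMO_TYPE_DEFN_LOCAL(name) \
	static struct demo_type *_priv_ ## name; \
	static struct demo_type *name(void) { \
		if (_priv_ ## name == NULL) { \
			demo_log_bad_type_access(#name); \
		} \
		return _priv_ ## name; \
	}

/* Evaluates to 0 on success, non-zero on failure. */
#define DEMO_TYPE_INIT(name) \
	demo_type_create(#name, &_priv_ ## name)

/* Releases the type and resets the pointer so later access is detected. */
#define DEMO_TYPE_CLEANUP(name) \
	do { \
		free(_priv_ ## name); \
		_priv_ ## name = NULL; \
	} while (0)

DEMO_TYPE_DEFN_LOCAL(demo_foo_type)
```

Calling demo_foo_type() before DEMO_TYPE_INIT() or after DEMO_TYPE_CLEANUP() returns NULL and bumps the counter, which mirrors why callers of the real accessors should be prepared for a NULL message type during startup and shutdown.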
1529 /*!
1530  * \brief Initialize the Stasis subsystem.
1531  * \return 0 on success.
1532  * \return Non-zero on error.
1533  * \since 12
1534  */
1535 int stasis_init(void);
1536 
1537 /*!
1538  * \internal
1539  * \brief called by stasis_init() for cache initialization.
1540  * \return 0 on success.
1541  * \return Non-zero on error.
1542  * \since 12
1543  */
1544 int stasis_cache_init(void);
1545 
1546 /*!
1547  * \internal
1548  * \brief called by stasis_init() for config initialization.
1549  * \return 0 on success.
1550  * \return Non-zero on error.
1551  * \since 12
1552  */
1553 int stasis_config_init(void);
1554 
1555 /*!
1556  * \defgroup StasisTopicsAndMessages Stasis topics, and their messages.
1557  *
1558  * \brief This group contains the topics, messages and corresponding message types
1559  * found within Asterisk.
1560  */
1561 
1562 #endif /* _ASTERISK_STASIS_H */
Set the message type filtering level on a cache.
Definition: stasis_cache.c:109
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
Definition: stasis_cache.c:375
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:969
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:623
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:973
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:615
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the Generic event system representation of the message.
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition: stasis.c:811
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1136
int stasis_message_can_be_ami(struct stasis_message *msg)
Determine if the given message can be converted to AMI.
int stasis_cache_init(void)
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object to a multi object blob previously created.
Definition: stasis.c:2001
struct ao2_container * cache
Definition: pbx_realtime.c:77
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:955
stasis_subscription_cb callback
Definition: stasis.c:689
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Create and publish a stasis message blob on a channel with its snapshot.
Definition: stasis.c:2010
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
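A formatter bitmap can be thought of as a set of single-bit flags, one per output representation, that a subscription's requested formatters are compared against. The sketch below is a minimal illustration under assumptions: the `demo_formatters` enum and `demo_formatters_match()` are hypothetical names that mirror the idea, and the "any requested bit is available" rule is an assumed matching policy rather than a statement of how Asterisk filters messages.

```c
/* Hypothetical flag values; each formatter occupies its own bit. */
enum demo_formatters {
	DEMO_FORMATTER_NONE  = 0,
	DEMO_FORMATTER_JSON  = 1 << 0,
	DEMO_FORMATTER_AMI   = 1 << 1,
	DEMO_FORMATTER_EVENT = 1 << 2,
};

/*
 * Returns nonzero when at least one formatter requested by the subscriber
 * is also offered by the message type (assumed matching rule).
 */
int demo_formatters_match(unsigned int wanted, unsigned int available)
{
	return (wanted & available) != 0;
}
```

With this layout, a subscriber interested in both JSON and AMI output passes `DEMO_FORMATTER_JSON | DEMO_FORMATTER_AMI` as `wanted`.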
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
Definition: stasis_cache.c:370
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:297
const char * stasis_topic_uniqueid(const struct stasis_topic *topic)
Return the uniqueid of a topic.
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:944
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition: stasis.c:636
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
Definition: stasis_cache.c:587
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
struct stasis_forward * sub
Definition: res_corosync.c:240
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
Definition: stasis_cache.c:686
Abstract JSON element (object, array, string, int, ...).
struct ast_json * blob
Definition: stasis.c:1951
snapshot_get_id id_fn
Definition: stasis_cache.c:48
Definition: search.h:40
Forwarding information.
Definition: stasis.c:1531
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
Generic container type.
cache_aggregate_publish_fn aggregate_publish_fn
Definition: stasis_cache.c:50
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topic.
Definition: stasis_cache.c:85
stasis_message_type_result
Return code for Stasis message type creation attempts.
Definition: stasis.h:288
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1171
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:311
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback to extract a unique identity from a snapshot message.
Definition: stasis.h:1011
Definition: stasis_cache.c:173