Asterisk - The Open Source Telephony Project
18.5.0
|
The Stasis Message Bus is a loosely typed mechanism for distributing messages within Asterisk. It is designed to be:
There are three main concepts for using the Stasis Message Bus:
Central to the Stasis Message Bus is the stasis_message, the messages that are sent on the bus. These messages have:
void
pointer to an AO2 objectOnce a stasis_message has been created, it is immutable and cannot change. The same goes for the value of the message (although this cannot be enforced in code). Messages themselves are reference-counted, AO2 objects, along with their values. By being both reference counted and immutable, messages can be shared throughout the system without any concerns for threading.
The type of a message is defined by an instance of stasis_message_type, which can be created by calling stasis_message_type_create(). Message types are named, which is useful in debugging. It is recommended that the string name for a message type match the name of the struct that's stored in the message. For example, name for stasis_cache_update's message type is "stasis_cache_update"
.
A stasis_topic is an object to which stasis_subscriber's may be subscribed, and stasis_message's may be published. Any message published to the topic is dispatched to all of its subscribers. The topic itself may be named, which is useful in debugging.
Topics themselves are reference counted objects. Since topics are referred to by their subscibers, they will not be freed until all of their subscribers have unsubscribed. Topics are also thread safe, so no worries about publishing/subscribing/unsubscribing to a topic concurrently from multiple threads. It's also designed to handle the case of unsubscribing from a topic from within the subscription handler.
There is one special case of topics that's worth noting: forwarding messages. It's a fairly common use case to want to forward all the messages published on one topic to another one (for example, an aggregator topic that publishes all the events from a set of other topics). This can be accomplished easily using stasis_forward_all(). This sets up the forwarding between the two topics, and returns a stasis_subscription, which can be unsubscribed to stop the forwarding.
Another common use case is to want to cache certain messages that are published on the bus. Usually these events are snapshots of the current state in the system, and it's desirable to query that state from the cache without locking the original object. It's also desirable for subscribers of the caching topic to receive messages that have both the old cache value and the new value being put into the cache. For this, we have stasis_cache_create() and stasis_caching_topic_create(), providing them with the topic which publishes the messages that you wish to cache, and a function that can identify cacheable messages.
The stasis_cache is designed so that it may be shared amongst several stasis_caching_topic objects. This allows you to have individual caching topics per-object (i.e. so you can subscribe to updates for a single object), and still have a single cache to query for the state of all objects. While a cache may be shared amongst different message types, such a usage is probably not a good idea.
The stasis_cache can only be written to by stasis_caching_topics. It's a thread safe container, so freely use the stasis_cache_get() and stasis_cache_dump() to query the cache.
The stasis_caching_topic discards non-cacheable messages. A cacheable message is wrapped in a stasis_cache_update message which provides the old snapshot (or NULL
if this is a new cache entry), and the new snapshot (or NULL
if the entry was removed from the cache). A stasis_cache_clear_create() message must be sent to the topic in order to remove entries from the cache.
In order to unsubscribe a stasis_caching_topic from the upstream topic, call stasis_caching_unsubscribe(). Due to cyclic references, the stasis_caching_topic will not be freed until after it has been unsubscribed, and all other ao2_ref()'s have been cleaned up.
The stasis_cache object is a normal AO2 managed object, which can be release with ao2_cleanup().
Any topic may be subscribed to by simply providing stasis_subscribe() the stasis_topic to subscribe to, a handler function and void
pointer to data that is passed back to the handler. Invocations on the subscription's handler are serialized, but different invocations may occur on different threads (this usually isn't important unless you use thread locals or something similar).
In order to stop receiving messages, call stasis_unsubscribe() with your stasis_subscription. Due to cyclic references, the stasis_subscription will not be freed until after it has been unsubscribed, and all other ao2_ref()'s have been cleaned up.
Subscriptions have two options for unsubscribing, depending upon the context in which you need to unsubscribe.
If your subscription is owned by a module, and you must unsubscribe from the module_unload() function, then you'll want to use the stasis_unsubscribe_and_join() function. This will block until the final message has been received on the subscription. Otherwise, there's the danger of invoking the callback function after it has been unloaded.
If your subscription is owned by an object, then your object should have an explicit shutdown() function, which calls stasis_unsubscribe(). In your subscription handler, when the stasis_subscription_final_message() has been received, decrement the refcount on your object. In your object's destructor, you may assert that stasis_subscription_is_done() to validate that the subscription's callback will no longer be invoked.
Note: You may be tempted to simply call stasis_unsubscribe_and_join() from an object's destructor. While code that does this may work most of the time, it's got one big downside. There's a general assumption that object destruction is non-blocking. If you block the destruction waiting for the subscription to complete, there's the danger that the subscription may process a message which will bump the refcount up by one. Then it does whatever it does, decrements the refcount, which then proceeds to re-destroy the object. Now you've got hard to reproduce bugs that only show up under certain loads.