40 #define NUM_CACHE_BUCKETS 17 42 #define NUM_CACHE_BUCKETS 563 87 return caching_topic->
topic;
112 if (!caching_topic) {
121 if (!caching_topic) {
148 if (!caching_topic) {
231 if (!entry->
key.
id) {
259 entry->
local = snapshot;
285 return (
int)key->
hash;
297 right_key = &object_right->
key;
301 || strcmp(object_left->
key.
id, right_key->
id);
432 old_snapshot = cached_entry->
local;
474 old_snapshot = cached_entry->
local;
517 memset(&snapshots, 0,
sizeof(snapshots));
521 cached_entry =
cache_find(cache->entries, type,
id);
529 }
else if (cached_entry) {
541 if (cache->aggregate_calc_fn && cached_entry) {
542 snapshots.
aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
581 err |= !
ao2_link(snapshots, snapshot);
850 caching_topic_needs_unref = caching_topic;
852 caching_topic_needs_unref =
NULL;
865 if (strcmp(change->
description,
"Unsubscribe") == 0) {
894 if (msg_id && msg_eid) {
899 snapshots =
cache_put(caching_topic->
cache, msg_type, msg_id, msg_eid, msg_put);
900 if (snapshots.old || msg_put) {
910 "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
915 if (snapshots.aggregate_old != snapshots.aggregate_new) {
918 snapshots.aggregate_new);
921 update =
update_create(snapshots.aggregate_old, snapshots.aggregate_new);
951 static int caching_id;
962 if (caching_topic ==
NULL) {
980 ast_log(
LOG_ERROR,
"Stasis cache container '%p' for '%s' did not register\n",
989 if (caching_topic->
sub ==
NULL) {
999 return caching_topic;
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
cache_aggregate_calc_fn aggregate_calc_fn
Asterisk main include file. File version handling, generic pbx functions.
struct stasis_message * old_snapshot
Old value from the cache.
struct stasis_message * stasis_cache_clear_create(struct stasis_message *id_message)
A message which instructs the caching topic to remove an entry from its cache.
struct cache_entry_key key
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
The arg parameter is a search key, but is not an object.
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
static int cache_dump_all_cb(void *obj, void *arg, int flags)
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
static void cache_entry_compute_hash(struct cache_entry_key *key)
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
void(* cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate)
Callback to publish the aggregate cache entry message.
#define ao2_callback(c, flags, cb_fn, arg)
struct ao2_container * container
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Generic (perhaps overly so) hashtable implementation Hash Table support in Asterisk.
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
struct stasis_cache * cache
static struct stasis_cache_entry * cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
Assume that the ao2_container is already locked.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
static int cache_entry_cmp(void *obj, void *arg, int flags)
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
struct stasis_topic * original_topic
static void stasis_cache_update_dtor(void *obj)
#define ao2_alloc_options(data_size, destructor_fn, options)
#define ao2_link_flags(container, obj, flags)
#define ast_strdup(str)
A wrapper for strdup()
struct stasis_message * old
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
Gets the hash of a given message type.
An Entity ID is essentially a MAC address, brief and unique.
static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
struct stasis_subscription * sub
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
struct stasis_message *(* cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
Callback to calculate the aggregate cache entry.
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
struct ao2_container * entries
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
struct stasis_message * stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
Get the aggregate cache entry snapshot.
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ast_debug(level,...)
Log a DEBUG message.
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
const struct ast_eid * eid
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
#define ao2_ref(o, delta)
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
static struct stasis_message * cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
struct stasis_message * aggregate_new
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
#define AST_VECTOR(name, type)
Define a vector structure.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
static void cache_entry_dtor(void *obj)
static struct cache_put_snapshots cache_put(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid, struct stasis_message *new_snapshot)
struct stasis_message * aggregate_old
struct stasis_message * new_snapshot
New value.
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_topic * topic
struct stasis_message_type * type
Convenience reference to snapshot type.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
static struct stasis_message * update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
The key for an entry in the cache.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Holds details about changes to subscriptions for the specified topic.
Vector container support.
#define ao2_find(container, arg, flags)
unsigned int ast_hashtab_hash_string(const void *obj)
Hashes a string to a number.
struct ao2_container * cache
static void stasis_caching_topic_dtor(void *obj)
struct ast_eid ast_eid_default
Global EID.
The arg parameter is an object of the same type.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
stasis_subscription_message_filter
Stasis subscription message filters.
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
struct stasis_message * local
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
struct stasis_forward * sub
#define ao2_unlink_flags(container, obj, flags)
struct stasis_message_type * type
static void stasis_cache_cleanup(void)
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded...
#define NUM_CACHE_BUCKETS
Search option field mask.
cache_aggregate_publish_fn aggregate_publish_fn
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
static struct stasis_message * cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
static struct stasis_message * cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type)
struct stasis_message_type * type
static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
struct ao2_container * stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
Dump cached items to a subscription for the ast_eid_default entity.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
struct stasis_message * aggregate
static int cache_entry_hash(const void *obj, int flags)
static void cache_dtor(void *obj)
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback extract a unique identity from a snapshot message.
int stasis_cache_init(void)
static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
#define ao2_link(container, obj)