Asterisk - The Open Source Telephony Project  18.5.0
Data Structures | Macros | Functions
stasis_cache.c File Reference

Stasis Message API. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/hashtab.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/utils.h"
#include "asterisk/vector.h"
Include dependency graph for stasis_cache.c:

Go to the source code of this file.

Data Structures

struct  cache_dump_data
 
struct  cache_entry_key
 The key for an entry in the cache. More...
 
struct  cache_put_snapshots
 
struct  stasis_cache
 
struct  stasis_cache_entry
 
struct  stasis_caching_topic
 

Macros

#define NUM_CACHE_BUCKETS   563
 

Functions

static void cache_dtor (void *obj)
 
static int cache_dump_all_cb (void *obj, void *arg, int flags)
 
static int cache_dump_by_eid_cb (void *obj, void *arg, int flags)
 
static struct stasis_message * cache_entry_by_eid (const struct stasis_cache_entry *entry, const struct ast_eid *eid)
 
static int cache_entry_cmp (void *obj, void *arg, int flags)
 
static void cache_entry_compute_hash (struct cache_entry_key *key)
 
static struct stasis_cache_entry * cache_entry_create (struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
 
static void cache_entry_dtor (void *obj)
 
static int cache_entry_dump (struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
 
static int cache_entry_hash (const void *obj, int flags)
 
static struct stasis_cache_entry * cache_find (struct ao2_container *entries, struct stasis_message_type *type, const char *id)
 
static struct cache_put_snapshots cache_put (struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid, struct stasis_message *new_snapshot)
 
static struct stasis_message * cache_remove (struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
 
static struct stasis_message * cache_udpate (struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
 
static void caching_topic_exec (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 
static void print_cache_entry (void *v_obj, void *where, ao2_prnt_fn *prnt)
 
static void stasis_cache_cleanup (void)
 
struct stasis_message * stasis_cache_clear_create (struct stasis_message *id_message)
 A message which instructs the caching topic to remove an entry from its cache. More...
 
struct stasis_cache * stasis_cache_create (snapshot_get_id id_fn)
 Create a cache. More...
 
struct stasis_cache * stasis_cache_create_full (snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
 Create a cache. More...
 
struct ao2_container * stasis_cache_dump (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump cached items to a subscription for the ast_eid_default entity. More...
 
struct ao2_container * stasis_cache_dump_all (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump all entity items from the cache to a subscription. More...
 
struct ao2_container * stasis_cache_dump_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
 Dump cached items to a subscription for a specific entity. More...
 
struct stasis_message * stasis_cache_entry_get_aggregate (struct stasis_cache_entry *entry)
 Get the aggregate cache entry snapshot. More...
 
struct stasis_message * stasis_cache_entry_get_local (struct stasis_cache_entry *entry)
 Get the local entity's cache entry snapshot. More...
 
struct stasis_message * stasis_cache_entry_get_remote (struct stasis_cache_entry *entry, int idx)
 Get a remote entity's cache entry snapshot by index. More...
 
struct stasis_message * stasis_cache_get (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve an item from the cache for the ast_eid_default entity. More...
 
struct ao2_container * stasis_cache_get_all (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve all matching entity items from the cache. More...
 
struct stasis_message * stasis_cache_get_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
 Retrieve an item from the cache for a specific entity. More...
 
int stasis_cache_init (void)
 
static void stasis_cache_update_dtor (void *obj)
 
int stasis_caching_accept_message_type (struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
 Indicate to a caching topic that we are interested in a message type. More...
 
struct stasis_topic * stasis_caching_get_topic (struct stasis_caching_topic *caching_topic)
 Returns the topic of cached events from a caching topic. More...
 
int stasis_caching_set_filter (struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a cache. More...
 
struct stasis_caching_topic * stasis_caching_topic_create (struct stasis_topic *original_topic, struct stasis_cache *cache)
 Create a topic which monitors and caches messages from another topic. More...
 
static void stasis_caching_topic_dtor (void *obj)
 
struct stasis_caching_topic * stasis_caching_unsubscribe (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic. More...
 
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded. More...
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_cache_clear_type)
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_cache_update_type)
 
static struct stasis_message * update_create (struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
 

Detailed Description

Stasis Message API.

Author
David M. Lee, II <dlee@digium.com>

Definition in file stasis_cache.c.

Macro Definition Documentation

◆ NUM_CACHE_BUCKETS

#define NUM_CACHE_BUCKETS   563

Definition at line 42 of file stasis_cache.c.

Referenced by stasis_cache_create_full().

Function Documentation

◆ cache_dtor()

static void cache_dtor ( void *  obj)
static

Definition at line 326 of file stasis_cache.c.

References ao2_cleanup, cache, stasis_cache::entries, and NULL.

Referenced by stasis_cache_create_full().

327 {
328  struct stasis_cache *cache = obj;
329 
330  ao2_cleanup(cache->entries);
331  cache->entries = NULL;
332 }
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47
struct ao2_container * cache
Definition: pbx_realtime.c:77
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ cache_dump_all_cb()

static int cache_dump_all_cb ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 741 of file stasis_cache.c.

References ao2_cleanup, cache_entry_dump(), CMP_STOP, cache_dump_data::container, stasis_cache_entry::key, NULL, cache_entry_key::type, and cache_dump_data::type.

Referenced by stasis_cache_dump_all().

742 {
743  struct cache_dump_data *cache_dump = arg;
744  struct stasis_cache_entry *entry = obj;
745 
746  if (!cache_dump->type || entry->key.type == cache_dump->type) {
747  if (cache_entry_dump(cache_dump->container, entry)) {
748  ao2_cleanup(cache_dump->container);
749  cache_dump->container = NULL;
750  return CMP_STOP;
751  }
752  }
753 
754  return 0;
755 }
struct cache_entry_key key
Definition: stasis_cache.c:174
struct ao2_container * container
Definition: stasis_cache.c:692
#define NULL
Definition: resample.c:96
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_message_type * type
Definition: stasis_cache.c:693
Definition: search.h:40
struct stasis_message_type * type
Definition: stasis_cache.c:166
static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
Definition: stasis_cache.c:563
Definition: stasis_cache.c:173

◆ cache_dump_by_eid_cb()

static int cache_dump_by_eid_cb ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 697 of file stasis_cache.c.

References ao2_cleanup, ao2_link, cache_entry_by_eid(), CMP_STOP, cache_dump_data::container, cache_dump_data::eid, stasis_cache_entry::key, NULL, cache_entry_key::type, and cache_dump_data::type.

Referenced by stasis_cache_dump_by_eid().

698 {
699  struct cache_dump_data *cache_dump = arg;
700  struct stasis_cache_entry *entry = obj;
701 
702  if (!cache_dump->type || entry->key.type == cache_dump->type) {
703  struct stasis_message *snapshot;
704 
705  snapshot = cache_entry_by_eid(entry, cache_dump->eid);
706  if (snapshot) {
707  if (!ao2_link(cache_dump->container, snapshot)) {
708  ao2_cleanup(cache_dump->container);
709  cache_dump->container = NULL;
710  return CMP_STOP;
711  }
712  }
713  }
714 
715  return 0;
716 }
struct cache_entry_key key
Definition: stasis_cache.c:174
struct ao2_container * container
Definition: stasis_cache.c:692
#define NULL
Definition: resample.c:96
const struct ast_eid * eid
Definition: stasis_cache.c:694
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_message_type * type
Definition: stasis_cache.c:693
Definition: search.h:40
static struct stasis_message * cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
Definition: stasis_cache.c:631
struct stasis_message_type * type
Definition: stasis_cache.c:166
#define ao2_link(container, obj)
Definition: astobj2.h:1549
Definition: stasis_cache.c:173

◆ cache_entry_by_eid()

static struct stasis_message* cache_entry_by_eid ( const struct stasis_cache_entry *  entry,
const struct ast_eid *  eid 
)
static

Definition at line 631 of file stasis_cache.c.

References stasis_cache_entry::aggregate, ast_eid_cmp(), ast_eid_default, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_cache_entry::local, NULL, and stasis_message_eid().

Referenced by cache_dump_by_eid_cb(), and stasis_cache_get_by_eid().

632 {
633  int is_remote;
634  int idx;
635 
636  if (!eid) {
637  /* Get aggregate. */
638  return entry->aggregate;
639  }
640 
641  /* Get snapshot with specific eid. */
642  is_remote = ast_eid_cmp(eid, &ast_eid_default);
643  if (!is_remote) {
644  return entry->local;
645  }
646 
647  for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
648  struct stasis_message *cur;
649 
650  cur = AST_VECTOR_GET(&entry->remote, idx);
651  if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
652  return cur;
653  }
654  }
655 
656  return NULL;
657 }
#define NULL
Definition: resample.c:96
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2842
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
struct stasis_message * local
Definition: stasis_cache.c:178
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
struct stasis_message * aggregate
Definition: stasis_cache.c:176

◆ cache_entry_cmp()

static int cache_entry_cmp ( void *  obj,
void *  arg,
int  flags 
)
static

Definition at line 288 of file stasis_cache.c.

References ast_assert, CMP_MATCH, cache_entry_key::id, stasis_cache_entry::key, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, OBJ_SEARCH_PARTIAL_KEY, and cache_entry_key::type.

Referenced by stasis_cache_create_full().

289 {
290  const struct stasis_cache_entry *object_left = obj;
291  const struct stasis_cache_entry *object_right = arg;
292  const struct cache_entry_key *right_key = arg;
293  int cmp;
294 
295  switch (flags & OBJ_SEARCH_MASK) {
296  case OBJ_SEARCH_OBJECT:
297  right_key = &object_right->key;
298  /* Fall through */
299  case OBJ_SEARCH_KEY:
300  cmp = object_left->key.type != right_key->type
301  || strcmp(object_left->key.id, right_key->id);
302  break;
303  case OBJ_SEARCH_PARTIAL_KEY:
304  /* Not supported by container */
305  ast_assert(0);
306  cmp = -1;
307  break;
308  default:
309  /*
310  * What arg points to is specific to this traversal callback
311  * and has no special meaning to astobj2.
312  */
313  cmp = 0;
314  break;
315  }
316  if (cmp) {
317  return 0;
318  }
319  /*
320  * At this point the traversal callback is identical to a sorted
321  * container.
322  */
323  return CMP_MATCH;
324 }
struct cache_entry_key key
Definition: stasis_cache.c:174
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
const char * id
Definition: stasis_cache.c:168
#define ast_assert(a)
Definition: utils.h:695
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1120
The key for an entry in the cache.
Definition: stasis_cache.c:164
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
Search option field mask.
Definition: astobj2.h:1076
struct stasis_message_type * type
Definition: stasis_cache.c:166
Definition: stasis_cache.c:173

◆ cache_entry_compute_hash()

static void cache_entry_compute_hash ( struct cache_entry_key key)
static

Definition at line 206 of file stasis_cache.c.

References ast_hashtab_hash_string(), cache_entry_key::hash, cache_entry_key::id, stasis_message_type_hash(), and cache_entry_key::type.

Referenced by cache_entry_create(), and cache_find().

207 {
208  key->hash = stasis_message_type_hash(key->type);
209  key->hash += ast_hashtab_hash_string(key->id);
210 }
const char * id
Definition: stasis_cache.c:168
unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
Gets the hash of a given message type.
unsigned int hash
Definition: stasis_cache.c:170
unsigned int ast_hashtab_hash_string(const void *obj)
Hashes a string to a number.
Definition: hashtab.c:153
struct stasis_message_type * type
Definition: stasis_cache.c:166

◆ cache_entry_create()

static struct stasis_cache_entry* cache_entry_create ( struct stasis_message_type *  type,
const char *  id,
struct stasis_message *  snapshot 
)
static

Definition at line 212 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_cleanup, ast_assert, ast_eid_cmp(), ast_eid_default, ast_strdup, AST_VECTOR_APPEND, AST_VECTOR_INIT, cache_entry_compute_hash(), cache_entry_dtor(), cache_entry_key::id, stasis_cache_entry::key, stasis_cache_entry::local, NULL, stasis_message_eid(), stasis_message_type(), type, and cache_entry_key::type.

Referenced by cache_put().

213 {
214  struct stasis_cache_entry *entry;
215  int is_remote;
216 
217  ast_assert(id != NULL);
218  ast_assert(snapshot != NULL);
219 
220  if (!type) {
221  return NULL;
222  }
223 
224  entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor,
225  AO2_ALLOC_OPT_LOCK_NOLOCK);
226  if (!entry) {
227  return NULL;
228  }
229 
230  entry->key.id = ast_strdup(id);
231  if (!entry->key.id) {
232  ao2_cleanup(entry);
233  return NULL;
234  }
235  /*
236  * Normal ao2 ref counting rules says we should increment the message
237  * type ref here and decrement it in cache_entry_dtor(). However, the
238  * stasis message snapshot is cached here, will always have the same type
239  * as the cache entry, and can legitimately cause the type ref count to
240  * hit the excessive ref count assertion. Since the cache entry will
241  * always have a snapshot we can get away with not holding a ref here.
242  */
243  ast_assert(type == stasis_message_type(snapshot));
244  entry->key.type = type;
245  cache_entry_compute_hash(&entry->key);
246 
247  is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
248  if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
249  ao2_cleanup(entry);
250  return NULL;
251  }
252 
253  if (is_remote) {
254  if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
255  ao2_cleanup(entry);
256  return NULL;
257  }
258  } else {
259  entry->local = snapshot;
260  }
261  ao2_bump(snapshot);
262 
263  return entry;
264 }
static const char type[]
Definition: chan_ooh323.c:109
struct cache_entry_key key
Definition: stasis_cache.c:174
const char * id
Definition: stasis_cache.c:168
static void cache_entry_compute_hash(struct cache_entry_key *key)
Definition: stasis_cache.c:206
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define ast_assert(a)
Definition: utils.h:695
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
#define NULL
Definition: resample.c:96
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2842
#define ao2_bump(obj)
Definition: astobj2.h:491
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
static void cache_entry_dtor(void *obj)
Definition: stasis_cache.c:183
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_message * local
Definition: stasis_cache.c:178
Definition: search.h:40
struct stasis_message_type * type
Definition: stasis_cache.c:166
Definition: stasis_cache.c:173

◆ cache_entry_dtor()

static void cache_entry_dtor ( void *  obj)
static

Definition at line 183 of file stasis_cache.c.

References stasis_cache_entry::aggregate, ao2_cleanup, ast_free, AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_SIZE, cache_entry_key::id, stasis_cache_entry::key, stasis_cache_entry::local, NULL, and cache_entry_key::type.

Referenced by cache_entry_create().

184 {
185  struct stasis_cache_entry *entry = obj;
186  size_t idx;
187 
188  entry->key.type = NULL;
189  ast_free((char *) entry->key.id);
190  entry->key.id = NULL;
191 
192  ao2_cleanup(entry->aggregate);
193  entry->aggregate = NULL;
194  ao2_cleanup(entry->local);
195  entry->local = NULL;
196 
197  for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
198  struct stasis_message *remote;
199 
200  remote = AST_VECTOR_GET(&entry->remote, idx);
201  ao2_cleanup(remote);
202  }
203  AST_VECTOR_FREE(&entry->remote);
204 }
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
struct cache_entry_key key
Definition: stasis_cache.c:174
const char * id
Definition: stasis_cache.c:168
#define NULL
Definition: resample.c:96
#define ast_free(a)
Definition: astmm.h:182
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_message * local
Definition: stasis_cache.c:178
Definition: search.h:40
struct stasis_message_type * type
Definition: stasis_cache.c:166
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
struct stasis_message * aggregate
Definition: stasis_cache.c:176
Definition: stasis_cache.c:173

◆ cache_entry_dump()

static int cache_entry_dump ( struct ao2_container *  snapshots,
const struct stasis_cache_entry *  entry 
)
static

Definition at line 563 of file stasis_cache.c.

References ao2_link, ast_assert, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_cache_entry::local, and NULL.

Referenced by cache_dump_all_cb(), and stasis_cache_get_all().

564 {
565  int idx;
566  int err = 0;
567 
568  ast_assert(snapshots != NULL);
569  ast_assert(entry != NULL);
570 
571  /* The aggregate snapshot is not a snapshot from an entity. */
572 
573  if (entry->local) {
574  err |= !ao2_link(snapshots, entry->local);
575  }
576 
577  for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
578  struct stasis_message *snapshot;
579 
580  snapshot = AST_VECTOR_GET(&entry->remote, idx);
581  err |= !ao2_link(snapshots, snapshot);
582  }
583 
584  return err;
585 }
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
struct stasis_message * local
Definition: stasis_cache.c:178
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
#define ao2_link(container, obj)
Definition: astobj2.h:1549

◆ cache_entry_hash()

static int cache_entry_hash ( const void *  obj,
int  flags 
)
static

Definition at line 266 of file stasis_cache.c.

References ast_assert, cache_entry_key::hash, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, and OBJ_SEARCH_OBJECT.

Referenced by stasis_cache_create_full().

267 {
268  const struct stasis_cache_entry *object;
269  const struct cache_entry_key *key;
270 
271  switch (flags & OBJ_SEARCH_MASK) {
272  case OBJ_SEARCH_KEY:
273  key = obj;
274  break;
275  case OBJ_SEARCH_OBJECT:
276  object = obj;
277  key = &object->key;
278  break;
279  default:
280  /* Hash can only work on something with a full key. */
281  ast_assert(0);
282  return 0;
283  }
284 
285  return (int)key->hash;
286 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
#define ast_assert(a)
Definition: utils.h:695
The key for an entry in the cache.
Definition: stasis_cache.c:164
unsigned int hash
Definition: stasis_cache.c:170
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
Search option field mask.
Definition: astobj2.h:1076
Definition: stasis_cache.c:173

◆ cache_find()

static struct stasis_cache_entry* cache_find ( struct ao2_container *  entries,
struct stasis_message_type *  type,
const char *  id 
)
static

Definition at line 396 of file stasis_cache.c.

References ao2_find, ast_assert, cache_entry_compute_hash(), cache_entry_key::id, id, stasis_cache_entry::key, OBJ_NOLOCK, OBJ_SEARCH_KEY, stasis_message_type_name(), type, and cache_entry_key::type.

Referenced by cache_put(), caching_topic_exec(), stasis_cache_get_all(), and stasis_cache_get_by_eid().

397 {
398  struct cache_entry_key search_key;
399  struct stasis_cache_entry *entry;
400 
401  search_key.type = type;
402  search_key.id = id;
403  cache_entry_compute_hash(&search_key);
404  entry = ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
405 
406  /* Ensure that what we looked for is what we found. */
407  ast_assert(!entry
408  || (!strcmp(stasis_message_type_name(entry->key.type),
409  stasis_message_type_name(type)) && !strcmp(entry->key.id, id)));
410  return entry;
411 }
static const char type[]
Definition: chan_ooh323.c:109
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct cache_entry_key key
Definition: stasis_cache.c:174
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
const char * id
Definition: stasis_cache.c:168
static void cache_entry_compute_hash(struct cache_entry_key *key)
Definition: stasis_cache.c:206
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
#define ast_assert(a)
Definition: utils.h:695
The key for an entry in the cache.
Definition: stasis_cache.c:164
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
Definition: search.h:40
enum queue_result id
Definition: app_queue.c:1507
struct stasis_message_type * type
Definition: stasis_cache.c:166
Definition: stasis_cache.c:173

◆ cache_put()

static struct cache_put_snapshots cache_put ( struct stasis_cache *  cache,
struct stasis_message_type *  type,
const char *  id,
const struct ast_eid *  eid,
struct stasis_message *  new_snapshot 
)
static

Definition at line 505 of file stasis_cache.c.

References stasis_cache_entry::aggregate, cache_put_snapshots::aggregate_new, cache_put_snapshots::aggregate_old, ao2_bump, ao2_cleanup, ao2_link_flags, ao2_unlock, ao2_wrlock, ast_assert, cache_entry_create(), cache_find(), cache_remove(), cache_udpate(), NULL, OBJ_NOLOCK, cache_put_snapshots::old, and stasis_message_type().

Referenced by caching_topic_exec().

508 {
509  struct stasis_cache_entry *cached_entry;
510  struct cache_put_snapshots snapshots;
511 
512  ast_assert(cache->entries != NULL);
513  ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
514  ast_assert(new_snapshot == NULL ||
515  type == stasis_message_type(new_snapshot));
516 
517  memset(&snapshots, 0, sizeof(snapshots));
518 
519  ao2_wrlock(cache->entries);
520 
521  cached_entry = cache_find(cache->entries, type, id);
522 
523  /* Update the eid snapshot. */
524  if (!new_snapshot) {
525  /* Remove snapshot from cache */
526  if (cached_entry) {
527  snapshots.old = cache_remove(cache->entries, cached_entry, eid);
528  }
529  } else if (cached_entry) {
530  /* Update snapshot in cache */
531  snapshots.old = cache_udpate(cached_entry, eid, new_snapshot);
532  } else {
533  /* Insert into the cache */
534  cached_entry = cache_entry_create(type, id, new_snapshot);
535  if (cached_entry) {
536  ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
537  }
538  }
539 
540  /* Update the aggregate snapshot. */
541  if (cache->aggregate_calc_fn && cached_entry) {
542  snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
543  snapshots.aggregate_old = cached_entry->aggregate;
544  cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
545  }
546 
547  ao2_unlock(cache->entries);
548 
549  ao2_cleanup(cached_entry);
550  return snapshots;
551 }
cache_aggregate_calc_fn aggregate_calc_fn
Definition: stasis_cache.c:49
static struct stasis_cache_entry * cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
Definition: stasis_cache.c:212
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
#define ast_assert(a)
Definition: utils.h:695
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_wrlock(a)
Definition: astobj2.h:720
struct ao2_container * entries
Definition: stasis_cache.c:47
#define ao2_bump(obj)
Definition: astobj2.h:491
static struct stasis_message * cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
Definition: stasis_cache.c:425
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
Definition: stasis_cache.c:396
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static struct stasis_message * cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:466
struct stasis_message * aggregate
Definition: stasis_cache.c:176
Definition: stasis_cache.c:173

◆ cache_remove()

static struct stasis_message* cache_remove ( struct ao2_container *  entries,
struct stasis_cache_entry *  cached_entry,
const struct ast_eid *  eid 
)
static

Definition at line 425 of file stasis_cache.c.

References ao2_unlink_flags, ast_eid_cmp(), ast_eid_default, AST_VECTOR_GET, AST_VECTOR_REMOVE_UNORDERED, AST_VECTOR_SIZE, stasis_cache_entry::local, NULL, OBJ_NOLOCK, and stasis_message_eid().

Referenced by cache_put(), and caching_topic_exec().

426 {
427  struct stasis_message *old_snapshot;
428  int is_remote;
429 
430  is_remote = ast_eid_cmp(eid, &ast_eid_default);
431  if (!is_remote) {
432  old_snapshot = cached_entry->local;
433  cached_entry->local = NULL;
434  } else {
435  int idx;
436 
437  old_snapshot = NULL;
438  for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
439  struct stasis_message *cur;
440 
441  cur = AST_VECTOR_GET(&cached_entry->remote, idx);
442  if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
443  old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
444  break;
445  }
446  }
447  }
448 
449  if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
450  ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
451  }
452 
453  return old_snapshot;
454 }
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
Definition: vector.h:438
#define NULL
Definition: resample.c:96
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2842
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
struct stasis_message * local
Definition: stasis_cache.c:178
#define ao2_unlink_flags(container, obj, flags)
Definition: astobj2.h:1622
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ cache_udpate()

static struct stasis_message* cache_udpate ( struct stasis_cache_entry *  cached_entry,
const struct ast_eid *  eid,
struct stasis_message *  new_snapshot 
)
static

Definition at line 466 of file stasis_cache.c.

References ao2_bump, ast_eid_cmp(), ast_eid_default, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_REMOVE_UNORDERED, AST_VECTOR_SIZE, stasis_cache_entry::local, NULL, and stasis_message_eid().

Referenced by cache_put().

467 {
468  struct stasis_message *old_snapshot;
469  int is_remote;
470  int idx;
471 
472  is_remote = ast_eid_cmp(eid, &ast_eid_default);
473  if (!is_remote) {
474  old_snapshot = cached_entry->local;
475  cached_entry->local = ao2_bump(new_snapshot);
476  return old_snapshot;
477  }
478 
479  old_snapshot = NULL;
480  for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
481  struct stasis_message *cur;
482 
483  cur = AST_VECTOR_GET(&cached_entry->remote, idx);
484  if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
485  old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
486  break;
487  }
488  }
489  if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
490  ao2_bump(new_snapshot);
491  }
492 
493  return old_snapshot;
494 }
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
Definition: vector.h:438
#define NULL
Definition: resample.c:96
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2842
#define ao2_bump(obj)
Definition: astobj2.h:491
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
struct stasis_message * local
Definition: stasis_cache.c:178
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ caching_topic_exec()

static void caching_topic_exec ( void *  data,
struct stasis_subscription *  sub,
struct stasis_message *  message 
)
static

Definition at line 833 of file stasis_cache.c.

References stasis_cache::aggregate_publish_fn, ao2_cleanup, ao2_ref, ao2_unlock, ao2_wrlock, ast_assert, ast_debug, stasis_caching_topic::cache, cache_find(), cache_put(), cache_remove(), stasis_subscription_change::description, stasis_cache::entries, stasis_cache::id_fn, NULL, stasis_caching_topic::original_topic, stasis_cache_clear_type(), stasis_message_data(), stasis_message_eid(), stasis_message_type(), stasis_message_type_name(), stasis_publish(), stasis_subscription_change_type(), stasis_subscription_final_message(), stasis_topic_name(), stasis_topic_subscribers(), stasis_caching_topic::topic, stasis_subscription_change::uniqueid, update(), and update_create().

Referenced by stasis_caching_topic_create().

835 {
836  struct stasis_caching_topic *caching_topic_needs_unref;
837  struct stasis_caching_topic *caching_topic = data;
838  struct stasis_message *msg;
839  struct stasis_message *msg_put;
840  struct stasis_message_type *msg_type;
841  const struct ast_eid *msg_eid;
842  const char *msg_id;
843 
844  ast_assert(caching_topic != NULL);
845  ast_assert(caching_topic->topic != NULL);
846  ast_assert(caching_topic->cache != NULL);
847  ast_assert(caching_topic->cache->id_fn != NULL);
848 
849  if (stasis_subscription_final_message(sub, message)) {
850  caching_topic_needs_unref = caching_topic;
851  } else {
852  caching_topic_needs_unref = NULL;
853  }
854 
855  msg_type = stasis_message_type(message);
856 
857  if (stasis_subscription_change_type() == msg_type) {
858  struct stasis_subscription_change *change = stasis_message_data(message);
859 
860  /*
861  * If this change type is an unsubscribe, we need to find the original
862  * subscribe and remove it from the cache otherwise the cache will
863  * continue to grow unabated.
864  */
865  if (strcmp(change->description, "Unsubscribe") == 0) {
866  struct stasis_cache_entry *cached_sub;
867 
868  ao2_wrlock(caching_topic->cache->entries);
869  cached_sub = cache_find(caching_topic->cache->entries, stasis_subscription_change_type(), change->uniqueid);
870  if (cached_sub) {
871  ao2_cleanup(cache_remove(caching_topic->cache->entries, cached_sub, stasis_message_eid(message)));
872  ao2_cleanup(cached_sub);
873  }
874  ao2_unlock(caching_topic->cache->entries);
875  ao2_cleanup(caching_topic_needs_unref);
876  return;
877  }
878  msg_put = message;
879  msg = message;
880  } else if (stasis_cache_clear_type() == msg_type) {
881  /* Cache clear event. */
882  msg_put = NULL;
883  msg = stasis_message_data(message);
884  msg_type = stasis_message_type(msg);
885  } else {
886  /* Normal cache update event. */
887  msg_put = message;
888  msg = message;
889  }
890  ast_assert(msg_type != NULL);
891 
892  msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
893  msg_id = caching_topic->cache->id_fn(msg);
894  if (msg_id && msg_eid) {
895  struct stasis_message *update;
896  struct cache_put_snapshots snapshots;
897 
898  /* Update the cache */
899  snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
900  if (snapshots.old || msg_put) {
901  if (stasis_topic_subscribers(caching_topic->topic)) {
902  update = update_create(snapshots.old, msg_put);
903  if (update) {
904  stasis_publish(caching_topic->topic, update);
905  ao2_ref(update, -1);
906  }
907  }
908  } else {
909  ast_debug(1,
910  "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
911  stasis_topic_name(caching_topic->topic),
912  stasis_message_type_name(msg_type), msg_id);
913  }
914 
915  if (snapshots.aggregate_old != snapshots.aggregate_new) {
916  if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
917  caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
918  snapshots.aggregate_new);
919  }
920  if (stasis_topic_subscribers(caching_topic->topic)) {
921  update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
922  if (update) {
923  stasis_publish(caching_topic->topic, update);
924  ao2_ref(update, -1);
925  }
926  }
927  }
928 
929  ao2_cleanup(snapshots.old);
930  ao2_cleanup(snapshots.aggregate_old);
931  ao2_cleanup(snapshots.aggregate_new);
932  }
933 
934  ao2_cleanup(caching_topic_needs_unref);
935 }
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:644
struct stasis_cache * cache
Definition: stasis_cache.c:56
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct stasis_topic * original_topic
Definition: stasis_cache.c:58
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define ao2_wrlock(a)
Definition: astobj2.h:720
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
struct ao2_container * entries
Definition: stasis_cache.c:47
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:452
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
#define ao2_ref(o, delta)
Definition: astobj2.h:464
static struct stasis_message * cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
Definition: stasis_cache.c:425
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
Definition: stasis_cache.c:396
static struct cache_put_snapshots cache_put(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:505
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_topic * topic
Definition: stasis_cache.c:57
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
static struct stasis_message * update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:795
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
snapshot_get_id id_fn
Definition: stasis_cache.c:48
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
cache_aggregate_publish_fn aggregate_publish_fn
Definition: stasis_cache.c:50
Definition: stasis_cache.c:173

◆ print_cache_entry()

static void print_cache_entry ( void *  v_obj,
void *  where,
ao2_prnt_fn prnt 
)
static

Definition at line 937 of file stasis_cache.c.

References cache_entry_key::hash, cache_entry_key::id, stasis_cache_entry::key, stasis_message_type_name(), and cache_entry_key::type.

Referenced by stasis_caching_topic_create().

938 {
939  struct stasis_cache_entry *entry = v_obj;
940 
941  if (!entry) {
942  return;
943  }
944  prnt(where, "Type: %s ID: %s Hash: %u", stasis_message_type_name(entry->key.type),
945  entry->key.id, entry->key.hash);
946 }
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct cache_entry_key key
Definition: stasis_cache.c:174
const char * id
Definition: stasis_cache.c:168
unsigned int hash
Definition: stasis_cache.c:170
Definition: search.h:40
struct stasis_message_type * type
Definition: stasis_cache.c:166
Definition: stasis_cache.c:173

◆ stasis_cache_cleanup()

static void stasis_cache_cleanup ( void  )
static

Definition at line 1002 of file stasis_cache.c.

References stasis_cache_clear_type(), stasis_cache_update_type(), and STASIS_MESSAGE_TYPE_CLEANUP.

Referenced by stasis_cache_init().

1003 {
1006 }
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1523
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.

◆ stasis_cache_clear_create()

struct stasis_message* stasis_cache_clear_create ( struct stasis_message *  message)

A message which instructs the caching topic to remove an entry from its cache.

Parameters
message Message representative of the cache entry that should be cleared. This will become the data held in the stasis_cache_clear message.
Returns
Message which, when sent to a stasis_caching_topic, will clear the item from the cache.
NULL on error.
Since
12

Definition at line 778 of file stasis_cache.c.

References stasis_cache_clear_type(), and stasis_message_create().

Referenced by ast_delete_mwi_state_full(), ast_device_state_clear_cache(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), clear_node_cache(), and remove_device_states_cb().

779 {
780  return stasis_message_create(stasis_cache_clear_type(), id_message);
781 }
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.

◆ stasis_cache_create()

struct stasis_cache* stasis_cache_create ( snapshot_get_id  id_fn)

Create a cache.

This is the backend store for a stasis_caching_topic. The cache is thread safe, allowing concurrent reads and writes.

The returned object is AO2 managed, so ao2_cleanup() when you're done.

Parameters
id_fn Callback to extract the id from a snapshot message.
Return values
New cache indexed by id_fn.

Definition at line 360 of file stasis_cache.c.

References NULL, and stasis_cache_create_full().

Referenced by ast_presence_state_engine_init(), AST_TEST_DEFINE(), mwi_init(), and stasis_cp_all_create().

361 {
362  return stasis_cache_create_full(id_fn, NULL, NULL);
363 }
#define NULL
Definition: resample.c:96
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
Definition: stasis_cache.c:334

◆ stasis_cache_create_full()

struct stasis_cache* stasis_cache_create_full ( snapshot_get_id  id_fn,
cache_aggregate_calc_fn  aggregate_calc_fn,
cache_aggregate_publish_fn  aggregate_publish_fn 
)

Create a cache.

This is the backend store for a stasis_caching_topic. The cache is thread safe, allowing concurrent reads and writes.

The returned object is AO2 managed, so ao2_cleanup() when you're done.

Parameters
id_fn Callback to extract the id from a snapshot message.
aggregate_calc_fn Callback to calculate the aggregate cache entry.
aggregate_publish_fn Callback to publish the aggregate cache entry.
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values
New cache indexed by id_fn.

Definition at line 334 of file stasis_cache.c.

References stasis_cache::aggregate_calc_fn, stasis_cache::aggregate_publish_fn, AO2_ALLOC_OPT_LOCK_NOLOCK, AO2_ALLOC_OPT_LOCK_RWLOCK, ao2_alloc_options, ao2_cleanup, ao2_container_alloc_hash, cache, cache_dtor(), cache_entry_cmp(), cache_entry_hash(), stasis_cache::entries, stasis_cache::id_fn, NULL, and NUM_CACHE_BUCKETS.

Referenced by AST_TEST_DEFINE(), devstate_init(), and stasis_cache_create().

337 {
338  struct stasis_cache *cache;
339 
340  cache = ao2_alloc_options(sizeof(*cache), cache_dtor,
342  if (!cache) {
343  return NULL;
344  }
345 
348  if (!cache->entries) {
349  ao2_cleanup(cache);
350  return NULL;
351  }
352 
353  cache->id_fn = id_fn;
356 
357  return cache;
358 }
cache_aggregate_calc_fn aggregate_calc_fn
Definition: stasis_cache.c:49
static int cache_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis_cache.c:288
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
struct ao2_container * cache
Definition: pbx_realtime.c:77
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
snapshot_get_id id_fn
Definition: stasis_cache.c:48
#define NUM_CACHE_BUCKETS
Definition: stasis_cache.c:42
cache_aggregate_publish_fn aggregate_publish_fn
Definition: stasis_cache.c:50
static int cache_entry_hash(const void *obj, int flags)
Definition: stasis_cache.c:266
static void cache_dtor(void *obj)
Definition: stasis_cache.c:326

◆ stasis_cache_dump()

struct ao2_container* stasis_cache_dump ( struct stasis_cache *  cache,
struct stasis_message_type *  type
)

Dump cached items to a subscription for the ast_eid_default entity.

Parameters
cache The cache to query.
type Type of message to dump (any type if NULL).
Return values
ao2_container containing all matches (must be unreffed by caller)

Definition at line 736 of file stasis_cache.c.

References ast_eid_default, and stasis_cache_dump_by_eid().

Referenced by action_presencestatelist(), ast_ari_endpoints_list(), ast_ari_endpoints_list_by_tech(), AST_TEST_DEFINE(), asterisk_publication_devicestate_refresh(), asterisk_publication_mwi_refresh(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), endpoints_scrape_cb(), load_module(), unload_module(), and xmpp_init_event_distribution().

737 {
738  return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
739 }
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93

◆ stasis_cache_dump_all()

struct ao2_container* stasis_cache_dump_all ( struct stasis_cache *  cache,
struct stasis_message_type *  type
)

Dump all entity items from the cache to a subscription.

Since
12.2.0
Parameters
cache The cache to query.
type Type of message to dump (any type if NULL).
Return values
ao2_container containing all matches (must be unreffed by caller)

Definition at line 757 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_callback, ao2_container_alloc_list, ast_assert, cache_dump_all_cb(), cache_dump_data::container, cache_dump_data::eid, stasis_cache::entries, NULL, OBJ_MULTIPLE, OBJ_NODATA, stasis_cache_clear_type(), stasis_cache_update_type(), STASIS_MESSAGE_TYPE_DEFN(), type, and cache_dump_data::type.

Referenced by AST_TEST_DEFINE(), cache_cleanup(), and cleanup_module().

758 {
759  struct cache_dump_data cache_dump;
760 
761  ast_assert(cache != NULL);
762  ast_assert(cache->entries != NULL);
763 
764  cache_dump.eid = NULL;
765  cache_dump.type = type;
766  cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
767  if (!cache_dump.container) {
768  return NULL;
769  }
770 
772  return cache_dump.container;
773 }
static const char type[]
Definition: chan_ooh323.c:109
static int cache_dump_all_cb(void *obj, void *arg, int flags)
Definition: stasis_cache.c:741
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1335
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47

◆ stasis_cache_dump_by_eid()

struct ao2_container* stasis_cache_dump_by_eid ( struct stasis_cache *  cache,
struct stasis_message_type *  type,
const struct ast_eid *  eid
)

Dump cached items to a subscription for a specific entity.

Since
12.2.0
Parameters
cache The cache to query.
type Type of message to dump (any type if NULL).
eid Specific entity id to retrieve. NULL for aggregate.
Return values
ao2_container containing all matches (must be unreffed by caller)

Definition at line 718 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_callback, ao2_container_alloc_list, ast_assert, cache_dump_by_eid_cb(), cache_dump_data::container, cache_dump_data::eid, stasis_cache::entries, NULL, OBJ_MULTIPLE, OBJ_NODATA, type, and cache_dump_data::type.

Referenced by action_devicestatelist(), AST_TEST_DEFINE(), cpg_confchg_cb(), and stasis_cache_dump().

719 {
720  struct cache_dump_data cache_dump;
721 
722  ast_assert(cache != NULL);
723  ast_assert(cache->entries != NULL);
724 
725  cache_dump.eid = eid;
726  cache_dump.type = type;
727  cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
728  if (!cache_dump.container) {
729  return NULL;
730  }
731 
733  return cache_dump.container;
734 }
static const char type[]
Definition: chan_ooh323.c:109
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1335
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47
const struct ast_eid * eid
Definition: stasis_cache.c:694
static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
Definition: stasis_cache.c:697

◆ stasis_cache_entry_get_aggregate()

struct stasis_message* stasis_cache_entry_get_aggregate ( struct stasis_cache_entry *  entry)

Get the aggregate cache entry snapshot.

Since
12.2.0
Parameters
entry Cache entry to get the aggregate snapshot.
Note
A reference is not given to the returned pointer so don't unref it.
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values
Aggregate-snapshot in cache.
NULL if not present.

Definition at line 365 of file stasis_cache.c.

References stasis_cache_entry::aggregate.

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

366 {
367  return entry->aggregate;
368 }
struct stasis_message * aggregate
Definition: stasis_cache.c:176

◆ stasis_cache_entry_get_local()

struct stasis_message* stasis_cache_entry_get_local ( struct stasis_cache_entry *  entry)

Get the local entity's cache entry snapshot.

Since
12.2.0
Parameters
entry Cache entry to get the local entity's snapshot.
Note
A reference is not given to the returned pointer so don't unref it.
Return values
Internal-snapshot in cache.
NULL if not present.

Definition at line 370 of file stasis_cache.c.

References stasis_cache_entry::local.

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

371 {
372  return entry->local;
373 }
struct stasis_message * local
Definition: stasis_cache.c:178

◆ stasis_cache_entry_get_remote()

struct stasis_message* stasis_cache_entry_get_remote ( struct stasis_cache_entry *  entry,
int  idx 
)

Get a remote entity's cache entry snapshot by index.

Since
12.2.0
Parameters
entry Cache entry to get a remote entity's snapshot.
idx Which remote entity's snapshot to get.
Note
A reference is not given to the returned pointer so don't unref it.
Return values
Remote-entity-snapshot in cache.
NULL if not present.

Definition at line 375 of file stasis_cache.c.

References AST_VECTOR_GET, AST_VECTOR_SIZE, and NULL.

Referenced by cache_test_aggregate_calc_fn(), and device_state_aggregate_calc().

376 {
377  if (idx < AST_VECTOR_SIZE(&entry->remote)) {
378  return AST_VECTOR_GET(&entry->remote, idx);
379  }
380  return NULL;
381 }
#define NULL
Definition: resample.c:96
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_cache_get()

struct stasis_message* stasis_cache_get ( struct stasis_cache *  cache,
struct stasis_message_type *  type,
const char *  id 
)

Retrieve an item from the cache for the ast_eid_default entity.

The returned item is AO2 managed, so ao2_cleanup() when you're done with it.

Parameters
cache The cache to query.
type Type of message to retrieve.
id Identity of the snapshot to retrieve.
Return values
Message from the cache.

Definition at line 686 of file stasis_cache.c.

References ast_eid_default, and stasis_cache_get_by_eid().

Referenced by ast_endpoint_latest_snapshot(), AST_TEST_DEFINE(), get_cached_mwi(), has_voicemail(), presence_state_cached(), unistim_send_mwi_to_peer(), and update_registry().

687 {
688  return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
689 }
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
Definition: stasis_cache.c:659
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93

◆ stasis_cache_get_all()

struct ao2_container* stasis_cache_get_all ( struct stasis_cache *  cache,
struct stasis_message_type *  type,
const char *  id 
)

Retrieve all matching entity items from the cache.

Since
12.2.0
Parameters
cache The cache to query.
type Type of message to retrieve.
id Identity of the snapshot to retrieve.
Return values
Container of matching items found.

Definition at line 587 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_cleanup, ao2_container_alloc_list, ao2_rdlock, ao2_unlock, ast_assert, cache_entry_dump(), cache_find(), stasis_cache::entries, and NULL.

Referenced by AST_TEST_DEFINE().

588 {
589  struct stasis_cache_entry *cached_entry;
590  struct ao2_container *found;
591 
592  ast_assert(cache != NULL);
593  ast_assert(cache->entries != NULL);
594  ast_assert(id != NULL);
595 
596  if (!type) {
597  return NULL;
598  }
599 
601  if (!found) {
602  return NULL;
603  }
604 
605  ao2_rdlock(cache->entries);
606 
607  cached_entry = cache_find(cache->entries, type, id);
608  if (cached_entry && cache_entry_dump(found, cached_entry)) {
609  ao2_cleanup(found);
610  found = NULL;
611  }
612 
613  ao2_unlock(cache->entries);
614 
615  ao2_cleanup(cached_entry);
616  return found;
617 }
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1335
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
Definition: stasis_cache.c:396
#define ao2_rdlock(a)
Definition: astobj2.h:719
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
Generic container type.
static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
Definition: stasis_cache.c:563
Definition: stasis_cache.c:173

◆ stasis_cache_get_by_eid()

struct stasis_message* stasis_cache_get_by_eid ( struct stasis_cache *  cache,
struct stasis_message_type *  type,
const char *  id,
const struct ast_eid *  eid
)

Retrieve an item from the cache for a specific entity.

The returned item is AO2 managed, so ao2_cleanup() when you're done with it.

Parameters
cache The cache to query.
type Type of message to retrieve.
id Identity of the snapshot to retrieve.
eid Specific entity id to retrieve. NULL for aggregate.
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values
Message from the cache.

Definition at line 659 of file stasis_cache.c.

References ao2_bump, ao2_cleanup, ao2_rdlock, ao2_unlock, ast_assert, cache_entry_by_eid(), cache_find(), stasis_cache::entries, and NULL.

Referenced by ast_delete_mwi_state_full(), ast_device_state_clear_cache(), AST_TEST_DEFINE(), check_cache_aggregate(), devstate_cached(), and stasis_cache_get().

660 {
661  struct stasis_cache_entry *cached_entry;
662  struct stasis_message *snapshot = NULL;
663 
664  ast_assert(cache != NULL);
665  ast_assert(cache->entries != NULL);
666  ast_assert(id != NULL);
667 
668  if (!type) {
669  return NULL;
670  }
671 
672  ao2_rdlock(cache->entries);
673 
674  cached_entry = cache_find(cache->entries, type, id);
675  if (cached_entry) {
676  snapshot = cache_entry_by_eid(cached_entry, eid);
677  ao2_bump(snapshot);
678  }
679 
680  ao2_unlock(cache->entries);
681 
682  ao2_cleanup(cached_entry);
683  return snapshot;
684 }
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
struct ao2_container * entries
Definition: stasis_cache.c:47
#define ao2_bump(obj)
Definition: astobj2.h:491
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
Definition: stasis_cache.c:396
#define ao2_rdlock(a)
Definition: astobj2.h:719
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static struct stasis_message * cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
Definition: stasis_cache.c:631
Definition: stasis_cache.c:173

◆ stasis_cache_init()

int stasis_cache_init ( void  )

Definition at line 1008 of file stasis_cache.c.

References ast_register_cleanup(), stasis_cache_cleanup(), stasis_cache_clear_type(), stasis_cache_update_type(), and STASIS_MESSAGE_TYPE_INIT.

Referenced by stasis_init().

1009 {
1011 
1013  return -1;
1014  }
1015 
1017  return -1;
1018  }
1019 
1020  return 0;
1021 }
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
static void stasis_cache_cleanup(void)

◆ stasis_cache_update_dtor()

static void stasis_cache_update_dtor ( void *  obj)
static

Definition at line 783 of file stasis_cache.c.

References ao2_cleanup, stasis_cache_update::new_snapshot, NULL, stasis_cache_update::old_snapshot, stasis_cache_update::type, and update().

Referenced by update_create().

784 {
785  struct stasis_cache_update *update = obj;
786 
787  ao2_cleanup(update->old_snapshot);
788  update->old_snapshot = NULL;
789  ao2_cleanup(update->new_snapshot);
790  update->new_snapshot = NULL;
791  ao2_cleanup(update->type);
792  update->type = NULL;
793 }
struct stasis_message * old_snapshot
Old value from the cache.
Definition: stasis.h:971
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
#define NULL
Definition: resample.c:96
Cache update message.
Definition: stasis.h:967
struct stasis_message * new_snapshot
New value.
Definition: stasis.h:973
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:969
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ stasis_caching_accept_message_type()

int stasis_caching_accept_message_type ( struct stasis_caching_topic *  caching_topic,
struct stasis_message_type *  type
)

Indicate to a caching topic that we are interested in a message type.

This will cause the caching topic to receive messages of the given message type. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
caching_topic The caching topic.
type The message type we wish to receive.
Return values
0 on success
-1 failure
Since
17.0.0

Definition at line 90 of file stasis_cache.c.

References stasis_cache_clear_type(), stasis_subscription_accept_message_type(), stasis_subscription_change_type(), and stasis_caching_topic::sub.

Referenced by ast_presence_state_engine_init(), devstate_init(), and stasis_cp_single_accept_message_type().

92 {
93  int res;
94 
95  if (!caching_topic) {
96  return -1;
97  }
98 
99  /* We wait to accept the stasis specific message types until now so that by default everything
100  * will flow to us.
101  */
104  res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
105 
106  return res;
107 }
struct stasis_subscription * sub
Definition: stasis_cache.c:59
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.

◆ stasis_caching_get_topic()

struct stasis_topic* stasis_caching_get_topic ( struct stasis_caching_topic *  caching_topic)

Returns the topic of cached events from a caching topic.

Parameters
caching_topic The caching topic.
Returns
The topic that publishes cache update events, along with passthrough events from the underlying topic.
NULL if caching_topic is NULL.
Since
12

Definition at line 85 of file stasis_cache.c.

References stasis_caching_topic::topic.

Referenced by ast_device_state_topic_cached(), ast_mwi_topic_cached(), ast_presence_state_topic_cached(), AST_TEST_DEFINE(), stasis_cp_single_create(), and stasis_cp_single_topic_cached().

86 {
87  return caching_topic->topic;
88 }
struct stasis_topic * topic
Definition: stasis_cache.c:57

◆ stasis_caching_set_filter()

int stasis_caching_set_filter ( struct stasis_caching_topic *  caching_topic,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a cache.

This will cause the underlying subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
caching_topic The caching topic.
filter What filter to use
Return values
0 on success
-1 failure
Since
17.0.0

Definition at line 109 of file stasis_cache.c.

References stasis_subscription_set_filter(), and stasis_caching_topic::sub.

Referenced by ast_presence_state_engine_init(), devstate_init(), and stasis_cp_single_set_filter().

111 {
112  if (!caching_topic) {
113  return -1;
114  }
115  return stasis_subscription_set_filter(caching_topic->sub, filter);
116 }
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
struct stasis_subscription * sub
Definition: stasis_cache.c:59
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:709

◆ stasis_caching_topic_create()

struct stasis_caching_topic* stasis_caching_topic_create ( struct stasis_topic * original_topic,
struct stasis_cache * cache 
)

Create a topic which monitors and caches messages from another topic.

The idea is that some topics publish 'snapshots' of some other object's state that should be cached. When these snapshot messages are received, the cache is updated, and a stasis_cache_update() message is forwarded, which has both the original snapshot message and the new message.

The returned object is AO2 managed, so call ao2_cleanup() when done with it.

Parameters
original_topic: Topic publishing snapshot messages.
cache: Backend cache in which to keep snapshots.
Returns
New topic which changes snapshot messages to stasis_cache_update() messages, and forwards all other messages from the original topic.
NULL on error
Since
12

Definition at line 948 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_container_register(), ao2_ref, ast_asprintf, ast_atomic_fetchadd_int(), ast_free, ast_log, stasis_caching_topic::cache, cache, caching_topic_exec(), stasis_cache::entries, internal_stasis_subscribe(), LOG_ERROR, NULL, stasis_caching_topic::original_topic, print_cache_entry(), stasis_cache::registered, stasis_caching_topic_dtor(), stasis_topic_create(), stasis_topic_name(), stasis_caching_topic::sub, and stasis_caching_topic::topic.

Referenced by ast_presence_state_engine_init(), AST_TEST_DEFINE(), devstate_init(), mwi_init(), and stasis_cp_sink_create().

949 {
950  struct stasis_caching_topic *caching_topic;
951  static int caching_id;
952  char *new_name;
953  int ret;
954 
955  ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
956  if (ret < 0) {
957  return NULL;
958  }
959 
960  caching_topic = ao2_alloc_options(sizeof(*caching_topic),
962  if (caching_topic == NULL) {
963  ast_free(new_name);
964 
965  return NULL;
966  }
967 
968  caching_topic->topic = stasis_topic_create(new_name);
969  if (caching_topic->topic == NULL) {
970  ao2_ref(caching_topic, -1);
971  ast_free(new_name);
972 
973  return NULL;
974  }
975 
976  ao2_ref(cache, +1);
977  caching_topic->cache = cache;
978  if (!cache->registered) {
979  if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
980  ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
981  cache->entries, new_name);
982  } else {
983  cache->registered = 1;
984  }
985  }
986  ast_free(new_name);
987 
988  caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
989  if (caching_topic->sub == NULL) {
990  ao2_ref(caching_topic, -1);
991 
992  return NULL;
993  }
994 
995  ao2_ref(original_topic, +1);
996  caching_topic->original_topic = original_topic;
997 
998  /* The subscription holds the reference, so no additional ref bump. */
999  return caching_topic;
1000 }
struct stasis_cache * cache
Definition: stasis_cache.c:56
struct stasis_topic * original_topic
Definition: stasis_cache.c:58
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define NULL
Definition: resample.c:96
static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: stasis_cache.c:833
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
struct stasis_subscription * sub
Definition: stasis_cache.c:59
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:858
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
struct ao2_container * entries
Definition: stasis_cache.c:47
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
#define LOG_ERROR
Definition: logger.h:285
struct stasis_topic * topic
Definition: stasis_cache.c:57
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
#define ast_free(a)
Definition: astmm.h:182
struct ao2_container * cache
Definition: pbx_realtime.c:77
static void stasis_caching_topic_dtor(void *obj)
Definition: stasis_cache.c:62
static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
Definition: stasis_cache.c:937

◆ stasis_caching_topic_dtor()

static void stasis_caching_topic_dtor ( void *  obj)
static

Definition at line 62 of file stasis_cache.c.

References ao2_cleanup, ao2_container_unregister(), ast_assert, stasis_caching_topic::cache, NULL, stasis_caching_topic::original_topic, stasis_subscription_is_done(), stasis_subscription_is_subscribed(), stasis_topic_name(), stasis_caching_topic::sub, and stasis_caching_topic::topic.

Referenced by stasis_caching_topic_create().

63 {
64  struct stasis_caching_topic *caching_topic = obj;
65 
66  /* Caching topics contain subscriptions, and must be manually
67  * unsubscribed. */
69  /* If there are any messages in flight to this subscription, that would
70  * be bad. */
72 
74 
75  ao2_cleanup(caching_topic->sub);
76  caching_topic->sub = NULL;
77  ao2_cleanup(caching_topic->cache);
78  caching_topic->cache = NULL;
79  ao2_cleanup(caching_topic->topic);
80  caching_topic->topic = NULL;
81  ao2_cleanup(caching_topic->original_topic);
82  caching_topic->original_topic = NULL;
83 }
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152
struct stasis_cache * cache
Definition: stasis_cache.c:56
struct stasis_topic * original_topic
Definition: stasis_cache.c:58
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
struct stasis_subscription * sub
Definition: stasis_cache.c:59
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
struct stasis_topic * topic
Definition: stasis_cache.c:57
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ stasis_caching_unsubscribe()

struct stasis_caching_topic* stasis_caching_unsubscribe ( struct stasis_caching_topic * caching_topic)

Unsubscribes a caching topic from its upstream topic.

This function returns immediately, so be sure to clean up when stasis_subscription_final_message() is received.

Parameters
caching_topic: Caching topic to unsubscribe
Returns
NULL for convenience
Since
12

Definition at line 119 of file stasis_cache.c.

References ao2_cleanup, ao2_ref, ast_log, LOG_ERROR, NULL, stasis_subscription_is_subscribed(), stasis_unsubscribe(), and stasis_caching_topic::sub.

Referenced by AST_TEST_DEFINE(), stasis_caching_unsubscribe_and_join(), and stasis_cp_single_unsubscribe().

120 {
121  if (!caching_topic) {
122  return NULL;
123  }
124 
125  /*
126  * The subscription may hold the last reference to this caching
127  * topic, but we want to make sure the unsubscribe finishes
128  * before kicking off the caching topic's dtor.
129  */
130  ao2_ref(caching_topic, +1);
131 
132  if (stasis_subscription_is_subscribed(caching_topic->sub)) {
133  /*
134  * Increment the reference to hold on to it past the
135  * unsubscribe. Will be cleaned up in dtor.
136  */
137  ao2_ref(caching_topic->sub, +1);
138  stasis_unsubscribe(caching_topic->sub);
139  } else {
140  ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
141  }
142  ao2_cleanup(caching_topic);
143  return NULL;
144 }
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152
#define NULL
Definition: resample.c:96
struct stasis_subscription * sub
Definition: stasis_cache.c:59
#define ast_log
Definition: astobj2.c:42
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define LOG_ERROR
Definition: logger.h:285
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:973
#define ao2_cleanup(obj)
Definition: astobj2.h:1958

◆ stasis_caching_unsubscribe_and_join()

struct stasis_caching_topic* stasis_caching_unsubscribe_and_join ( struct stasis_caching_topic * caching_topic)

Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded.

See stasis_unsubscribe_and_join() for more info on when to use this as opposed to stasis_caching_unsubscribe().

Parameters
caching_topic: Caching topic to unsubscribe
Returns
NULL for convenience
Since
12

Definition at line 146 of file stasis_cache.c.

References ao2_cleanup, ao2_ref, NULL, stasis_caching_unsubscribe(), stasis_subscription_join(), and stasis_caching_topic::sub.

Referenced by AST_TEST_DEFINE(), devstate_cleanup(), mwi_cleanup(), and presence_state_engine_cleanup().

147 {
148  if (!caching_topic) {
149  return NULL;
150  }
151 
152  /* Hold a ref past the unsubscribe */
153  ao2_ref(caching_topic, +1);
154  stasis_caching_unsubscribe(caching_topic);
155  stasis_subscription_join(caching_topic->sub);
156  ao2_cleanup(caching_topic);
157  return NULL;
158 }
#define NULL
Definition: resample.c:96
struct stasis_subscription * sub
Definition: stasis_cache.c:59
#define ao2_ref(o, delta)
Definition: astobj2.h:464
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1107
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119

◆ STASIS_MESSAGE_TYPE_DEFN() [1/2]

STASIS_MESSAGE_TYPE_DEFN ( stasis_cache_clear_type  )

Referenced by stasis_cache_dump_all().

◆ STASIS_MESSAGE_TYPE_DEFN() [2/2]

STASIS_MESSAGE_TYPE_DEFN ( stasis_cache_update_type  )

◆ update_create()

static struct stasis_message* update_create ( struct stasis_message * old_snapshot,
struct stasis_message * new_snapshot 
)
static

Definition at line 795 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_cleanup, ao2_ref, ast_assert, stasis_cache_update::new_snapshot, NULL, stasis_cache_update::old_snapshot, stasis_cache_update_dtor(), stasis_cache_update_type(), stasis_message_create(), stasis_message_type(), stasis_cache_update::type, and update().

Referenced by caching_topic_exec().

796 {
797  struct stasis_cache_update *update;
798  struct stasis_message *msg;
799 
800  ast_assert(old_snapshot != NULL || new_snapshot != NULL);
801 
802  if (!stasis_cache_update_type()) {
803  return NULL;
804  }
805 
806  update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
808  if (!update) {
809  return NULL;
810  }
811 
812  if (old_snapshot) {
813  ao2_ref(old_snapshot, +1);
814  update->old_snapshot = old_snapshot;
815  if (!new_snapshot) {
816  ao2_ref(stasis_message_type(old_snapshot), +1);
817  update->type = stasis_message_type(old_snapshot);
818  }
819  }
820  if (new_snapshot) {
821  ao2_ref(new_snapshot, +1);
822  update->new_snapshot = new_snapshot;
823  ao2_ref(stasis_message_type(new_snapshot), +1);
824  update->type = stasis_message_type(new_snapshot);
825  }
826 
828 
829  ao2_cleanup(update);
830  return msg;
831 }
struct stasis_message * old_snapshot
Old value from the cache.
Definition: stasis.h:971
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
static void stasis_cache_update_dtor(void *obj)
Definition: stasis_cache.c:783
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
Cache update message.
Definition: stasis.h:967
struct stasis_message * new_snapshot
New value.
Definition: stasis.h:973
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:969
#define ao2_cleanup(obj)
Definition: astobj2.h:1958