Asterisk - The Open Source Telephony Project  18.5.0
stasis_cache.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Stasis Message API.
22  *
23  * \author David M. Lee, II <[email protected]>
24  */
25 
26 /*** MODULEINFO
27  <support_level>core</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 #include "asterisk/astobj2.h"
33 #include "asterisk/hashtab.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/utils.h"
37 #include "asterisk/vector.h"
38 
/*
 * NUM_CACHE_BUCKETS is 17 when LOW_MEMORY is defined and 563 otherwise.
 * NOTE(review): presumably the hash bucket count used when the cache entries
 * container is allocated -- the use site is not visible in this listing.
 */
39 #ifdef LOW_MEMORY
40 #define NUM_CACHE_BUCKETS 17
41 #else
42 #define NUM_CACHE_BUCKETS 563
43 #endif
44 
45 /*! \internal */
46 struct stasis_cache {
52 };
53 
54 /*! \internal */
60 };
61 
62 static void stasis_caching_topic_dtor(void *obj)
63 {
64  struct stasis_caching_topic *caching_topic = obj;
65 
66  /* Caching topics contain subscriptions, and must be manually
67  * unsubscribed. */
69  /* If there are any messages in flight to this subscription, that would
70  * be bad. */
72 
74 
75  ao2_cleanup(caching_topic->sub);
76  caching_topic->sub = NULL;
77  ao2_cleanup(caching_topic->cache);
78  caching_topic->cache = NULL;
79  ao2_cleanup(caching_topic->topic);
80  caching_topic->topic = NULL;
81  ao2_cleanup(caching_topic->original_topic);
82  caching_topic->original_topic = NULL;
83 }
84 
86 {
87  return caching_topic->topic;
88 }
89 
91  struct stasis_message_type *type)
92 {
93  int res;
94 
95  if (!caching_topic) {
96  return -1;
97  }
98 
99  /* We wait to accept the stasis specific message types until now so that by default everything
100  * will flow to us.
101  */
104  res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
105 
106  return res;
107 }
108 
111 {
112  if (!caching_topic) {
113  return -1;
114  }
115  return stasis_subscription_set_filter(caching_topic->sub, filter);
116 }
117 
118 
120 {
121  if (!caching_topic) {
122  return NULL;
123  }
124 
125  /*
126  * The subscription may hold the last reference to this caching
127  * topic, but we want to make sure the unsubscribe finishes
128  * before kicking off the caching topic's dtor.
129  */
130  ao2_ref(caching_topic, +1);
131 
132  if (stasis_subscription_is_subscribed(caching_topic->sub)) {
133  /*
134  * Increment the reference to hold on to it past the
135  * unsubscribe. Will be cleaned up in dtor.
136  */
137  ao2_ref(caching_topic->sub, +1);
138  stasis_unsubscribe(caching_topic->sub);
139  } else {
140  ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
141  }
142  ao2_cleanup(caching_topic);
143  return NULL;
144 }
145 
147 {
148  if (!caching_topic) {
149  return NULL;
150  }
151 
152  /* Hold a ref past the unsubscribe */
153  ao2_ref(caching_topic, +1);
154  stasis_caching_unsubscribe(caching_topic);
155  stasis_subscription_join(caching_topic->sub);
156  ao2_cleanup(caching_topic);
157  return NULL;
158 }
159 
160 /*!
161  * \brief The key for an entry in the cache
162  * \note The items in this struct must remain immutable while the item is in the cache
163  */
165  /*! The message type of the item stored in the cache */
167  /*! The unique ID of the item stored in the cache */
168  const char *id;
169  /*! The hash, computed from \c type and \c id */
170  unsigned int hash;
171 };
172 
174  struct cache_entry_key key;
175  /*! Aggregate snapshot of the stasis cache. */
177  /*! Local entity snapshot of the stasis event. */
179  /*! Remote entity snapshots of the stasis event. */
180  AST_VECTOR(, struct stasis_message *) remote;
181 };
182 
183 static void cache_entry_dtor(void *obj)
184 {
185  struct stasis_cache_entry *entry = obj;
186  size_t idx;
187 
188  entry->key.type = NULL;
189  ast_free((char *) entry->key.id);
190  entry->key.id = NULL;
191 
192  ao2_cleanup(entry->aggregate);
193  entry->aggregate = NULL;
194  ao2_cleanup(entry->local);
195  entry->local = NULL;
196 
197  for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
198  struct stasis_message *remote;
199 
200  remote = AST_VECTOR_GET(&entry->remote, idx);
201  ao2_cleanup(remote);
202  }
203  AST_VECTOR_FREE(&entry->remote);
204 }
205 
207 {
208  key->hash = stasis_message_type_hash(key->type);
209  key->hash += ast_hashtab_hash_string(key->id);
210 }
211 
212 static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
213 {
214  struct stasis_cache_entry *entry;
215  int is_remote;
216 
217  ast_assert(id != NULL);
218  ast_assert(snapshot != NULL);
219 
220  if (!type) {
221  return NULL;
222  }
223 
224  entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor,
226  if (!entry) {
227  return NULL;
228  }
229 
230  entry->key.id = ast_strdup(id);
231  if (!entry->key.id) {
232  ao2_cleanup(entry);
233  return NULL;
234  }
235  /*
236  * Normal ao2 ref counting rules says we should increment the message
237  * type ref here and decrement it in cache_entry_dtor(). However, the
238  * stasis message snapshot is cached here, will always have the same type
239  * as the cache entry, and can legitimately cause the type ref count to
240  * hit the excessive ref count assertion. Since the cache entry will
241  * always have a snapshot we can get away with not holding a ref here.
242  */
243  ast_assert(type == stasis_message_type(snapshot));
244  entry->key.type = type;
245  cache_entry_compute_hash(&entry->key);
246 
247  is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
248  if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
249  ao2_cleanup(entry);
250  return NULL;
251  }
252 
253  if (is_remote) {
254  if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
255  ao2_cleanup(entry);
256  return NULL;
257  }
258  } else {
259  entry->local = snapshot;
260  }
261  ao2_bump(snapshot);
262 
263  return entry;
264 }
265 
266 static int cache_entry_hash(const void *obj, int flags)
267 {
268  const struct stasis_cache_entry *object;
269  const struct cache_entry_key *key;
270 
271  switch (flags & OBJ_SEARCH_MASK) {
272  case OBJ_SEARCH_KEY:
273  key = obj;
274  break;
275  case OBJ_SEARCH_OBJECT:
276  object = obj;
277  key = &object->key;
278  break;
279  default:
280  /* Hash can only work on something with a full key. */
281  ast_assert(0);
282  return 0;
283  }
284 
285  return (int)key->hash;
286 }
287 
288 static int cache_entry_cmp(void *obj, void *arg, int flags)
289 {
290  const struct stasis_cache_entry *object_left = obj;
291  const struct stasis_cache_entry *object_right = arg;
292  const struct cache_entry_key *right_key = arg;
293  int cmp;
294 
295  switch (flags & OBJ_SEARCH_MASK) {
296  case OBJ_SEARCH_OBJECT:
297  right_key = &object_right->key;
298  /* Fall through */
299  case OBJ_SEARCH_KEY:
300  cmp = object_left->key.type != right_key->type
301  || strcmp(object_left->key.id, right_key->id);
302  break;
304  /* Not supported by container */
305  ast_assert(0);
306  cmp = -1;
307  break;
308  default:
309  /*
310  * What arg points to is specific to this traversal callback
311  * and has no special meaning to astobj2.
312  */
313  cmp = 0;
314  break;
315  }
316  if (cmp) {
317  return 0;
318  }
319  /*
320  * At this point the traversal callback is identical to a sorted
321  * container.
322  */
323  return CMP_MATCH;
324 }
325 
326 static void cache_dtor(void *obj)
327 {
328  struct stasis_cache *cache = obj;
329 
330  ao2_cleanup(cache->entries);
331  cache->entries = NULL;
332 }
333 
337 {
338  struct stasis_cache *cache;
339 
340  cache = ao2_alloc_options(sizeof(*cache), cache_dtor,
342  if (!cache) {
343  return NULL;
344  }
345 
348  if (!cache->entries) {
349  ao2_cleanup(cache);
350  return NULL;
351  }
352 
353  cache->id_fn = id_fn;
356 
357  return cache;
358 }
359 
361 {
362  return stasis_cache_create_full(id_fn, NULL, NULL);
363 }
364 
366 {
367  return entry->aggregate;
368 }
369 
371 {
372  return entry->local;
373 }
374 
376 {
377  if (idx < AST_VECTOR_SIZE(&entry->remote)) {
378  return AST_VECTOR_GET(&entry->remote, idx);
379  }
380  return NULL;
381 }
382 
383 /*!
384  * \internal
385  * \brief Find the cache entry in the cache entries container.
386  *
387  * \param entries Container of cached entries.
388  * \param type Type of message to retrieve the cache entry.
389  * \param id Identity of the snapshot to retrieve the cache entry.
390  *
391  * \note The entries container is already locked.
392  *
393  * \retval Cache-entry on success.
394  * \retval NULL Not in cache.
395  */
396 static struct stasis_cache_entry *cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
397 {
398  struct cache_entry_key search_key;
399  struct stasis_cache_entry *entry;
400 
401  search_key.type = type;
402  search_key.id = id;
403  cache_entry_compute_hash(&search_key);
404  entry = ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
405 
406  /* Ensure that what we looked for is what we found. */
407  ast_assert(!entry
408  || (!strcmp(stasis_message_type_name(entry->key.type),
409  stasis_message_type_name(type)) && !strcmp(entry->key.id, id)));
410  return entry;
411 }
412 
413 /*!
414  * \internal
415  * \brief Remove the stasis snapshot in the cache entry determined by eid.
416  *
417  * \param entries Container of cached entries.
418  * \param cached_entry The entry to remove the snapshot from.
419  * \param eid Which snapshot in the cached entry.
420  *
421  * \note The entries container is already locked.
422  *
423  * \return Previous stasis entry snapshot.
424  */
425 static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
426 {
427  struct stasis_message *old_snapshot;
428  int is_remote;
429 
430  is_remote = ast_eid_cmp(eid, &ast_eid_default);
431  if (!is_remote) {
432  old_snapshot = cached_entry->local;
433  cached_entry->local = NULL;
434  } else {
435  int idx;
436 
437  old_snapshot = NULL;
438  for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
439  struct stasis_message *cur;
440 
441  cur = AST_VECTOR_GET(&cached_entry->remote, idx);
442  if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
443  old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
444  break;
445  }
446  }
447  }
448 
449  if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
450  ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
451  }
452 
453  return old_snapshot;
454 }
455 
456 /*!
457  * \internal
458  * \brief Update the stasis snapshot in the cache entry determined by eid.
459  *
460  * \param cached_entry The entry to remove the snapshot from.
461  * \param eid Which snapshot in the cached entry.
462  * \param new_snapshot Snapshot to replace the old snapshot.
463  *
464  * \return Previous stasis entry snapshot.
465  */
466 static struct stasis_message *cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
467 {
468  struct stasis_message *old_snapshot;
469  int is_remote;
470  int idx;
471 
472  is_remote = ast_eid_cmp(eid, &ast_eid_default);
473  if (!is_remote) {
474  old_snapshot = cached_entry->local;
475  cached_entry->local = ao2_bump(new_snapshot);
476  return old_snapshot;
477  }
478 
479  old_snapshot = NULL;
480  for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
481  struct stasis_message *cur;
482 
483  cur = AST_VECTOR_GET(&cached_entry->remote, idx);
484  if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
485  old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
486  break;
487  }
488  }
489  if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
490  ao2_bump(new_snapshot);
491  }
492 
493  return old_snapshot;
494 }
495 
497  /*! Old cache eid snapshot. */
499  /*! Old cache aggregate snapshot. */
501  /*! New cache aggregate snapshot. */
503 };
504 
506  struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
507  struct stasis_message *new_snapshot)
508 {
509  struct stasis_cache_entry *cached_entry;
510  struct cache_put_snapshots snapshots;
511 
512  ast_assert(cache->entries != NULL);
513  ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
514  ast_assert(new_snapshot == NULL ||
515  type == stasis_message_type(new_snapshot));
516 
517  memset(&snapshots, 0, sizeof(snapshots));
518 
519  ao2_wrlock(cache->entries);
520 
521  cached_entry = cache_find(cache->entries, type, id);
522 
523  /* Update the eid snapshot. */
524  if (!new_snapshot) {
525  /* Remove snapshot from cache */
526  if (cached_entry) {
527  snapshots.old = cache_remove(cache->entries, cached_entry, eid);
528  }
529  } else if (cached_entry) {
530  /* Update snapshot in cache */
531  snapshots.old = cache_udpate(cached_entry, eid, new_snapshot);
532  } else {
533  /* Insert into the cache */
534  cached_entry = cache_entry_create(type, id, new_snapshot);
535  if (cached_entry) {
536  ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
537  }
538  }
539 
540  /* Update the aggregate snapshot. */
541  if (cache->aggregate_calc_fn && cached_entry) {
542  snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
543  snapshots.aggregate_old = cached_entry->aggregate;
544  cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
545  }
546 
547  ao2_unlock(cache->entries);
548 
549  ao2_cleanup(cached_entry);
550  return snapshots;
551 }
552 
553 /*!
554  * \internal
555  * \brief Dump all entity snapshots in the cache entry into the given container.
556  *
557  * \param snapshots Container to put all snapshots in the cache entry.
558  * \param entry Cache entry to use.
559  *
560  * \retval 0 on success.
561  * \retval non-zero on error.
562  */
563 static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
564 {
565  int idx;
566  int err = 0;
567 
568  ast_assert(snapshots != NULL);
569  ast_assert(entry != NULL);
570 
571  /* The aggregate snapshot is not a snapshot from an entity. */
572 
573  if (entry->local) {
574  err |= !ao2_link(snapshots, entry->local);
575  }
576 
577  for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
578  struct stasis_message *snapshot;
579 
580  snapshot = AST_VECTOR_GET(&entry->remote, idx);
581  err |= !ao2_link(snapshots, snapshot);
582  }
583 
584  return err;
585 }
586 
587 struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
588 {
589  struct stasis_cache_entry *cached_entry;
590  struct ao2_container *found;
591 
592  ast_assert(cache != NULL);
593  ast_assert(cache->entries != NULL);
594  ast_assert(id != NULL);
595 
596  if (!type) {
597  return NULL;
598  }
599 
601  if (!found) {
602  return NULL;
603  }
604 
605  ao2_rdlock(cache->entries);
606 
607  cached_entry = cache_find(cache->entries, type, id);
608  if (cached_entry && cache_entry_dump(found, cached_entry)) {
609  ao2_cleanup(found);
610  found = NULL;
611  }
612 
613  ao2_unlock(cache->entries);
614 
615  ao2_cleanup(cached_entry);
616  return found;
617 }
618 
619 /*!
620  * \internal
621  * \brief Retrieve an item from the cache entry for a specific eid.
622  *
623  * \param entry Cache entry to use.
624  * \param eid Specific entity id to retrieve. NULL for aggregate.
625  *
626  * \note The returned snapshot has not had its reference bumped.
627  *
628  * \retval Snapshot from the cache.
629  * \retval \c NULL if snapshot is not found.
630  */
631 static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
632 {
633  int is_remote;
634  int idx;
635 
636  if (!eid) {
637  /* Get aggregate. */
638  return entry->aggregate;
639  }
640 
641  /* Get snapshot with specific eid. */
642  is_remote = ast_eid_cmp(eid, &ast_eid_default);
643  if (!is_remote) {
644  return entry->local;
645  }
646 
647  for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
648  struct stasis_message *cur;
649 
650  cur = AST_VECTOR_GET(&entry->remote, idx);
651  if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
652  return cur;
653  }
654  }
655 
656  return NULL;
657 }
658 
659 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
660 {
661  struct stasis_cache_entry *cached_entry;
662  struct stasis_message *snapshot = NULL;
663 
664  ast_assert(cache != NULL);
665  ast_assert(cache->entries != NULL);
666  ast_assert(id != NULL);
667 
668  if (!type) {
669  return NULL;
670  }
671 
672  ao2_rdlock(cache->entries);
673 
674  cached_entry = cache_find(cache->entries, type, id);
675  if (cached_entry) {
676  snapshot = cache_entry_by_eid(cached_entry, eid);
677  ao2_bump(snapshot);
678  }
679 
680  ao2_unlock(cache->entries);
681 
682  ao2_cleanup(cached_entry);
683  return snapshot;
684 }
685 
686 struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
687 {
688  return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
689 }
690 
694  const struct ast_eid *eid;
695 };
696 
697 static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
698 {
699  struct cache_dump_data *cache_dump = arg;
700  struct stasis_cache_entry *entry = obj;
701 
702  if (!cache_dump->type || entry->key.type == cache_dump->type) {
703  struct stasis_message *snapshot;
704 
705  snapshot = cache_entry_by_eid(entry, cache_dump->eid);
706  if (snapshot) {
707  if (!ao2_link(cache_dump->container, snapshot)) {
708  ao2_cleanup(cache_dump->container);
709  cache_dump->container = NULL;
710  return CMP_STOP;
711  }
712  }
713  }
714 
715  return 0;
716 }
717 
718 struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
719 {
720  struct cache_dump_data cache_dump;
721 
722  ast_assert(cache != NULL);
723  ast_assert(cache->entries != NULL);
724 
725  cache_dump.eid = eid;
726  cache_dump.type = type;
728  if (!cache_dump.container) {
729  return NULL;
730  }
731 
733  return cache_dump.container;
734 }
735 
737 {
738  return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
739 }
740 
741 static int cache_dump_all_cb(void *obj, void *arg, int flags)
742 {
743  struct cache_dump_data *cache_dump = arg;
744  struct stasis_cache_entry *entry = obj;
745 
746  if (!cache_dump->type || entry->key.type == cache_dump->type) {
747  if (cache_entry_dump(cache_dump->container, entry)) {
748  ao2_cleanup(cache_dump->container);
749  cache_dump->container = NULL;
750  return CMP_STOP;
751  }
752  }
753 
754  return 0;
755 }
756 
758 {
759  struct cache_dump_data cache_dump;
760 
761  ast_assert(cache != NULL);
762  ast_assert(cache->entries != NULL);
763 
764  cache_dump.eid = NULL;
765  cache_dump.type = type;
767  if (!cache_dump.container) {
768  return NULL;
769  }
770 
772  return cache_dump.container;
773 }
774 
777 
779 {
780  return stasis_message_create(stasis_cache_clear_type(), id_message);
781 }
782 
783 static void stasis_cache_update_dtor(void *obj)
784 {
785  struct stasis_cache_update *update = obj;
786 
787  ao2_cleanup(update->old_snapshot);
788  update->old_snapshot = NULL;
789  ao2_cleanup(update->new_snapshot);
790  update->new_snapshot = NULL;
791  ao2_cleanup(update->type);
792  update->type = NULL;
793 }
794 
795 static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
796 {
797  struct stasis_cache_update *update;
798  struct stasis_message *msg;
799 
800  ast_assert(old_snapshot != NULL || new_snapshot != NULL);
801 
802  if (!stasis_cache_update_type()) {
803  return NULL;
804  }
805 
806  update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
808  if (!update) {
809  return NULL;
810  }
811 
812  if (old_snapshot) {
813  ao2_ref(old_snapshot, +1);
814  update->old_snapshot = old_snapshot;
815  if (!new_snapshot) {
816  ao2_ref(stasis_message_type(old_snapshot), +1);
817  update->type = stasis_message_type(old_snapshot);
818  }
819  }
820  if (new_snapshot) {
821  ao2_ref(new_snapshot, +1);
822  update->new_snapshot = new_snapshot;
823  ao2_ref(stasis_message_type(new_snapshot), +1);
824  update->type = stasis_message_type(new_snapshot);
825  }
826 
828 
829  ao2_cleanup(update);
830  return msg;
831 }
832 
833 static void caching_topic_exec(void *data, struct stasis_subscription *sub,
834  struct stasis_message *message)
835 {
836  struct stasis_caching_topic *caching_topic_needs_unref;
837  struct stasis_caching_topic *caching_topic = data;
838  struct stasis_message *msg;
839  struct stasis_message *msg_put;
840  struct stasis_message_type *msg_type;
841  const struct ast_eid *msg_eid;
842  const char *msg_id;
843 
844  ast_assert(caching_topic != NULL);
845  ast_assert(caching_topic->topic != NULL);
846  ast_assert(caching_topic->cache != NULL);
847  ast_assert(caching_topic->cache->id_fn != NULL);
848 
849  if (stasis_subscription_final_message(sub, message)) {
850  caching_topic_needs_unref = caching_topic;
851  } else {
852  caching_topic_needs_unref = NULL;
853  }
854 
855  msg_type = stasis_message_type(message);
856 
857  if (stasis_subscription_change_type() == msg_type) {
858  struct stasis_subscription_change *change = stasis_message_data(message);
859 
860  /*
861  * If this change type is an unsubscribe, we need to find the original
862  * subscribe and remove it from the cache otherwise the cache will
863  * continue to grow unabated.
864  */
865  if (strcmp(change->description, "Unsubscribe") == 0) {
866  struct stasis_cache_entry *cached_sub;
867 
868  ao2_wrlock(caching_topic->cache->entries);
869  cached_sub = cache_find(caching_topic->cache->entries, stasis_subscription_change_type(), change->uniqueid);
870  if (cached_sub) {
871  ao2_cleanup(cache_remove(caching_topic->cache->entries, cached_sub, stasis_message_eid(message)));
872  ao2_cleanup(cached_sub);
873  }
874  ao2_unlock(caching_topic->cache->entries);
875  ao2_cleanup(caching_topic_needs_unref);
876  return;
877  }
878  msg_put = message;
879  msg = message;
880  } else if (stasis_cache_clear_type() == msg_type) {
881  /* Cache clear event. */
882  msg_put = NULL;
883  msg = stasis_message_data(message);
884  msg_type = stasis_message_type(msg);
885  } else {
886  /* Normal cache update event. */
887  msg_put = message;
888  msg = message;
889  }
890  ast_assert(msg_type != NULL);
891 
892  msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
893  msg_id = caching_topic->cache->id_fn(msg);
894  if (msg_id && msg_eid) {
895  struct stasis_message *update;
896  struct cache_put_snapshots snapshots;
897 
898  /* Update the cache */
899  snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
900  if (snapshots.old || msg_put) {
901  if (stasis_topic_subscribers(caching_topic->topic)) {
902  update = update_create(snapshots.old, msg_put);
903  if (update) {
904  stasis_publish(caching_topic->topic, update);
905  ao2_ref(update, -1);
906  }
907  }
908  } else {
909  ast_debug(1,
910  "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
911  stasis_topic_name(caching_topic->topic),
912  stasis_message_type_name(msg_type), msg_id);
913  }
914 
915  if (snapshots.aggregate_old != snapshots.aggregate_new) {
916  if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
917  caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
918  snapshots.aggregate_new);
919  }
920  if (stasis_topic_subscribers(caching_topic->topic)) {
921  update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
922  if (update) {
923  stasis_publish(caching_topic->topic, update);
924  ao2_ref(update, -1);
925  }
926  }
927  }
928 
929  ao2_cleanup(snapshots.old);
930  ao2_cleanup(snapshots.aggregate_old);
931  ao2_cleanup(snapshots.aggregate_new);
932  }
933 
934  ao2_cleanup(caching_topic_needs_unref);
935 }
936 
937 static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
938 {
939  struct stasis_cache_entry *entry = v_obj;
940 
941  if (!entry) {
942  return;
943  }
944  prnt(where, "Type: %s ID: %s Hash: %u", stasis_message_type_name(entry->key.type),
945  entry->key.id, entry->key.hash);
946 }
947 
949 {
950  struct stasis_caching_topic *caching_topic;
951  static int caching_id;
952  char *new_name;
953  int ret;
954 
955  ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
956  if (ret < 0) {
957  return NULL;
958  }
959 
960  caching_topic = ao2_alloc_options(sizeof(*caching_topic),
962  if (caching_topic == NULL) {
963  ast_free(new_name);
964 
965  return NULL;
966  }
967 
968  caching_topic->topic = stasis_topic_create(new_name);
969  if (caching_topic->topic == NULL) {
970  ao2_ref(caching_topic, -1);
971  ast_free(new_name);
972 
973  return NULL;
974  }
975 
976  ao2_ref(cache, +1);
977  caching_topic->cache = cache;
978  if (!cache->registered) {
979  if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
980  ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
981  cache->entries, new_name);
982  } else {
983  cache->registered = 1;
984  }
985  }
986  ast_free(new_name);
987 
988  caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
989  if (caching_topic->sub == NULL) {
990  ao2_ref(caching_topic, -1);
991 
992  return NULL;
993  }
994 
995  ao2_ref(original_topic, +1);
996  caching_topic->original_topic = original_topic;
997 
998  /* The subscription holds the reference, so no additional ref bump. */
999  return caching_topic;
1000 }
1001 
1002 static void stasis_cache_cleanup(void)
1003 {
1006 }
1007 
1009 {
1011 
1013  return -1;
1014  }
1015 
1017  return -1;
1018  }
1019 
1020  return 0;
1021 }
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
Definition: stasis_cache.c:375
static const char type[]
Definition: chan_ooh323.c:109
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
cache_aggregate_calc_fn aggregate_calc_fn
Definition: stasis_cache.c:49
Internal Stasis APIs.
Asterisk main include file. File version handling, generic pbx functions.
struct stasis_message * old_snapshot
Old value from the cache.
Definition: stasis.h:971
struct ast_eid eid
struct stasis_message * stasis_cache_clear_create(struct stasis_message *id_message)
A message which instructs the caching topic to remove an entry from its cache.
Definition: stasis_cache.c:778
struct cache_entry_key key
Definition: stasis_cache.c:174
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
Definition: stasis_cache.c:659
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
const char * id
Definition: stasis_cache.c:168
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
static int cache_dump_all_cb(void *obj, void *arg, int flags)
Definition: stasis_cache.c:741
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:644
static void cache_entry_compute_hash(struct cache_entry_key *key)
Definition: stasis_cache.c:206
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
void(* cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate)
Callback to publish the aggregate cache entry message.
Definition: stasis.h:1059
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
struct ao2_container * container
Definition: stasis_cache.c:692
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
Definition: stasis_cache.c:757
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1152
Generic (perhaps overly so) hashtable implementation Hash Table support in Asterisk.
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1335
struct stasis_cache * cache
Definition: stasis_cache.c:56
static struct stasis_cache_entry * cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
Definition: stasis_cache.c:212
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1523
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
Definition: stasis_cache.c:686
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1079
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
Definition: stasis_cache.c:85
static int cache_entry_cmp(void *obj, void *arg, int flags)
Definition: stasis_cache.c:288
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
Definition: vector.h:438
struct stasis_topic * original_topic
Definition: stasis_cache.c:58
static void stasis_cache_update_dtor(void *obj)
Definition: stasis_cache.c:783
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define ast_assert(a)
Definition: utils.h:695
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
#define ao2_unlock(a)
Definition: astobj2.h:730
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
struct stasis_message * old
Definition: stasis_cache.c:498
#define NULL
Definition: resample.c:96
#define ao2_wrlock(a)
Definition: astobj2.h:720
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
Definition: stasis_cache.c:587
unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
Gets the hash of a given message type.
An Entity ID is essentially a MAC address, brief and unique.
Definition: utils.h:786
static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Definition: stasis_cache.c:833
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
struct stasis_subscription * sub
Definition: stasis_cache.c:59
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
Definition: main/utils.c:2842
Utility functions.
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:858
struct stasis_message *(* cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
Callback to calculate the aggregate cache entry.
Definition: stasis.h:1033
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1442
struct ao2_container * entries
Definition: stasis_cache.c:47
#define ao2_bump(obj)
Definition: astobj2.h:491
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
struct stasis_message * stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
Get the aggregate cache entry snapshot.
Definition: stasis_cache.c:365
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:452
#define ast_log
Definition: astobj2.c:42
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
Definition: stasis_cache.c:334
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1120
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
const struct ast_eid * eid
Definition: stasis_cache.c:694
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
static struct stasis_message * cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
Definition: stasis_cache.c:425
struct stasis_message * aggregate_new
Definition: stasis_cache.c:502
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
Definition: stasis_cache.c:370
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
Cache update message.
Definition: stasis.h:967
static struct stasis_cache_entry * cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
Definition: stasis_cache.c:396
static void cache_entry_dtor(void *obj)
Definition: stasis_cache.c:183
static struct cache_put_snapshots cache_put(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:505
struct stasis_message * aggregate_old
Definition: stasis_cache.c:500
unsigned char eid[6]
Definition: utils.h:787
struct stasis_message * new_snapshot
New value.
Definition: stasis.h:973
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1107
#define LOG_ERROR
Definition: logger.h:285
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_topic * topic
Definition: stasis_cache.c:57
#define ao2_rdlock(a)
Definition: astobj2.h:719
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:969
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:628
static struct stasis_message * update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:795
The key for an entry in the cache.
Definition: stasis_cache.c:164
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:973
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
#define ast_free(a)
Definition: astmm.h:182
unsigned int hash
Definition: stasis_cache.c:170
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:892
Vector container support.
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
unsigned int ast_hashtab_hash_string(const void *obj)
Hashes a string to a number.
Definition: hashtab.c:153
struct ao2_container * cache
Definition: pbx_realtime.c:77
static void stasis_caching_topic_dtor(void *obj)
Definition: stasis_cache.c:62
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1120
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:297
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
Definition: stasis_cache.c:90
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119
struct stasis_message * local
Definition: stasis_cache.c:178
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1025
struct stasis_forward * sub
Definition: res_corosync.c:240
#define ao2_unlink_flags(container, obj, flags)
Definition: astobj2.h:1622
struct stasis_message_type * type
Definition: stasis_cache.c:693
static void stasis_cache_cleanup(void)
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Definition: stasis_cache.c:948
snapshot_get_id id_fn
Definition: stasis_cache.c:48
Definition: search.h:40
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
enum queue_result id
Definition: app_queue.c:1507
static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
Definition: stasis_cache.c:937
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded...
Definition: stasis_cache.c:146
Generic container type.
#define NUM_CACHE_BUCKETS
Definition: stasis_cache.c:42
Search option field mask.
Definition: astobj2.h:1076
cache_aggregate_publish_fn aggregate_publish_fn
Definition: stasis_cache.c:50
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:709
static struct stasis_message * cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
Definition: stasis_cache.c:631
static struct stasis_message * cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
Definition: stasis_cache.c:466
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
Definition: stasis_cache.c:109
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type)
struct stasis_message_type * type
Definition: stasis_cache.c:166
static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
Definition: stasis_cache.c:563
struct ao2_container * stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
Dump cached items to a subscription for the ast_eid_default entity.
Definition: stasis_cache.c:736
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
struct stasis_message * aggregate
Definition: stasis_cache.c:176
static int cache_entry_hash(const void *obj, int flags)
Definition: stasis_cache.c:266
static void cache_dtor(void *obj)
Definition: stasis_cache.c:326
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback to extract a unique identity from a snapshot message.
Definition: stasis.h:1011
int stasis_cache_init(void)
static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
Definition: stasis_cache.c:697
#define ao2_link(container, obj)
Definition: astobj2.h:1549
Definition: stasis_cache.c:173