Asterisk - The Open Source Telephony Project  18.5.0
stasis_cache_pattern.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Typical cache pattern for Stasis topics.
22  *
23  * \author David M. Lee, II <[email protected]>
24  */
25 
26 /*** MODULEINFO
27  <support_level>core</support_level>
28  ***/
29 
#include "asterisk.h"

#include "asterisk/astobj2.h"
#include "asterisk/stasis_cache_pattern.h"
34 
/*!
 * \brief Aggregate ("all") side of the cache pattern.
 *
 * Holds the aggregate topic, the aggregate topic for cached messages, the
 * cache, and the forward linking the two aggregate topics.
 */
struct stasis_cp_all {
	/*! Aggregate topic */
	struct stasis_topic *topic;
	/*! Aggregate topic for cached messages */
	struct stasis_topic *topic_cached;
	/*! Cache backing this pattern */
	struct stasis_cache *cache;

	/*! Forward subscription from the aggregate topic to the cached one */
	struct stasis_forward *forward_all_to_cached;
};
42 
/*!
 * \brief Per-instance ("one") side of the cache pattern.
 *
 * Holds the instance topic, its caching topic, and the two forwards that
 * relay the instance's messages to the aggregate topics.
 */
struct stasis_cp_single {
	/*! Topic for this instance */
	struct stasis_topic *topic;
	/*! Caching topic monitoring \c topic */
	struct stasis_caching_topic *topic_cached;

	/*! Forward from \c topic to the aggregate topic */
	struct stasis_forward *forward_topic_to_all;
	/*! Forward from the cached topic to the aggregate cached topic */
	struct stasis_forward *forward_cached_to_all;
};
50 
51 static void all_dtor(void *obj)
52 {
53  struct stasis_cp_all *all = obj;
54 
55  ao2_cleanup(all->topic);
56  all->topic = NULL;
58  all->topic_cached = NULL;
59  ao2_cleanup(all->cache);
60  all->cache = NULL;
63 }
64 
66  snapshot_get_id id_fn)
67 {
68  char *cached_name = NULL;
69  struct stasis_cp_all *all;
70  static int cache_id;
71 
72  all = ao2_t_alloc(sizeof(*all), all_dtor, name);
73  if (!all) {
74  return NULL;
75  }
76 
77  ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name);
78  if (!cached_name) {
79  ao2_ref(all, -1);
80 
81  return NULL;
82  }
83 
84  all->topic = stasis_topic_create(name);
85  all->topic_cached = stasis_topic_create(cached_name);
86  ast_free(cached_name);
87  all->cache = stasis_cache_create(id_fn);
90 
91  if (!all->topic || !all->topic_cached || !all->cache ||
92  !all->forward_all_to_cached) {
93  ao2_ref(all, -1);
94 
95  return NULL;
96  }
97 
98  return all;
99 }
100 
102 {
103  if (!all) {
104  return NULL;
105  }
106  return all->topic;
107 }
108 
110  struct stasis_cp_all *all)
111 {
112  if (!all) {
113  return NULL;
114  }
115  return all->topic_cached;
116 }
117 
119 {
120  if (!all) {
121  return NULL;
122  }
123  return all->cache;
124 }
125 
126 static void one_dtor(void *obj)
127 {
128  struct stasis_cp_single *one = obj;
129 
130  /* Should already be unsubscribed */
131  ast_assert(one->topic_cached == NULL);
134 
135  ao2_cleanup(one->topic);
136  one->topic = NULL;
137 }
138 
140  const char *name)
141 {
142  struct stasis_cp_single *one;
143 
144  one = stasis_cp_sink_create(all, name);
145  if (!one) {
146  return NULL;
147  }
148 
152 
153  if (!one->forward_topic_to_all || !one->forward_cached_to_all) {
154  ao2_ref(one, -1);
155 
156  return NULL;
157  }
158 
159  return one;
160 }
161 
163  const char *name)
164 {
165  struct stasis_cp_single *one;
166 
167  one = ao2_t_alloc(sizeof(*one), one_dtor, name);
168  if (!one) {
169  return NULL;
170  }
171 
172  one->topic = stasis_topic_create(name);
173  if (!one->topic) {
174  ao2_ref(one, -1);
175 
176  return NULL;
177  }
178 
180  if (!one->topic_cached) {
181  ao2_ref(one, -1);
182 
183  return NULL;
184  }
185 
186  return one;
187 }
188 
190 {
191  if (!one) {
192  return;
193  }
194 
196  one->forward_topic_to_all = NULL;
200  one->topic_cached = NULL;
201 
202  ao2_cleanup(one);
203 }
204 
206 {
207  if (!one) {
208  return NULL;
209  }
210  return one->topic;
211 }
212 
214  struct stasis_cp_single *one)
215 {
216  if (!one) {
217  return NULL;
218  }
220 }
221 
223  struct stasis_message_type *type)
224 {
225  if (!one) {
226  return -1;
227  }
229 }
230 
233 {
234  if (!one) {
235  return -1;
236  }
237  return stasis_caching_set_filter(one->topic_cached, filter);
238 }
struct stasis_cache * cache
static const char type[]
Definition: chan_ooh323.c:109
struct stasis_cp_single * stasis_cp_sink_create(struct stasis_cp_all *all, const char *name)
Create a sink in the cache pattern.
static void all_dtor(void *obj)
struct stasis_forward * forward_topic_to_all
Caching pattern for Stasis Message Bus API topics.
struct stasis_forward * forward_all_to_cached
Asterisk main include file. File version handling, generic pbx functions.
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
Definition: stasis_cache.c:90
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
struct stasis_cp_single * stasis_cp_single_create(struct stasis_cp_all *all, const char *name)
Create the 'one' side of the cache pattern.
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Definition: stasis_cache.c:948
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
struct stasis_topic * stasis_cp_all_topic(struct stasis_cp_all *all)
Get the aggregate topic.
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119
struct stasis_cp_all * stasis_cp_all_create(const char *name, snapshot_get_id id_fn)
Create an all instance of the cache pattern.
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
struct stasis_cache * stasis_cp_all_cache(struct stasis_cp_all *all)
Get the cache.
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
Definition: stasis_cache.c:109
struct stasis_topic * topic_cached
struct stasis_topic * topic
int stasis_cp_single_accept_message_type(struct stasis_cp_single *one, struct stasis_message_type *type)
Indicate to an instance that we are interested in a message type.
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
static void one_dtor(void *obj)
struct stasis_topic * stasis_cp_single_topic(struct stasis_cp_single *one)
Get the topic for this instance.
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:297
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_topic * stasis_cp_single_topic_cached(struct stasis_cp_single *one)
Get the caching topic for this instance.
struct stasis_topic * topic
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
Stops caching and forwarding messages.
struct stasis_topic * stasis_cp_all_topic_cached(struct stasis_cp_all *all)
Get the caching topic.
struct stasis_forward * forward_cached_to_all
struct stasis_caching_topic * topic_cached
Forwarding information.
Definition: stasis.c:1531
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:709
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topic.
Definition: stasis_cache.c:85
int stasis_cp_single_set_filter(struct stasis_cp_single *one, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback to extract a unique identity from a snapshot message.
Definition: stasis.h:1011