Asterisk - The Open Source Telephony Project  18.5.0
Data Structures | Functions
stasis_cache_pattern.c File Reference

Typical cache pattern for Stasis topics. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis_cache_pattern.h"
Include dependency graph for stasis_cache_pattern.c:

Go to the source code of this file.

Data Structures

struct  stasis_cp_all
 
struct  stasis_cp_single
 

Functions

static void all_dtor (void *obj)
 
static void one_dtor (void *obj)
 
struct stasis_cachestasis_cp_all_cache (struct stasis_cp_all *all)
 Get the cache. More...
 
struct stasis_cp_allstasis_cp_all_create (const char *name, snapshot_get_id id_fn)
 Create an all instance of the cache pattern. More...
 
struct stasis_topicstasis_cp_all_topic (struct stasis_cp_all *all)
 Get the aggregate topic. More...
 
struct stasis_topicstasis_cp_all_topic_cached (struct stasis_cp_all *all)
 Get the caching topic. More...
 
int stasis_cp_single_accept_message_type (struct stasis_cp_single *one, struct stasis_message_type *type)
 Indicate to an instance that we are interested in a message type. More...
 
struct stasis_cp_singlestasis_cp_single_create (struct stasis_cp_all *all, const char *name)
 Create the 'one' side of the cache pattern. More...
 
int stasis_cp_single_set_filter (struct stasis_cp_single *one, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a cache. More...
 
struct stasis_topicstasis_cp_single_topic (struct stasis_cp_single *one)
 Get the topic for this instance. More...
 
struct stasis_topicstasis_cp_single_topic_cached (struct stasis_cp_single *one)
 Get the caching topic for this instance. More...
 
void stasis_cp_single_unsubscribe (struct stasis_cp_single *one)
 Stops caching and forwarding messages. More...
 
struct stasis_cp_singlestasis_cp_sink_create (struct stasis_cp_all *all, const char *name)
 Create a sink in the cache pattern. More...
 

Detailed Description

Typical cache pattern for Stasis topics.

Author
David M. Lee, II dlee@.nosp@m.digi.nosp@m.um.co.nosp@m.m

Definition in file stasis_cache_pattern.c.

Function Documentation

◆ all_dtor()

static void all_dtor ( void *  obj)
static

Definition at line 51 of file stasis_cache_pattern.c.

References ao2_cleanup, stasis_cp_all::cache, stasis_cp_all::forward_all_to_cached, NULL, stasis_forward_cancel(), stasis_cp_all::topic, and stasis_cp_all::topic_cached.

Referenced by stasis_cp_all_create().

52 {
53  struct stasis_cp_all *all = obj;
54 
55  ao2_cleanup(all->topic);
56  all->topic = NULL;
58  all->topic_cached = NULL;
59  ao2_cleanup(all->cache);
60  all->cache = NULL;
63 }
struct stasis_cache * cache
struct stasis_forward * forward_all_to_cached
#define NULL
Definition: resample.c:96
struct stasis_topic * topic_cached
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_topic * topic

◆ one_dtor()

static void one_dtor ( void *  obj)
static

Definition at line 126 of file stasis_cache_pattern.c.

References ao2_cleanup, ast_assert, stasis_cp_single::forward_cached_to_all, stasis_cp_single::forward_topic_to_all, NULL, stasis_cp_single::topic, and stasis_cp_single::topic_cached.

Referenced by stasis_cp_sink_create().

127 {
128  struct stasis_cp_single *one = obj;
129 
130  /* Should already be unsubscribed */
131  ast_assert(one->topic_cached == NULL);
134 
135  ao2_cleanup(one->topic);
136  one->topic = NULL;
137 }
struct stasis_forward * forward_topic_to_all
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
struct stasis_topic * topic
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_forward * forward_cached_to_all
struct stasis_caching_topic * topic_cached

◆ stasis_cp_all_cache()

struct stasis_cache* stasis_cp_all_cache ( struct stasis_cp_all all)

Get the cache.

This is the shared cache for all corresponding stasis_cp_single objects.

Parameters
allAll side caching pattern object.
Returns
The cache.
NULL if all is NULL

Definition at line 118 of file stasis_cache_pattern.c.

References stasis_cp_all::cache, and NULL.

Referenced by ast_endpoint_cache().

119 {
120  if (!all) {
121  return NULL;
122  }
123  return all->cache;
124 }
struct stasis_cache * cache
#define NULL
Definition: resample.c:96

◆ stasis_cp_all_create()

struct stasis_cp_all* stasis_cp_all_create ( const char *  name,
snapshot_get_id  id_fn 
)

Create an all instance of the cache pattern.

This object is AO2 managed, so dispose of it with ao2_cleanup().

Parameters
nameBase name of the topics.
id_fnIdentity function for the cache.
Returns
All side instance.
NULL on error.

Definition at line 65 of file stasis_cache_pattern.c.

References all_dtor(), ao2_ref, ao2_t_alloc, ast_asprintf, ast_atomic_fetchadd_int(), ast_free, stasis_cp_all::cache, stasis_cp_all::forward_all_to_cached, NULL, stasis_cache_create(), stasis_forward_all(), stasis_topic_create(), stasis_cp_all::topic, and stasis_cp_all::topic_cached.

Referenced by ast_endpoint_stasis_init().

67 {
68  char *cached_name = NULL;
69  struct stasis_cp_all *all;
70  static int cache_id;
71 
72  all = ao2_t_alloc(sizeof(*all), all_dtor, name);
73  if (!all) {
74  return NULL;
75  }
76 
77  ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name);
78  if (!cached_name) {
79  ao2_ref(all, -1);
80 
81  return NULL;
82  }
83 
85  all->topic_cached = stasis_topic_create(cached_name);
86  ast_free(cached_name);
87  all->cache = stasis_cache_create(id_fn);
90 
91  if (!all->topic || !all->topic_cached || !all->cache ||
92  !all->forward_all_to_cached) {
93  ao2_ref(all, -1);
94 
95  return NULL;
96  }
97 
98  return all;
99 }
struct stasis_cache * cache
static void all_dtor(void *obj)
struct stasis_forward * forward_all_to_cached
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
#define NULL
Definition: resample.c:96
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:755
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
struct stasis_topic * topic_cached
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
struct stasis_topic * topic
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578

◆ stasis_cp_all_topic()

struct stasis_topic* stasis_cp_all_topic ( struct stasis_cp_all all)

Get the aggregate topic.

This topic aggregates all messages published to corresponding stasis_cp_single_topic() topics.

Parameters
allAll side caching pattern object.
Returns
The aggregate topic.
NULL if all is NULL

Definition at line 101 of file stasis_cache_pattern.c.

References NULL, and stasis_cp_all::topic.

Referenced by ast_endpoint_topic_all().

102 {
103  if (!all) {
104  return NULL;
105  }
106  return all->topic;
107 }
#define NULL
Definition: resample.c:96
struct stasis_topic * topic

◆ stasis_cp_all_topic_cached()

struct stasis_topic* stasis_cp_all_topic_cached ( struct stasis_cp_all all)

Get the caching topic.

This topic aggregates all messages from the corresponding stasis_cp_single_topic_cached() topics.

Note that one normally only subscribes to the caching topic, since data is fed to it from its upstream topic.

Parameters
allAll side caching pattern object.
Returns
The aggregate caching topic.
NULL if all is NULL

Definition at line 109 of file stasis_cache_pattern.c.

References NULL, and stasis_cp_all::topic_cached.

Referenced by ast_endpoint_topic_all_cached().

111 {
112  if (!all) {
113  return NULL;
114  }
115  return all->topic_cached;
116 }
#define NULL
Definition: resample.c:96
struct stasis_topic * topic_cached

◆ stasis_cp_single_accept_message_type()

int stasis_cp_single_accept_message_type ( struct stasis_cp_single one,
struct stasis_message_type type 
)

Indicate to an instance that we are interested in a message type.

This will cause the caching topic to receive messages of the given message type. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
oneOne side of the cache pattern.
typeThe message type we wish to receive.
Return values
0on success
-1failure
Since
17.0.0

Definition at line 222 of file stasis_cache_pattern.c.

References stasis_caching_accept_message_type(), and stasis_cp_single::topic_cached.

Referenced by endpoint_internal_create().

224 {
225  if (!one) {
226  return -1;
227  }
229 }
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
Definition: stasis_cache.c:90
struct stasis_caching_topic * topic_cached

◆ stasis_cp_single_create()

struct stasis_cp_single* stasis_cp_single_create ( struct stasis_cp_all all,
const char *  name 
)

Create the 'one' side of the cache pattern.

Create the 'one' and forward to all's topic and topic_cached.

Dispose of using stasis_cp_single_unsubscribe().

Parameters
allCorresponding all side.
nameBase name for the topics.
Returns
One side instance

Definition at line 139 of file stasis_cache_pattern.c.

References ao2_ref, stasis_cp_single::forward_cached_to_all, stasis_cp_single::forward_topic_to_all, NULL, stasis_caching_get_topic(), stasis_cp_sink_create(), stasis_forward_all(), stasis_cp_all::topic, stasis_cp_single::topic, stasis_cp_all::topic_cached, and stasis_cp_single::topic_cached.

Referenced by endpoint_internal_create().

141 {
142  struct stasis_cp_single *one;
143 
144  one = stasis_cp_sink_create(all, name);
145  if (!one) {
146  return NULL;
147  }
148 
152 
153  if (!one->forward_topic_to_all || !one->forward_cached_to_all) {
154  ao2_ref(one, -1);
155 
156  return NULL;
157  }
158 
159  return one;
160 }
struct stasis_cp_single * stasis_cp_sink_create(struct stasis_cp_all *all, const char *name)
Create a sink in the cache pattern.
struct stasis_forward * forward_topic_to_all
#define NULL
Definition: resample.c:96
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_topic * topic_cached
struct stasis_topic * topic
static const char name[]
Definition: cdr_mysql.c:74
struct stasis_topic * topic
struct stasis_forward * forward_cached_to_all
struct stasis_caching_topic * topic_cached
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
Definition: stasis_cache.c:85

◆ stasis_cp_single_set_filter()

int stasis_cp_single_set_filter ( struct stasis_cp_single one,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a cache.

This will cause the underlying subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
oneOne side of the cache pattern.
filterWhat filter to use
Return values
0on success
-1failure
Since
17.0.0

Definition at line 231 of file stasis_cache_pattern.c.

References stasis_caching_set_filter(), and stasis_cp_single::topic_cached.

Referenced by endpoint_internal_create().

233 {
234  if (!one) {
235  return -1;
236  }
238 }
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
Definition: stasis_cache.c:109
struct stasis_caching_topic * topic_cached
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:709

◆ stasis_cp_single_topic()

struct stasis_topic* stasis_cp_single_topic ( struct stasis_cp_single one)

Get the topic for this instance.

This is the topic to which one would post instance-specific messages, or subscribe for single-instance, uncached messages.

Parameters
oneOne side of the cache pattern.
Returns
The main topic.
NULL if one is NULL

Definition at line 205 of file stasis_cache_pattern.c.

References NULL, and stasis_cp_single::topic.

Referenced by ast_endpoint_topic(), and endpoint_internal_create().

206 {
207  if (!one) {
208  return NULL;
209  }
210  return one->topic;
211 }
#define NULL
Definition: resample.c:96
struct stasis_topic * topic

◆ stasis_cp_single_topic_cached()

struct stasis_topic* stasis_cp_single_topic_cached ( struct stasis_cp_single one)

Get the caching topic for this instance.

Note that one normally only subscribes to the caching topic, since data is fed to it from its upstream topic.

Parameters
oneOne side of the cache pattern.
Returns
The caching topic.
NULL if one is NULL

Definition at line 213 of file stasis_cache_pattern.c.

References NULL, stasis_caching_get_topic(), and stasis_cp_single::topic_cached.

Referenced by ast_endpoint_topic_cached().

215 {
216  if (!one) {
217  return NULL;
218  }
220 }
#define NULL
Definition: resample.c:96
struct stasis_caching_topic * topic_cached
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
Definition: stasis_cache.c:85

◆ stasis_cp_single_unsubscribe()

void stasis_cp_single_unsubscribe ( struct stasis_cp_single one)

Stops caching and forwarding messages.

Parameters
oneOne side of the cache pattern.

Definition at line 189 of file stasis_cache_pattern.c.

References ao2_cleanup, stasis_cp_single::forward_cached_to_all, stasis_cp_single::forward_topic_to_all, NULL, stasis_caching_unsubscribe(), stasis_forward_cancel(), and stasis_cp_single::topic_cached.

Referenced by endpoint_dtor().

190 {
191  if (!one) {
192  return;
193  }
194 
196  one->forward_topic_to_all = NULL;
200  one->topic_cached = NULL;
201 
202  ao2_cleanup(one);
203 }
struct stasis_forward * forward_topic_to_all
#define NULL
Definition: resample.c:96
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_forward * forward_cached_to_all
struct stasis_caching_topic * topic_cached

◆ stasis_cp_sink_create()

struct stasis_cp_single* stasis_cp_sink_create ( struct stasis_cp_all all,
const char *  name 
)

Create a sink in the cache pattern.

Create the 'one' but do not automatically forward to the all's topic. This is useful when aggregating other topic's messages created with stasis_cp_single_create in another caching topic without replicating those messages in the all's topics.

Dispose of using stasis_cp_single_unsubscribe().

Parameters
allCorresponding all side.
nameBase name for the topics.
Returns
One side instance

Definition at line 162 of file stasis_cache_pattern.c.

References ao2_ref, ao2_t_alloc, stasis_cp_all::cache, NULL, one_dtor(), stasis_caching_topic_create(), stasis_topic_create(), stasis_cp_single::topic, and stasis_cp_single::topic_cached.

Referenced by endpoint_internal_create(), and stasis_cp_single_create().

164 {
165  struct stasis_cp_single *one;
166 
167  one = ao2_t_alloc(sizeof(*one), one_dtor, name);
168  if (!one) {
169  return NULL;
170  }
171 
173  if (!one->topic) {
174  ao2_ref(one, -1);
175 
176  return NULL;
177  }
178 
180  if (!one->topic_cached) {
181  ao2_ref(one, -1);
182 
183  return NULL;
184  }
185 
186  return one;
187 }
struct stasis_cache * cache
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Definition: stasis_cache.c:948
#define NULL
Definition: resample.c:96
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
struct stasis_topic * topic
static const char name[]
Definition: cdr_mysql.c:74
static void one_dtor(void *obj)
struct stasis_caching_topic * topic_cached