Asterisk - The Open Source Telephony Project  18.5.0
res_stasis_test.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*!
20  * \file
21  * \brief Test infrastructure for dealing with Stasis.
22  *
23  * \author David M. Lee, II <[email protected]>
24  */
25 
26 /*** MODULEINFO
27  <depend>TEST_FRAMEWORK</depend>
28  <support_level>core</support_level>
29  ***/
30 
31 #include "asterisk.h"
32 
33 #include "asterisk/astobj2.h"
34 #include "asterisk/module.h"
35 #include "asterisk/stasis_test.h"
36 
38 
39 static void stasis_message_sink_dtor(void *obj)
40 {
41  struct stasis_message_sink *sink = obj;
42 
43  {
44  SCOPED_MUTEX(lock, &sink->lock);
45  while (!sink->is_done) {
46  /* Normally waiting forever is bad, but if we're not
47  * done, we're not done. */
48  ast_cond_wait(&sink->cond, &sink->lock);
49  }
50  }
51 
52  ast_mutex_destroy(&sink->lock);
53  ast_cond_destroy(&sink->cond);
54 
55  while (sink->num_messages > 0) {
56  ao2_cleanup(sink->messages[--sink->num_messages]);
57  }
58  ast_free(sink->messages);
59  sink->messages = NULL;
60  sink->max_messages = 0;
61 }
62 
63 static struct timespec make_deadline(int timeout_millis)
64 {
65  struct timeval start = ast_tvnow();
66  struct timeval delta = {
67  .tv_sec = timeout_millis / 1000,
68  .tv_usec = (timeout_millis % 1000) * 1000,
69  };
70  struct timeval deadline_tv = ast_tvadd(start, delta);
71  struct timespec deadline = {
72  .tv_sec = deadline_tv.tv_sec,
73  .tv_nsec = 1000 * deadline_tv.tv_usec,
74  };
75 
76  return deadline;
77 }
78 
80 {
81  RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
82 
83  sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
84  if (!sink) {
85  return NULL;
86  }
87  ast_mutex_init(&sink->lock);
88  ast_cond_init(&sink->cond, NULL);
89  sink->max_messages = 4;
90  sink->messages =
91  ast_malloc(sizeof(*sink->messages) * sink->max_messages);
92  if (!sink->messages) {
93  return NULL;
94  }
95  ao2_ref(sink, +1);
96  return sink;
97 }
98 
99 /*!
100  * \brief Implementation of the stasis_message_sink_cb() callback.
101  *
102  * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
103  * it has to do with how we previously loaded modules, using \c RTLD_LAZY.
104  *
105  * The stasis_message_sink_cb() function gave us a layer of indirection so that
106  * the initial lazy binding would still work as expected.
107  */
108 static void message_sink_cb(void *data, struct stasis_subscription *sub,
109  struct stasis_message *message)
110 {
111  struct stasis_message_sink *sink = data;
112 
113  SCOPED_MUTEX(lock, &sink->lock);
114 
115  if (stasis_subscription_final_message(sub, message)) {
116  sink->is_done = 1;
117  ast_cond_signal(&sink->cond);
118  return;
119  }
120 
122  /* Ignore subscription changes */
123  return;
124  }
125 
126  if (sink->num_messages == sink->max_messages) {
127  size_t new_max_messages = sink->max_messages * 2;
128  struct stasis_message **new_messages = ast_realloc(
129  sink->messages,
130  sizeof(*new_messages) * new_max_messages);
131  if (!new_messages) {
132  return;
133  }
134  sink->max_messages = new_max_messages;
135  sink->messages = new_messages;
136  }
137 
138  ao2_ref(message, +1);
139  sink->messages[sink->num_messages++] = message;
140  ast_cond_signal(&sink->cond);
141 }
142 
144 {
145  return message_sink_cb;
146 }
147 
148 
150  int num_messages, int timeout_millis)
151 {
152  struct timespec deadline = make_deadline(timeout_millis);
153 
154  SCOPED_MUTEX(lock, &sink->lock);
155  while (sink->num_messages < num_messages) {
156  int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
157 
158  if (r == ETIMEDOUT) {
159  break;
160  }
161  if (r != 0) {
162  ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
163  strerror(r));
164  break;
165  }
166  }
167  return sink->num_messages;
168 }
169 
171  int num_messages, int timeout_millis)
172 {
173  struct timespec deadline = make_deadline(timeout_millis);
174 
175  SCOPED_MUTEX(lock, &sink->lock);
176  while (sink->num_messages == num_messages) {
177  int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
178 
179  if (r == ETIMEDOUT) {
180  break;
181  }
182  if (r != 0) {
183  ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
184  strerror(r));
185  break;
186  }
187  }
188  return sink->num_messages;
189 }
190 
192  stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
193 {
194  struct timespec deadline = make_deadline(timeout_millis);
195 
196  SCOPED_MUTEX(lock, &sink->lock);
197 
198  /* wait for the start */
199  while (sink->num_messages < start + 1) {
200  int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
201 
202  if (r == ETIMEDOUT) {
203  /* Timed out waiting for the start */
204  return -1;
205  }
206  if (r != 0) {
207  ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
208  strerror(r));
209  return -2;
210  }
211  }
212 
213 
214  while (!cmp_cb(sink->messages[start], data)) {
215  ++start;
216 
217  while (sink->num_messages < start + 1) {
218  int r = ast_cond_timedwait(&sink->cond,
219  &sink->lock, &deadline);
220 
221  if (r == ETIMEDOUT) {
222  return -1;
223  }
224  if (r != 0) {
226  "Unexpected condition error: %s\n",
227  strerror(r));
228  return -2;
229  }
230  }
231  }
232 
233  return start;
234 }
235 
237 {
238  RAII_VAR(void *, data, NULL, ao2_cleanup);
239 
240  if (!stasis_test_message_type()) {
241  return NULL;
242  }
243 
244  /* We just need the unique pointer; don't care what's in it */
245  data = ao2_alloc(1, NULL);
246  if (!data) {
247  return NULL;
248  }
249 
251 }
252 
253 static int unload_module(void)
254 {
256  return 0;
257 }
258 
259 static int load_module(void)
260 {
263  }
264 
266 }
267 
269  .support_level = AST_MODULE_SUPPORT_CORE,
270  .load = load_module,
271  .unload = unload_module,
272  .load_pri = AST_MODPRI_APP_DEPEND,
273 );
Structure that collects messages from a topic.
Definition: stasis_test.h:44
Asterisk main include file. File version handling, generic pbx functions.
#define ast_realloc(p, len)
A wrapper for realloc()
Definition: astmm.h:228
struct stasis_message ** messages
Definition: stasis_test.h:57
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
struct stasis_message_type * stasis_test_message_type(void)
Gets the type of messages created by stasis_test_message_create().
int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink, int num_messages, int timeout_millis)
Wait for a sink&#39;s num_messages field to reach a certain level.
struct stasis_message_sink * stasis_message_sink_create(void)
Create a message sink.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1523
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
static int load_module(void)
#define ast_cond_wait(cond, mutex)
Definition: lock.h:203
#define ast_cond_init(cond, attr)
Definition: lock.h:199
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
stasis_subscription_cb stasis_message_sink_cb(void)
Topic callback to receive messages.
#define NULL
Definition: resample.c:96
#define ast_cond_signal(cond)
Definition: lock.h:201
int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start, stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
Wait for a message that matches the given criteria.
struct stasis_message * stasis_test_message_create(void)
Creates a test message.
#define ast_log
Definition: astobj2.c:42
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:587
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
ast_mutex_t lock
Definition: app_meetme.c:1091
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
#define LOG_ERROR
Definition: logger.h:285
STASIS_MESSAGE_TYPE_DEFN(stasis_test_message_type)
static struct timespec make_deadline(int timeout_millis)
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2283
static void stasis_message_sink_dtor(void *obj)
#define ast_cond_destroy(cond)
Definition: lock.h:200
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:615
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
#define ast_free(a)
Definition: astmm.h:182
int stasis_message_sink_should_stay(struct stasis_message_sink *sink, int num_messages, int timeout_millis)
Ensures that no new messages are received.
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
ast_mutex_t lock
Definition: stasis_test.h:46
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS|AST_MODFLAG_LOAD_ORDER, "HTTP Phone Provisioning",.support_level=AST_MODULE_SUPPORT_EXTENDED,.load=load_module,.unload=unload_module,.reload=reload,.load_pri=AST_MODPRI_CHANNEL_DEPEND,.requires="http",)
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_forward * sub
Definition: res_corosync.c:240
int(* stasis_wait_cb)(struct stasis_message *msg, const void *data)
Definition: stasis_test.h:95
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
static void message_sink_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Implementation of the stasis_message_sink_cb() callback.
static int unload_module(void)
#define ast_mutex_init(pmutex)
Definition: lock.h:184
#define ast_mutex_destroy(a)
Definition: lock.h:186
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
Asterisk module definitions.
#define ast_cond_timedwait(cond, mutex, time)
Definition: lock.h:204
Test infrastructure for dealing with Stasis.