Asterisk - The Open Source Telephony Project  18.5.0
test_stasis_endpoints.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*!
20  * \file
21  * \brief Test endpoints.
22  *
23  * \author\verbatim David M. Lee, II <[email protected]> \endverbatim
24  *
25  * \ingroup tests
26  */
27 
28 /*** MODULEINFO
29  <depend>TEST_FRAMEWORK</depend>
30  <depend>res_stasis_test</depend>
31  <support_level>core</support_level>
32  ***/
33 
34 #include "asterisk.h"
35 
36 #include "asterisk/astobj2.h"
37 #include "asterisk/channel.h"
38 #include "asterisk/endpoints.h"
39 #include "asterisk/module.h"
42 #include "asterisk/stasis_test.h"
43 #include "asterisk/test.h"
44 
/*! \brief Category string assigned to info->category by every test in this
 *         file during TEST_INIT. */
45 static const char *test_category = "/stasis/endpoints/";
46 
47 /*!
 * \brief Message matcher looking for cache update messages.
 *
 * Returns 0 (no match) when the update's type is not the endpoint snapshot
 * type. Otherwise the result is nonzero only when the resource of the
 * snapshot carried by the update equals the string passed in \a data.
 *
 * \param msg  Message to inspect.
 * \param data Endpoint resource name to match; it is used as a C string.
 *
 * \return Nonzero on a match, 0 otherwise.
 *
 * NOTE(review): the embedded listing numbers skip 49 and 53, so the
 * declaration of `update` and the condition guarding the first `return 0`
 * are not visible in this extract - confirm against the upstream file.
 */
48 static int cache_update(struct stasis_message *msg, const void *data) {
50  struct ast_endpoint_snapshot *snapshot;
51  const char *name = data;
52 
54  return 0;
55  }
56 
    /* Only updates whose type is the endpoint snapshot type are of interest. */
57  update = stasis_message_data(msg);
58  if (ast_endpoint_snapshot_type() != update->type) {
59  return 0;
60  }
61 
    /* Take the old snapshot first; fall back to the new snapshot when the
     * old one yields no data. */
62  snapshot = stasis_message_data(update->old_snapshot);
63  if (!snapshot) {
64  snapshot = stasis_message_data(update->new_snapshot);
65  }
66 
    /* Match when the snapshot's resource equals the requested name. */
67  return 0 == strcmp(name, snapshot->resource);
68 }
69 
71 {
    /*
     * Test body: creates an endpoint "TEST"/<this function's name> and checks
     * the messages collected in `sink`:
     *   - the first message must be an endpoint snapshot whose state is
     *     AST_ENDPOINT_OFFLINE;
     *   - after ast_endpoint_set_max_channels(uut, 8675309), the second
     *     message must be an endpoint snapshot reporting that max_channels.
     * TEST_INIT only fills in `info` and returns AST_TEST_NOT_RUN; the
     * execute path returns AST_TEST_PASS when the end of the body is reached.
     *
     * NOTE(review): the embedded listing numbers skip 70, 72, 75, 96, 99,
     * 103, 105 and 115, so the test's definition line, the declarations of
     * `uut` and `sub`, the sink/subscription setup, the call that triggers
     * the first message and the wait timeouts are not visible in this
     * extract - confirm against the upstream file.
     */
73  RAII_VAR(struct ast_channel *, chan, NULL, ast_hangup);
74  RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
76  struct stasis_message *msg;
77  struct stasis_message_type *type;
78  struct ast_endpoint_snapshot *actual_snapshot;
79  int actual_count;
80 
81  switch (cmd) {
82  case TEST_INIT:
83  info->name = __func__;
84  info->category = test_category;
85  info->summary = "Test endpoint updates as its state changes";
86  info->description =
87  "Test endpoint updates as its state changes";
88  return AST_TEST_NOT_RUN;
89  case TEST_EXECUTE:
90  break;
91  }
92 
    /* Endpoint under test. */
93  uut = ast_endpoint_create("TEST", __func__);
94  ast_test_validate(test, NULL != uut);
95 
97  ast_test_validate(test, NULL != sink);
98 
100  stasis_message_sink_cb(), sink);
101  ast_test_validate(test, NULL != sub);
102 
    /* First message: an endpoint snapshot in the OFFLINE state. */
104  actual_count = stasis_message_sink_wait_for_count(sink, 1,
106  ast_test_validate(test, 1 == actual_count);
107  msg = sink->messages[0];
108  type = stasis_message_type(msg);
109  ast_test_validate(test, ast_endpoint_snapshot_type() == type);
110  actual_snapshot = stasis_message_data(msg);
111  ast_test_validate(test, AST_ENDPOINT_OFFLINE == actual_snapshot->state);
112 
    /* Second message: a snapshot carrying the new max_channels value. */
113  ast_endpoint_set_max_channels(uut, 8675309);
114  actual_count = stasis_message_sink_wait_for_count(sink, 2,
116  ast_test_validate(test, 2 == actual_count);
117  msg = sink->messages[1];
118  type = stasis_message_type(msg);
119  ast_test_validate(test, ast_endpoint_snapshot_type() == type);
120  actual_snapshot = stasis_message_data(msg);
121  ast_test_validate(test, 8675309 == actual_snapshot->max_channels);
122 
123  return AST_TEST_PASS;
124 }
125 
/*
 * Test: endpoint state change messages.
 *
 * Waits (via the cache_update matcher) for the update that belongs to the
 * endpoint "TEST"/<this function's name> and validates that it is a cache
 * creation entry: endpoint snapshot type, no old snapshot, and a new snapshot
 * with the expected tech and resource. It then waits for a later matching
 * update and validates that it is a cache removal entry: the old snapshot has
 * the expected tech and resource, and there is no new snapshot.
 *
 * NOTE(review): the embedded listing numbers skip 128, 131, 150, 153, 154,
 * 167, 182 and 188, so the declarations of `uut` and `sub`, the
 * sink/subscription setup, the trailing arguments of both wait_for calls and
 * the statement preceding `uut = NULL` are not visible in this extract -
 * confirm against the upstream file.
 */
126 AST_TEST_DEFINE(cache_clear)
127 {
129  RAII_VAR(struct ast_channel *, chan, NULL, ast_hangup);
130  RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
132  struct stasis_message *msg;
133  struct stasis_message_type *type;
134  struct ast_endpoint_snapshot *actual_snapshot;
135  struct stasis_cache_update *update;
136  int message_index;
137 
138  switch (cmd) {
139  case TEST_INIT:
140  info->name = __func__;
141  info->category = test_category;
142  info->summary = "Test endpoint state change messages";
143  info->description = "Test endpoint state change messages";
144  return AST_TEST_NOT_RUN;
145  case TEST_EXECUTE:
146  break;
147  }
148 
149  /* Subscribe to the cache topic */
151  ast_test_validate(test, NULL != sink);
152 
155  stasis_message_sink_cb(), sink);
156  ast_test_validate(test, NULL != sub);
157 
158  uut = ast_endpoint_create("TEST", __func__);
159  ast_test_validate(test, NULL != uut);
160 
161  /* Since the cache topic is a singleton (ew), it may have messages from
162  * elsewhere that it's processing, or maybe even some final messages
163  * from the prior test. We've got to wait_for our specific message,
164  * instead of wait_for_count.
165  */
166  message_index = stasis_message_sink_wait_for(sink, 0,
168  ast_test_validate(test, 0 <= message_index);
169 
170  /* First message should be a cache creation entry for our endpoint */
171  msg = sink->messages[message_index];
172  type = stasis_message_type(msg);
173  ast_test_validate(test, stasis_cache_update_type() == type);
174  update = stasis_message_data(msg);
175  ast_test_validate(test, ast_endpoint_snapshot_type() == update->type);
176  ast_test_validate(test, NULL == update->old_snapshot);
177  actual_snapshot = stasis_message_data(update->new_snapshot);
178  ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech));
179  ast_test_validate(test,
180  0 == strcmp(__func__, actual_snapshot->resource));
181 
183  uut = NULL;
184 
185  /* Note: there are a few messages between the creation and the clear.
186  * Wait for all of them... */
187  message_index = stasis_message_sink_wait_for(sink, message_index + 2,
189  ast_test_validate(test, 0 <= message_index);
190 
191  /* Now we should have a cache removal entry */
192  msg = sink->messages[message_index];
193  type = stasis_message_type(msg);
194  ast_test_validate(test, stasis_cache_update_type() == type);
195  update = stasis_message_data(msg);
196  ast_test_validate(test, ast_endpoint_snapshot_type() == update->type);
197  actual_snapshot = stasis_message_data(update->old_snapshot);
198  ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech));
199  ast_test_validate(test,
200  0 == strcmp(__func__, actual_snapshot->resource));
201  ast_test_validate(test, NULL == update->new_snapshot);
202 
203  return AST_TEST_PASS;
204 }
205 
/*
 * Test: channel messages on an endpoint topic.
 *
 * Creates an endpoint "TEST"/<this function's name>, allocates a channel and
 * adds it to the endpoint, then validates the messages collected in `sink`:
 *   - message 0 is an endpoint snapshot with num_channels == 1;
 *   - after the channel is hung up, message 1 has the channel snapshot type
 *     and message 2 is an endpoint snapshot with num_channels == 0.
 *
 * NOTE(review): the embedded listing numbers skip 208, 211, 232, 235, 246
 * and 259, so the declarations of `uut` and `sub`, the sink/subscription
 * setup and the wait timeouts are not visible in this extract - confirm
 * against the upstream file.
 */
206 AST_TEST_DEFINE(channel_messages)
207 {
209  RAII_VAR(struct ast_channel *, chan, NULL, ast_hangup);
210  RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
212  struct stasis_message *msg;
213  struct stasis_message_type *type;
214  struct ast_endpoint_snapshot *actual_snapshot;
215  int actual_count;
216 
217  switch (cmd) {
218  case TEST_INIT:
219  info->name = __func__;
220  info->category = test_category;
221  info->summary = "Test channel messages on an endpoint topic";
222  info->description =
223  "Test channel messages on an endpoint topic";
224  return AST_TEST_NOT_RUN;
225  case TEST_EXECUTE:
226  break;
227  }
228 
229  uut = ast_endpoint_create("TEST", __func__);
230  ast_test_validate(test, NULL != uut);
231 
233  ast_test_validate(test, NULL != sink);
234 
236  stasis_message_sink_cb(), sink);
237  ast_test_validate(test, NULL != sub);
238 
239  chan = ast_channel_alloc(0, AST_STATE_DOWN, "100", __func__, "100",
240  "100", "default", NULL, NULL, 0, "TEST/test_res");
241  ast_test_validate(test, NULL != chan);
242 
243  ast_endpoint_add_channel(uut, chan);
244 
    /* Adding the channel should produce one endpoint snapshot that counts it. */
245  actual_count = stasis_message_sink_wait_for_count(sink, 1,
247  ast_test_validate(test, 1 == actual_count);
248 
249  msg = sink->messages[0];
250  type = stasis_message_type(msg);
251  ast_test_validate(test, ast_endpoint_snapshot_type() == type);
252  actual_snapshot = stasis_message_data(msg);
253  ast_test_validate(test, 1 == actual_snapshot->num_channels);
254 
    /* Hang up explicitly and clear the pointer; presumably this keeps the
     * ast_hangup destructor declared via RAII_VAR from acting on the channel
     * a second time at scope exit - TODO confirm. */
255  ast_hangup(chan);
256  chan = NULL;
257 
    /* Two further messages are expected after the hangup. */
258  actual_count = stasis_message_sink_wait_for_count(sink, 3,
260  ast_test_validate(test, 3 == actual_count);
261 
262  msg = sink->messages[1];
263  type = stasis_message_type(msg);
264  ast_test_validate(test, ast_channel_snapshot_type() == type);
265 
266  msg = sink->messages[2];
267  type = stasis_message_type(msg);
268  ast_test_validate(test, ast_endpoint_snapshot_type() == type);
269 
270  actual_snapshot = stasis_message_data(msg);
271  ast_test_validate(test, 0 == actual_snapshot->num_channels);
272 
273  return AST_TEST_PASS;
274 }
275 
/*
 * Module unload hook: unregisters the tests defined in this file and
 * returns 0.
 *
 * NOTE(review): the embedded listing numbers skip 278, so one statement
 * (presumably the unregistration of another test) is not visible in this
 * extract - confirm against the upstream file.
 */
276 static int unload_module(void)
277 {
279  AST_TEST_UNREGISTER(cache_clear);
280  AST_TEST_UNREGISTER(channel_messages);
281  return 0;
282 }
283 
/*
 * Module load hook: registers the tests defined in this file.
 *
 * NOTE(review): the embedded listing numbers skip 286 and 289, so two
 * statements are not visible in this extract. No return statement is visible
 * for this int-returning function; presumably it is on one of the missing
 * lines - confirm against the upstream file.
 */
284 static int load_module(void)
285 {
287  AST_TEST_REGISTER(cache_clear);
288  AST_TEST_REGISTER(channel_messages);
290 }
291 
/*
 * Module descriptor: wires load_module/unload_module in as the load and
 * unload hooks and names "res_stasis_test" in the .requires field.
 */
292 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_DEFAULT, "Endpoint stasis-related testing",
293  .support_level = AST_MODULE_SUPPORT_CORE,
294  .load = load_module,
295  .unload = unload_module,
296  .requires = "res_stasis_test",
297 );
static const char type[]
Definition: chan_ooh323.c:109
Structure that collects messages from a topic.
Definition: stasis_test.h:44
Main Channel structure associated with a channel.
Asterisk main include file. File version handling, generic pbx functions.
struct stasis_message * old_snapshot
Old value from the cache.
Definition: stasis.h:971
int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start, stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
Wait for a message that matches the given criteria.
struct ast_endpoint * ast_endpoint_create(const char *tech, const char *resource)
Create an endpoint struct.
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
AST_TEST_DEFINE(state_changes)
struct stasis_message_type * ast_endpoint_snapshot_type(void)
Message type for ast_endpoint_snapshot.
Test Framework API.
static int load_module(void)
#define AST_TEST_REGISTER(cb)
Definition: test.h:127
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct stasis_topic * ast_endpoint_topic(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
void ast_endpoint_set_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state)
Updates the state of the given endpoint.
int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink, int num_messages, int timeout_millis)
Wait for a sink's num_messages field to reach a certain level.
Endpoint abstractions.
#define NULL
Definition: resample.c:96
struct stasis_message_sink * stasis_message_sink_create(void)
Create a message sink.
struct stasis_topic * ast_endpoint_topic_all_cached(void)
Cached topic for all endpoint related messages.
static int unload_module(void)
General Asterisk PBX channel definitions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
const ast_string_field resource
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
stasis_subscription_cb stasis_message_sink_cb(void)
Topic callback to receive messages.
Cache update message.
Definition: stasis.h:967
int ast_endpoint_add_channel(struct ast_endpoint *endpoint, struct ast_channel *chan)
Adds a channel to the given endpoint.
A snapshot of an endpoint's state.
#define stasis_subscribe(topic, callback, data)
Definition: stasis.h:652
static int cache_update(struct stasis_message *msg, const void *data)
Message matcher looking for cache update messages.
struct stasis_message * new_snapshot
New value.
Definition: stasis.h:973
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
#define AST_TEST_UNREGISTER(cb)
Definition: test.h:128
const ast_string_field tech
#define STASIS_SINK_DEFAULT_WAIT
Definition: stasis_test.h:41
def info(msg)
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:969
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:973
static const char * test_category
static const char name[]
Definition: cdr_mysql.c:74
void ast_hangup(struct ast_channel *chan)
Hang up a channel.
Definition: channel.c:2548
struct stasis_message_type * ast_channel_snapshot_type(void)
Message type for ast_channel_snapshot_update.
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS|AST_MODFLAG_LOAD_ORDER, "HTTP Phone Provisioning",.support_level=AST_MODULE_SUPPORT_EXTENDED,.load=load_module,.unload=unload_module,.reload=reload,.load_pri=AST_MODPRI_CHANNEL_DEPEND,.requires="http",)
void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint, int max_channels)
Updates the maximum number of channels an endpoint supports.
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_forward * sub
Definition: res_corosync.c:240
The state change queue. State changes are queued for processing by a separate thread.
Definition: devicestate.c:208
void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
Shuts down an ast_endpoint.
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
#define ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag,...)
Create a channel structure.
Definition: channel.h:1259
Asterisk module definitions.
Endpoint abstractions.
enum ast_endpoint_state state
Test infrastructure for dealing with Stasis.