Asterisk - The Open Source Telephony Project  18.5.0
test_stasis_state.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2019, Sangoma Technologies Corporation
5  *
6  * Kevin Harwell <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*** MODULEINFO
20  <depend>TEST_FRAMEWORK</depend>
21  <support_level>core</support_level>
22  ***/
23 
24 #include "asterisk.h"
25 
26 #include "asterisk/astobj2.h"
27 #include "asterisk/conversions.h"
28 #include "asterisk/module.h"
29 #include "asterisk/stasis_state.h"
30 #include "asterisk/test.h"
31 
32 #define test_category "/stasis/core/state/"
33 
34 #define TOPIC_COUNT 500
35 
36 #define MANAGER_TOPIC "foo"
37 
38 struct stasis_message_type *foo_type(void);
39 
40 /*! foo stasis message type */
42 
43 /*! foo_type data */
44 struct foo_data {
45  size_t bar;
46 };
47 
50 
51 /*!
52  * For testing purposes each subscribed state's id is a number. This value is
53  * the summation of all id's.
54  */
55 static size_t sum_total;
56 
57 /*! Test variable that tracks the running total of state ids */
58 static size_t running_total;
59 
60 /*! This value is set to check if state data is NULL before publishing */
61 static int expect_null;
62 
63 static int validate_data(const char *id, struct foo_data *foo)
64 {
65  size_t num;
66 
67  if (ast_str_to_umax(id, &num)) {
68  ast_log(LOG_ERROR, "Unable to convert the state's id '%s' to numeric\n", id);
69  return -1;
70  }
71 
72  running_total += num;
73 
74  if (!foo) {
75  if (expect_null) {
76  return 0;
77  }
78 
79  ast_log(LOG_ERROR, "Expected state data for '%s'\n", id);
80  return -1;
81  }
82 
83  if (expect_null) {
84  ast_log(LOG_ERROR, "Expected NULL state data for '%s'\n", id);
85  return -1;
86  }
87 
88  if (foo->bar != num) {
89  ast_log(LOG_ERROR, "Unexpected state data for '%s'\n", id);
90  return -1;
91  }
92 
93  return 0;
94 }
95 
96 static void handle_validate(const char *id, struct stasis_state_subscriber *sub)
97 {
98  struct foo_data *foo = stasis_state_subscriber_data(sub);
99  validate_data(id, foo);
100  ao2_cleanup(foo);
101 }
102 
105  .on_unsubscribe = handle_validate
106 };
107 
108 static void foo_type_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
109 {
110  /* No op since we are not really testing stasis topic handling here */
111 }
112 
113 static int subscriptions_destroy(struct stasis_state_manager *manager, struct subscriptions *subs)
114 {
116 
118  AST_VECTOR_FREE(subs);
119 
120  stasis_state_remove_observer(manager, &foo_observer);
121 
122  if (running_total != sum_total) {
123  ast_log(LOG_ERROR, "Failed to destroy all subscriptions: running=%zu, sum=%zu\n",
125  return -1;
126  }
127 
128  return 0;
129 }
130 
131 static int subscriptions_create(struct stasis_state_manager *manager,
132  struct subscriptions *subs)
133 {
134  size_t i;
135 
136  if (stasis_state_add_observer(manager, &foo_observer) ||
137  AST_VECTOR_INIT(subs, TOPIC_COUNT)) {
138  return -1;
139  }
140 
141  sum_total = running_total = 0;
142  expect_null = 1;
143 
144  for (i = 0; i < TOPIC_COUNT; ++i) {
146  char id[32];
147 
148  if (snprintf(id, 10, "%zu", i) == -1) {
149  ast_log(LOG_ERROR, "Unable to convert subscriber id to string\n");
150  break;
151  }
152 
153  sub = stasis_state_subscribe_pool(manager, id, foo_type_cb, NULL);
154  if (!sub) {
155  ast_log(LOG_ERROR, "Failed to create a state subscriber for id '%s'\n", id);
156  ao2_ref(sub, -1);
157  break;
158  }
159 
160  if (AST_VECTOR_APPEND(subs, sub)) {
161  ast_log(LOG_ERROR, "Failed to add to foo_sub to vector for id '%s'\n", id);
162  ao2_ref(sub, -1);
163  break;
164  }
165 
166  sum_total += i;
167  }
168 
169  if (i != TOPIC_COUNT || running_total != sum_total) {
170  ast_log(LOG_ERROR, "Failed to create all subscriptions: running=%zu, sum=%zu\n",
172  subscriptions_destroy(manager, subs);
173  return -1;
174  }
175 
176  return 0;
177 }
178 
179 static int publishers_destroy(struct stasis_state_manager *manager, struct publishers *pubs)
180 {
181  size_t i;
182 
183  if (pubs) {
184  /* Remove explicit publishers */
186  AST_VECTOR_FREE(pubs);
187  return 0;
188  }
189 
190  for (i = 0; i < TOPIC_COUNT; ++i) {
191  char id[32];
192 
193  /* Remove implicit publishers */
194  if (snprintf(id, 10, "%zu", i) == -1) {
195  ast_log(LOG_ERROR, "Unable to convert publisher id to string\n");
196  return -1;
197  }
198 
200  }
201 
202  return 0;
203 }
204 
205 static int publishers_create(struct stasis_state_manager *manager,
206  struct publishers *pubs)
207 {
208  size_t i;
209 
210  if (AST_VECTOR_INIT(pubs, TOPIC_COUNT)) {
211  return -1;
212  }
213 
214  for (i = 0; i < TOPIC_COUNT; ++i) {
215  struct stasis_state_publisher *pub;
216  char id[32];
217 
218  if (snprintf(id, 10, "%zu", i) == -1) {
219  ast_log(LOG_ERROR, "Unable to convert publisher id to string\n");
220  break;
221  }
222 
223  /* Create the state publisher */
224  pub = stasis_state_add_publisher(manager, id);
225  if (!pub) {
226  ast_log(LOG_ERROR, "Failed to create a state publisher for id '%s'\n", id);
227  break;
228  }
229 
230  if (AST_VECTOR_APPEND(pubs, pub)) {
231  ast_log(LOG_ERROR, "Failed to add to publisher to vector for id '%s'\n", id);
232  ao2_ref(pub, -1);
233  break;
234  }
235  }
236 
237  if (i != TOPIC_COUNT) {
238  ast_log(LOG_ERROR, "Failed to create all publishers: count=%zu\n", i);
239  publishers_destroy(manager, pubs);
240  return -1;
241  }
242 
243  return 0;
244 }
245 
246 static struct stasis_message *create_foo_type_message(const char *id)
247 {
248  struct stasis_message *msg;
249  struct foo_data *foo;
250 
251  foo = ao2_alloc(sizeof(*foo), NULL);
252  if (!foo) {
253  ast_log(LOG_ERROR, "Failed to allocate foo data for '%s'\n", id);
254  return NULL;
255  }
256 
257  if (ast_str_to_umax(id, &foo->bar)) {
258  ast_log(LOG_ERROR, "Unable to convert the state's id '%s' to numeric\n", id);
259  ao2_ref(foo, -1);
260  return NULL;
261  }
262 
264  if (!msg) {
265  ast_log(LOG_ERROR, "Failed to create stasis message for '%s'\n", id);
266  }
267 
268  ao2_ref(foo, -1);
269  return msg;
270 }
271 
272 static int implicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
273 {
274  /* For each state object create and publish new state data */
275  struct foo_data *foo = stasis_message_data(msg);
276 
277  if (validate_data(id, foo)) {
278  return CMP_STOP;
279  }
280 
281  msg = create_foo_type_message(id);
282  if (!msg) {
283  return CMP_STOP;
284  }
285 
286  /* Now publish it on the managed state object */
287  stasis_state_publish_by_id(user_data, id, NULL, msg);
288  ao2_ref(msg, -1);
289 
290  return 0;
291 }
292 
293 static int explicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
294 {
295  /* For each state object create and publish new state data */
296  struct publishers *pubs = user_data;
297  struct stasis_state_publisher *pub = NULL;
298  struct foo_data *foo = stasis_message_data(msg);
299  size_t i;
300 
301  if (validate_data(id, foo)) {
302  return CMP_STOP;
303  }
304 
305  msg = create_foo_type_message(id);
306  if (!msg) {
307  return CMP_STOP;
308  }
309 
310  for (i = 0; i < AST_VECTOR_SIZE(pubs); ++i) {
311  if (!strcmp(stasis_state_publisher_id(AST_VECTOR_GET(pubs, i)), id)) {
312  pub = AST_VECTOR_GET(pubs, i);
313  break;
314  }
315  }
316 
317  if (!pub) {
318  ast_log(LOG_ERROR, "Unable to locate publisher for id '%s'\n", id);
319  return CMP_STOP;
320  }
321 
322  stasis_state_publish(pub, msg);
323  ao2_ref(msg, -1);
324 
325  return 0;
326 }
327 
328 static int publish(struct stasis_state_manager *manager, on_stasis_state cb,
329  void *user_data)
330 {
331  /* First time there is no state data */
332  expect_null = 1;
333 
334  running_total = 0;
335  stasis_state_callback_all(manager, cb, user_data);
336 
337  if (running_total != sum_total) {
338  ast_log(LOG_ERROR, "Failed manager_callback (1): running=%zu, sum=%zu\n",
340  return -1;
341  }
342 
343  /* Second time check valid state data exists */
345  stasis_state_callback_all(manager, cb, user_data);
346 
347  if (running_total != sum_total) {
348  ast_log(LOG_ERROR, "Failed manager_callback (2): running=%zu, sum=%zu\n",
350  return -1;
351  }
352 
353  return 0;
354 }
355 
356 AST_TEST_DEFINE(implicit_publish)
357 {
358  RAII_VAR(struct stasis_state_manager *, manager, NULL, ao2_cleanup);
359  struct subscriptions subs;
360  int rc = AST_TEST_PASS;
361 
362  switch (cmd) {
363  case TEST_INIT:
364  info->name = __func__;
365  info->category = test_category;
366  info->summary = "Test implicit publishing of stasis state";
367  info->description = info->summary;
368  return AST_TEST_NOT_RUN;
369  case TEST_EXECUTE:
370  break;
371  }
372 
374  ast_test_validate(test, manager != NULL);
375 
376  ast_test_validate(test, !subscriptions_create(manager, &subs));
377 
378  ast_test_validate_cleanup(test, !publish(manager, implicit_publish_cb, manager),
379  rc, cleanup);
380 
381 cleanup:
382  if (subscriptions_destroy(manager, &subs) || publishers_destroy(manager, NULL)) {
383  return AST_TEST_FAIL;
384  }
385 
386  /*
387  * State subscriptions add a ref a state. The state in turn adds a ref
388  * to the manager. So if more than one ref is held on the manager before
389  * exiting, there is a ref leak some place.
390  */
391  if (ao2_ref(manager, 0) != 1) {
392  ast_log(LOG_ERROR, "Memory leak - Too many references held on manager\n");
393  return AST_TEST_FAIL;
394  }
395 
396  return rc;
397 }
398 
399 AST_TEST_DEFINE(explicit_publish)
400 {
401  RAII_VAR(struct stasis_state_manager *, manager, NULL, ao2_cleanup);
402  struct subscriptions subs;
403  struct publishers pubs;
404  int rc = AST_TEST_PASS;
405 
406  switch (cmd) {
407  case TEST_INIT:
408  info->name = __func__;
409  info->category = test_category;
410  info->summary = "Test explicit publishing of stasis state";
411  info->description = info->summary;
412  return AST_TEST_NOT_RUN;
413  case TEST_EXECUTE:
414  break;
415  }
416 
418  ast_test_validate(test, manager != NULL);
419 
420  ast_test_validate(test, !subscriptions_create(manager, &subs));
421  ast_test_validate_cleanup(test, !publishers_create(manager, &pubs), rc, cleanup);
422 
423  ast_test_validate_cleanup(test, !publish(manager, explicit_publish_cb, &pubs),
424  rc, cleanup);
425 
426 cleanup:
427  if (subscriptions_destroy(manager, &subs) || publishers_destroy(manager, &pubs)) {
428  return AST_TEST_FAIL;
429  }
430 
431  /*
432  * State subscriptions add a ref a state. The state in turn adds a ref
433  * to the manager. So if more than one ref is held on the manager before
434  * exiting, there is a ref leak some place.
435  */
436  if (ao2_ref(manager, 0) != 1) {
437  ast_log(LOG_ERROR, "Memory leak - Too many references held on manager\n");
438  return AST_TEST_FAIL;
439  }
440 
441  return rc;
442 }
443 
444 static int unload_module(void)
445 {
446  AST_TEST_UNREGISTER(implicit_publish);
447  AST_TEST_UNREGISTER(explicit_publish);
448 
450 
451  return 0;
452 }
453 
454 static int load_module(void)
455 {
457  return -1;
458  }
459 
460  AST_TEST_REGISTER(implicit_publish);
461  AST_TEST_REGISTER(explicit_publish);
462 
464 }
465 
Managed stasis state event interface.
Definition: stasis_state.h:463
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
#define AST_MODULE_INFO_STANDARD(keystr, desc)
Definition: module.h:567
Asterisk main include file. File version handling, generic pbx functions.
struct stasis_state_subscriber * stasis_state_subscribe_pool(struct stasis_state_manager *manager, const char *id, stasis_subscription_cb callback, void *data)
Add a subscriber, and subscribe to its underlying stasis topic.
Definition: stasis_state.c:446
static struct ao2_container * publishers
Container of active outbound extension state publishers.
STASIS_MESSAGE_TYPE_DEFN(foo_type)
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and remove an implicit publisher.
Definition: stasis_state.c:658
void stasis_state_remove_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Remove an observer (will no longer receive managed state related events).
Definition: stasis_state.c:700
int ast_str_to_umax(const char *str, uintmax_t *res)
Convert the given string to an unsigned max size integer.
Definition: conversions.c:119
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
static void foo_type_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
void(* on_subscribe)(const char *id, struct stasis_state_subscriber *sub)
Raised when any managed state is being subscribed.
Definition: stasis_state.h:470
const char * stasis_state_publisher_id(const struct stasis_state_publisher *pub)
Retrieve the publisher&#39;s underlying state&#39;s unique id.
Definition: stasis_state.c:552
static int publish(struct stasis_state_manager *manager, on_stasis_state cb, void *user_data)
AST_TEST_DEFINE(implicit_publish)
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
void * stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
Retrieve the last known state stasis message payload for the subscriber.
Definition: stasis_state.c:497
Test Framework API.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1523
#define AST_TEST_REGISTER(cb)
Definition: test.h:127
int(* on_stasis_state)(const char *id, struct stasis_message *msg, void *user_data)
The delegate called for each managed state.
Definition: stasis_state.h:521
static int subscriptions_create(struct stasis_state_manager *manager, struct subscriptions *subs)
struct stasis_state_observer foo_observer
#define NULL
Definition: resample.c:96
void * stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic, block until the final message is received, and then unsubscribe fr...
Definition: stasis_state.c:477
static int publishers_create(struct stasis_state_manager *manager, struct publishers *pubs)
struct stasis_state_manager * stasis_state_manager_create(const char *topic_name)
Create a stasis state manager.
Definition: stasis_state.c:324
int stasis_state_add_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Add an observer to receive managed state related events.
Definition: stasis_state.c:688
static size_t running_total
static int subscriptions_destroy(struct stasis_state_manager *manager, struct subscriptions *subs)
static void handle_validate(const char *id, struct stasis_state_subscriber *sub)
static int publishers_destroy(struct stasis_state_manager *manager, struct publishers *pubs)
static int validate_data(const char *id, struct foo_data *foo)
struct stasis_message_type * foo_type(void)
#define ast_log
Definition: astobj2.c:42
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed state call the given handler.
Definition: stasis_state.c:740
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_state_publisher * stasis_state_add_publisher(struct stasis_state_manager *manager, const char *id)
Add a publisher to the managed state for the given id.
Definition: stasis_state.c:531
static int expect_null
Conversion utility functions.
Stasis State API.
static int load_module(void)
#define TOPIC_COUNT
#define LOG_ERROR
Definition: logger.h:285
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
#define AST_TEST_UNREGISTER(cb)
Definition: test.h:128
def info(msg)
void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and add an implicit subscriber.
Definition: stasis_state.c:638
static int implicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
AST_VECTOR(subscriptions, struct stasis_state_subscriber *)
static int explicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
static struct stasis_message * create_foo_type_message(const char *id)
#define MANAGER_TOPIC
static int unload_module(void)
static void * cleanup(void *unused)
Definition: pbx_realtime.c:124
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
static size_t sum_total
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct stasis_forward * sub
Definition: res_corosync.c:240
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
#define test_category
Asterisk module definitions.
void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
Publish to a managed state (topic) using a publisher.
Definition: stasis_state.c:562
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:865