Asterisk - The Open Source Telephony Project  18.5.0
res/stasis/app.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <[email protected]>
24  */
25 
26 #include "asterisk.h"
27 
28 #include "app.h"
29 #include "control.h"
30 #include "messaging.h"
31 
32 #include "asterisk/callerid.h"
33 #include "asterisk/cli.h"
34 #include "asterisk/stasis_app.h"
39 
40 #define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
41 #define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
42 #define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
43 
44 /*! Global debug flag. No need for locking */
46 
47 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
48 
49 struct stasis_app {
50  /*! Aggregation topic for this application. */
52  /*! Router for handling messages forwarded to \a topic. */
54  /*! Router for handling messages to the bridge all \a topic. */
56  /*! Optional router for handling endpoint messages in 'all' subscriptions */
58  /*! Container of the channel forwards to this app's topic. */
60  /*! Callback function for this application. */
62  /*! Opaque data to hand to callback function. */
63  void *data;
64  /*! Subscription model for the application */
66  /*! Whether or not someone wants to see debug messages about this app */
67  int debug;
68  /*! An array of allowed events types for this application */
70  /*! An array of disallowed events types for this application */
72  /*! Name of the Stasis application */
73  char name[];
74 };
75 
80 };
81 
82 /*! Subscription info for a particular channel/bridge. */
83 struct app_forwards {
84  /*! Count of number of times this channel/bridge has been subscribed */
86 
87  /*! Forward for the regular topic */
89  /*! Forward for the caching topic */
91 
92  /* Type of object being forwarded */
94  /*! Unique id of the object being forwarded */
95  char id[];
96 };
97 
98 static void forwards_dtor(void *obj)
99 {
100 #ifdef AST_DEVMODE
101  struct app_forwards *forwards = obj;
102 #endif /* AST_DEVMODE */
103 
104  ast_assert(forwards->topic_forward == NULL);
105  ast_assert(forwards->topic_cached_forward == NULL);
106 }
107 
109 {
111  forwards->topic_forward = NULL;
113  forwards->topic_cached_forward = NULL;
114 }
115 
117  const char *id)
118 {
119  struct app_forwards *forwards;
120 
121  if (!app || ast_strlen_zero(id)) {
122  return NULL;
123  }
124 
125  forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
126  if (!forwards) {
127  return NULL;
128  }
129 
130  strcpy(forwards->id, id); /* SAFE */
131 
132  return forwards;
133 }
134 
135 /*! Forward a channel's topics to an app */
137  struct ast_channel *chan)
138 {
139  struct app_forwards *forwards;
140 
141  if (!app) {
142  return NULL;
143  }
144 
145  forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
146  if (!forwards) {
147  return NULL;
148  }
149 
150  forwards->forward_type = FORWARD_CHANNEL;
151  forwards->topic_forward = stasis_forward_all(
152  chan ? ast_channel_topic(chan) : ast_channel_topic_all(),
153  app->topic);
154 
155  if (!forwards->topic_forward) {
156  ao2_ref(forwards, -1);
157  return NULL;
158  }
159 
160  return forwards;
161 }
162 
163 /*! Forward a bridge's topics to an app */
165  struct ast_bridge *bridge)
166 {
167  struct app_forwards *forwards;
168 
169  if (!app) {
170  return NULL;
171  }
172 
173  forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
174  if (!forwards) {
175  return NULL;
176  }
177 
178  forwards->forward_type = FORWARD_BRIDGE;
179  forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), app->topic);
180 
181  if (!forwards->topic_forward && bridge) {
182  forwards_unsubscribe(forwards);
183  ao2_ref(forwards, -1);
184  return NULL;
185  }
186 
187  return forwards;
188 }
189 
190 static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
191  struct stasis_message *message)
192 {
193  struct stasis_app *app = data;
194 
195  stasis_publish(app->topic, message);
196 }
197 
198 /*! Forward an endpoint's topics to an app */
200  struct ast_endpoint *endpoint)
201 {
202  struct app_forwards *forwards;
203  int ret = 0;
204 
205  if (!app) {
206  return NULL;
207  }
208 
209  forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
210  if (!forwards) {
211  return NULL;
212  }
213 
214  forwards->forward_type = FORWARD_ENDPOINT;
215  if (endpoint) {
217  app->topic);
219  ast_endpoint_topic_cached(endpoint), app->topic);
220 
221  if (!forwards->topic_forward || !forwards->topic_cached_forward) {
222  /* Half-subscribed is a bad thing */
223  forwards_unsubscribe(forwards);
224  ao2_ref(forwards, -1);
225  return NULL;
226  }
227  } else {
228  /* Since endpoint subscriptions also subscribe to channels, in the case
229  * of all endpoint subscriptions, we only want messages for the endpoints.
230  * As such, we route those particular messages and then re-publish them
231  * on the app's topic.
232  */
235  if (!app->endpoint_router) {
236  forwards_unsubscribe(forwards);
237  ao2_ref(forwards, -1);
238  return NULL;
239  }
240 
245 
246  if (ret) {
247  ao2_ref(app->endpoint_router, -1);
248  app->endpoint_router = NULL;
249  ao2_ref(forwards, -1);
250  return NULL;
251  }
252  }
253 
254  return forwards;
255 }
256 
257 static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
258 {
259  const struct app_forwards *object_left = obj_left;
260  const struct app_forwards *object_right = obj_right;
261  const char *right_key = obj_right;
262  int cmp;
263 
264  switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
265  case OBJ_POINTER:
266  right_key = object_right->id;
267  /* Fall through */
268  case OBJ_KEY:
269  cmp = strcmp(object_left->id, right_key);
270  break;
271  case OBJ_PARTIAL_KEY:
272  /*
273  * We could also use a partial key struct containing a length
274  * so strlen() does not get called for every comparison instead.
275  */
276  cmp = strncmp(object_left->id, right_key, strlen(right_key));
277  break;
278  default:
279  /* Sort can only work on something with a full or partial key. */
280  ast_assert(0);
281  cmp = 0;
282  break;
283  }
284  return cmp;
285 }
286 
287 static void app_dtor(void *obj)
288 {
289  struct stasis_app *app = obj;
290  size_t size = strlen("stasis-") + strlen(app->name) + 1;
291  char context_name[size];
292 
293  ast_verb(1, "Destroying Stasis app %s\n", app->name);
294 
295  ast_assert(app->router == NULL);
296  ast_assert(app->bridge_router == NULL);
298 
299  /* If we created a context for this application, remove it */
300  strcpy(context_name, "stasis-");
301  strcat(context_name, app->name);
302  ast_context_destroy_by_name(context_name, "res_stasis");
303 
304  ao2_cleanup(app->topic);
305  app->topic = NULL;
306  ao2_cleanup(app->forwards);
307  app->forwards = NULL;
308  ao2_cleanup(app->data);
309  app->data = NULL;
310 
312  app->events_allowed = NULL;
314  app->events_disallowed = NULL;
315 
316 }
317 
319 {
320  struct ast_multi_channel_blob *payload = stasis_message_data(message);
321  struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
322  struct ast_channel *chan;
323 
324  if (!snapshot) {
325  return;
326  }
327 
328  chan = ast_channel_get_by_name(snapshot->base->uniqueid);
329  if (!chan) {
330  return;
331  }
332 
333  app_subscribe_channel(app, chan);
334  ast_channel_unref(chan);
335 }
336 
338  struct stasis_message *message)
339 {
340  struct stasis_app *app = data;
341 
342  if (stasis_subscription_final_message(sub, message)) {
343  ao2_cleanup(app);
344  }
345 }
346 
/*!
 * \brief Default handler for messages arriving on the application's topic.
 *
 * Dial messages are first passed to call_forwarded_handler(). Any message
 * that has a JSON representation is then sent to the application.
 *
 * \param data The \c stasis_app the subscription belongs to.
 * \param sub Subscription the message arrived on (not used here).
 * \param message The message to process.
 */
static void sub_default_handler(void *data, struct stasis_subscription *sub,
	struct stasis_message *message)
{
	struct stasis_app *app = data;
	struct ast_json *json;

	/* The dial type can be converted to JSON so it will always be passed
	 * here.
	 */
	if (stasis_message_type(message) == ast_channel_dial_type()) {
		call_forwarded_handler(app, message);
	}

	/* By default, send any message that has a JSON representation */
	json = stasis_message_to_json(message, stasis_app_get_sanitizer());
	if (!json) {
		return;
	}

	app_send(app, json);
	ast_json_unref(json);
}
369 
/*!
 * \brief Typedef for callbacks that get called on channel snapshot updates
 *
 * \param old_snapshot Snapshot before the update; monitors in this file
 *        treat NULL as "first snapshot for the channel".
 * \param new_snapshot Snapshot after the update.
 * \param tv Timestamp to put on the generated event.
 *
 * \return A JSON event describing the change, or NULL when the monitor has
 *         nothing to report.
 */
typedef struct ast_json *(*channel_snapshot_monitor)(
	struct ast_channel_snapshot *old_snapshot,
	struct ast_channel_snapshot *new_snapshot,
	const struct timeval *tv);
375 
377  const char *type,
378  struct ast_channel_snapshot *snapshot,
379  const struct timeval *tv)
380 {
381  struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
382 
383  if (!json_channel) {
384  return NULL;
385  }
386 
387  return ast_json_pack("{s: s, s: o, s: o}",
388  "type", type,
389  "timestamp", ast_json_timeval(*tv, NULL),
390  "channel", json_channel);
391 }
392 
394  struct ast_channel_snapshot *snapshot,
395  const struct timeval *tv)
396 {
397  return simple_channel_event("ChannelCreated", snapshot, tv);
398 }
399 
401  struct ast_channel_snapshot *snapshot,
402  const struct timeval *tv)
403 {
404  struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
405 
406  if (!json_channel) {
407  return NULL;
408  }
409 
410  return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
411  "type", "ChannelDestroyed",
412  "timestamp", ast_json_timeval(*tv, NULL),
413  "cause", snapshot->hangup->cause,
414  "cause_txt", ast_cause2str(snapshot->hangup->cause),
415  "channel", json_channel);
416 }
417 
419  struct ast_channel_snapshot *snapshot,
420  const struct timeval *tv)
421 {
422  return simple_channel_event("ChannelStateChange", snapshot, tv);
423 }
424 
425 /*! \brief Handle channel state changes */
426 static struct ast_json *channel_state(
427  struct ast_channel_snapshot *old_snapshot,
428  struct ast_channel_snapshot *new_snapshot,
429  const struct timeval *tv)
430 {
431  struct ast_channel_snapshot *snapshot = new_snapshot ?
432  new_snapshot : old_snapshot;
433 
434  if (!old_snapshot) {
435  return channel_created_event(snapshot, tv);
436  } else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
437  return channel_destroyed_event(snapshot, tv);
438  } else if (old_snapshot->state != new_snapshot->state) {
439  return channel_state_change_event(snapshot, tv);
440  }
441 
442  return NULL;
443 }
444 
445 static struct ast_json *channel_dialplan(
446  struct ast_channel_snapshot *old_snapshot,
447  struct ast_channel_snapshot *new_snapshot,
448  const struct timeval *tv)
449 {
450  struct ast_json *json_channel;
451 
452  /* No Newexten event on first channel snapshot */
453  if (!old_snapshot) {
454  return NULL;
455  }
456 
457  /* Empty application is not valid for a Newexten event */
458  if (ast_strlen_zero(new_snapshot->dialplan->appl)) {
459  return NULL;
460  }
461 
462  if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
463  return NULL;
464  }
465 
466  json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
467  if (!json_channel) {
468  return NULL;
469  }
470 
471  return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
472  "type", "ChannelDialplan",
473  "timestamp", ast_json_timeval(*tv, NULL),
474  "dialplan_app", new_snapshot->dialplan->appl,
475  "dialplan_app_data", AST_JSON_UTF8_VALIDATE(new_snapshot->dialplan->data),
476  "channel", json_channel);
477 }
478 
479 static struct ast_json *channel_callerid(
480  struct ast_channel_snapshot *old_snapshot,
481  struct ast_channel_snapshot *new_snapshot,
482  const struct timeval *tv)
483 {
484  struct ast_json *json_channel;
485 
486  /* No NewCallerid event on first channel snapshot */
487  if (!old_snapshot) {
488  return NULL;
489  }
490 
491  if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
492  return NULL;
493  }
494 
495  json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
496  if (!json_channel) {
497  return NULL;
498  }
499 
500  return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
501  "type", "ChannelCallerId",
502  "timestamp", ast_json_timeval(*tv, NULL),
503  "caller_presentation", new_snapshot->caller->pres,
504  "caller_presentation_txt", ast_describe_caller_presentation(
505  new_snapshot->caller->pres),
506  "channel", json_channel);
507 }
508 
510  struct ast_channel_snapshot *old_snapshot,
511  struct ast_channel_snapshot *new_snapshot,
512  const struct timeval *tv)
513 {
514  struct ast_json *json_channel;
515 
516  /* No ChannelConnectedLine event on first channel snapshot */
517  if (!old_snapshot) {
518  return NULL;
519  }
520 
521  if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
522  return NULL;
523  }
524 
525  json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
526  if (!json_channel) {
527  return NULL;
528  }
529 
530  return ast_json_pack("{s: s, s: o, s: o}",
531  "type", "ChannelConnectedLine",
532  "timestamp", ast_json_timeval(*tv, NULL),
533  "channel", json_channel);
534 }
535 
541 };
542 
544  struct stasis_subscription *sub,
545  struct stasis_message *message)
546 {
547  struct stasis_app *app = data;
549  int i;
550 
551  for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
552  struct ast_json *msg;
553 
554  msg = channel_monitors[i](update->old_snapshot, update->new_snapshot,
555  stasis_message_timestamp(message));
556  if (msg) {
557  app_send(app, msg);
558  ast_json_unref(msg);
559  }
560  }
561 
562  if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
563  unsubscribe(app, "channel", update->new_snapshot->base->uniqueid, 1);
564  }
565 }
566 
568  const char *type,
569  struct ast_endpoint_snapshot *snapshot,
570  const struct timeval *tv)
571 {
572  struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
573 
574  if (!json_endpoint) {
575  return NULL;
576  }
577 
578  return ast_json_pack("{s: s, s: o, s: o}",
579  "type", type,
580  "timestamp", ast_json_timeval(*tv, NULL),
581  "endpoint", json_endpoint);
582 }
583 
/*!
 * \brief Deliver a received text message to an application as a
 *        "TextMessageReceived" event.
 *
 * \param endpoint_id Endpoint identifier in "tech/resource" form.
 * \param json_msg JSON body of the message; a new reference is taken for
 *        the event.
 * \param pvt The \c stasis_app to deliver the event to.
 *
 * \retval 0 the endpoint was resolved (returned even when the event itself
 *         could not be built).
 * \retval -1 the id has no tech or resource part, no snapshot exists for
 *         the endpoint, or the snapshot could not be converted to JSON.
 */
static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
{
	struct stasis_app *app = pvt;
	struct ast_endpoint_snapshot *snapshot;
	struct ast_json *json_endpoint;
	struct ast_json *event;
	char *tech = ast_strdupa(endpoint_id);
	char *resource = strchr(tech, '/');

	/* Split "tech/resource" in place on the first slash. */
	if (resource) {
		*resource++ = '\0';
	}

	if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
		return -1;
	}

	snapshot = ast_endpoint_latest_snapshot(tech, resource);
	if (!snapshot) {
		return -1;
	}

	/* The snapshot is only needed long enough to build its JSON form. */
	json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
	ao2_ref(snapshot, -1);
	if (!json_endpoint) {
		return -1;
	}

	event = ast_json_pack("{s: s, s: o, s: o, s: o}",
		"type", "TextMessageReceived",
		"timestamp", ast_json_timeval(ast_tvnow(), NULL),
		"endpoint", json_endpoint,
		"message", ast_json_ref(json_msg));
	if (!event) {
		return 0;
	}

	app_send(app, event);
	ast_json_unref(event);

	return 0;
}
627 
629  struct stasis_subscription *sub,
630  struct stasis_message *message)
631 {
632  struct stasis_app *app = data;
633  struct stasis_cache_update *update;
634  struct ast_endpoint_snapshot *new_snapshot;
635  struct ast_endpoint_snapshot *old_snapshot;
636  const struct timeval *tv;
637 
639 
640  update = stasis_message_data(message);
641 
643 
644  new_snapshot = stasis_message_data(update->new_snapshot);
645  old_snapshot = stasis_message_data(update->old_snapshot);
646 
647  if (new_snapshot) {
648  struct ast_json *json;
649 
651 
652  json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
653  if (!json) {
654  return;
655  }
656 
657  app_send(app, json);
658  ast_json_unref(json);
659  }
660 
661  if (!new_snapshot && old_snapshot) {
662  unsubscribe(app, "endpoint", old_snapshot->id, 1);
663  }
664 }
665 
667  const char *type,
668  struct ast_bridge_snapshot *snapshot,
669  const struct timeval *tv)
670 {
671  struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
672  if (!json_bridge) {
673  return NULL;
674  }
675 
676  return ast_json_pack("{s: s, s: o, s: o}",
677  "type", type,
678  "timestamp", ast_json_timeval(*tv, NULL),
679  "bridge", json_bridge);
680 }
681 
682 static void sub_bridge_update_handler(void *data,
683  struct stasis_subscription *sub,
684  struct stasis_message *message)
685 {
686  struct ast_json *json = NULL;
687  struct stasis_app *app = data;
689  const struct timeval *tv;
690 
691  update = stasis_message_data(message);
692 
693  tv = stasis_message_timestamp(message);
694 
695  if (!update->new_snapshot) {
696  json = simple_bridge_event("BridgeDestroyed", update->old_snapshot, tv);
697  } else if (!update->old_snapshot) {
698  json = simple_bridge_event("BridgeCreated", update->new_snapshot, tv);
699  } else if (update->new_snapshot && update->old_snapshot
700  && strcmp(update->new_snapshot->video_source_id, update->old_snapshot->video_source_id)) {
701  json = simple_bridge_event("BridgeVideoSourceChanged", update->new_snapshot, tv);
702  if (json && !ast_strlen_zero(update->old_snapshot->video_source_id)) {
703  ast_json_object_set(json, "old_video_source_id",
705  }
706  }
707 
708  if (json) {
709  app_send(app, json);
710  ast_json_unref(json);
711  }
712 
713  if (!update->new_snapshot && update->old_snapshot) {
714  unsubscribe(app, "bridge", update->old_snapshot->uniqueid, 1);
715  }
716 }
717 
718 
719 /*! \brief Helper function for determining if the application is subscribed to a given entity */
720 static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
721 {
722  struct app_forwards *forwards = NULL;
723 
724  forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
725  if (!forwards) {
726  return 0;
727  }
728 
729  ao2_ref(forwards, -1);
730  return 1;
731 }
732 
734  struct stasis_message *message)
735 {
736  struct stasis_app *app = data;
737  struct ast_bridge_merge_message *merge;
738 
739  merge = stasis_message_data(message);
740 
741  /* Find out if we're subscribed to either bridge */
742  if (bridge_app_subscribed(app, merge->from->uniqueid) ||
743  bridge_app_subscribed(app, merge->to->uniqueid)) {
744  /* Forward the message to the app */
745  stasis_publish(app->topic, message);
746  }
747 }
748 
749 /*! \brief Callback function for checking if channels in a bridge are subscribed to */
751 {
752  int subscribed = 0;
753  struct ao2_iterator iter;
754  char *uniqueid;
755 
756  if (bridge_app_subscribed(app, snapshot->uniqueid)) {
757  return 1;
758  }
759 
760  iter = ao2_iterator_init(snapshot->channels, 0);
761  for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
762  if (bridge_app_subscribed(app, uniqueid)) {
763  subscribed = 1;
764  ao2_ref(uniqueid, -1);
765  break;
766  }
767  }
768  ao2_iterator_destroy(&iter);
769 
770  return subscribed;
771 }
772 
774  struct stasis_message *message)
775 {
776  struct stasis_app *app = data;
777  struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
778  struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
779 
780  if (bridge_app_subscribed(app, transfer_msg->transferer->base->uniqueid) ||
781  (bridge && bridge_app_subscribed_involved(app, bridge))) {
782  stasis_publish(app->topic, message);
783  }
784 }
785 
787  struct stasis_message *message)
788 {
789  struct stasis_app *app = data;
790  struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
791  int subscribed = 0;
792 
793  subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->base->uniqueid);
794  if (!subscribed) {
795  subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->base->uniqueid);
796  }
797  if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
798  subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
799  }
800  if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
801  subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
802  }
803 
804  if (!subscribed) {
805  switch (transfer_msg->dest_type) {
807  subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
808  break;
810  subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->base->uniqueid);
811  if (!subscribed) {
812  subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->base->uniqueid);
813  }
814  break;
815  break;
817  subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
818  if (!subscribed) {
819  subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->base->uniqueid);
820  }
821  break;
822  default:
823  break;
824  }
825  }
826 
827  if (subscribed) {
828  stasis_publish(app->topic, message);
829  }
830 }
831 
833  struct stasis_message *message)
834 {
835  struct stasis_app *app = data;
836 
837  if (stasis_subscription_final_message(sub, message)) {
838  ao2_cleanup(app);
839  }
840 }
841 
843 {
844  if (!app) {
845  return;
846  }
847 
848  app->debug = debug;
849 }
850 
852 {
853  struct stasis_app *app = stasis_app_get_by_name(app_name);
854 
855  if (!app) {
856  return;
857  }
858 
859  app->debug = debug;
860  ao2_cleanup(app);
861 }
862 
864 {
865  return (app ? app->debug : 0) || global_debug;
866 }
867 
869 {
870  int debug_enabled = 0;
871 
872  if (global_debug) {
873  debug_enabled = 1;
874  } else {
875  struct stasis_app *app = stasis_app_get_by_name(app_name);
876 
877  if (app) {
878  if (app->debug) {
879  debug_enabled = 1;
880  }
881  ao2_ref(app, -1);
882  }
883  }
884  return debug_enabled;
885 }
886 
888 {
890  if (!global_debug) {
891  struct ao2_container *app_names = stasis_app_get_all();
892  struct ao2_iterator it_app_names;
893  char *app_name;
894  struct stasis_app *app;
895 
896  if (!app_names || !ao2_container_count(app_names)) {
897  ao2_cleanup(app_names);
898  return;
899  }
900 
901  it_app_names = ao2_iterator_init(app_names, 0);
902  while ((app_name = ao2_iterator_next(&it_app_names))) {
903  if ((app = stasis_app_get_by_name(app_name))) {
904  stasis_app_set_debug(app, 0);
905  }
906 
907  ao2_cleanup(app_name);
908  ao2_cleanup(app);
909  }
910  ao2_iterator_cleanup(&it_app_names);
911  ao2_cleanup(app_names);
912  }
913 }
914 
916 {
917  RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
918  size_t size;
919  int res = 0;
920  size_t context_size = strlen("stasis-") + strlen(name) + 1;
921  char context_name[context_size];
922  char *topic_name;
923  int ret;
924 
925  ast_assert(name != NULL);
926  ast_assert(handler != NULL);
927 
928  ast_verb(1, "Creating Stasis app '%s'\n", name);
929 
930  size = sizeof(*app) + strlen(name) + 1;
932  if (!app) {
933  return NULL;
934  }
935  app->subscription_model = subscription_model;
936 
940  if (!app->forwards) {
941  return NULL;
942  }
943 
944  ret = ast_asprintf(&topic_name, "ari:application/%s", name);
945  if (ret < 0) {
946  return NULL;
947  }
948 
949  app->topic = stasis_topic_create(topic_name);
950  ast_free(topic_name);
951  if (!app->topic) {
952  return NULL;
953  }
954 
956  if (!app->bridge_router) {
957  return NULL;
958  }
959 
960  res |= stasis_message_router_add(app->bridge_router,
962 
963  res |= stasis_message_router_add(app->bridge_router,
965 
966  res |= stasis_message_router_add(app->bridge_router,
968 
969  res |= stasis_message_router_add(app->bridge_router,
971 
972  if (res != 0) {
973  return NULL;
974  }
975  /* Bridge router holds a reference */
976  ao2_ref(app, +1);
977 
978  app->router = stasis_message_router_create(app->topic);
979  if (!app->router) {
980  return NULL;
981  }
982 
983  res |= stasis_message_router_add(app->router,
985 
986  res |= stasis_message_router_add(app->router,
988 
991 
992  res |= stasis_message_router_add(app->router,
994 
997 
998  if (res != 0) {
999  return NULL;
1000  }
1001  /* Router holds a reference */
1002  ao2_ref(app, +1);
1003 
1004  strncpy(app->name, name, size - sizeof(*app));
1005  app->handler = handler;
1006  app->data = ao2_bump(data);
1007 
1008  /* Create a context, a match-all extension, and a 'h' extension for this application. Note that
1009  * this should only be done if a context does not already exist. */
1010  strcpy(context_name, "stasis-");
1011  strcat(context_name, name);
1012  if (!ast_context_find(context_name)) {
1013  if (!ast_context_find_or_create(NULL, NULL, context_name, "res_stasis")) {
1014  ast_log(LOG_WARNING, "Could not create context '%s' for Stasis application '%s'\n", context_name, name);
1015  } else {
1016  ast_add_extension(context_name, 0, "_.", 1, NULL, NULL, "Stasis", ast_strdup(name), ast_free_ptr, "res_stasis");
1017  ast_add_extension(context_name, 0, "h", 1, NULL, NULL, "NoOp", NULL, NULL, "res_stasis");
1018  }
1019  } else {
1020  ast_log(LOG_WARNING, "Not creating context '%s' for Stasis application '%s' because it already exists\n",
1021  context_name, name);
1022  }
1023 
1024  ao2_ref(app, +1);
1025  return app;
1026 }
1027 
1029 {
1030  return app->topic;
1031 }
1032 
1033 /*!
1034  * \brief Send a message to the given application.
1035  * \param app App to send the message to.
1036  * \param message Message to send.
1037  */
1038 void app_send(struct stasis_app *app, struct ast_json *message)
1039 {
1041  char eid[20];
1042  void *data;
1043 
1044  if (ast_json_object_set(message, "asterisk_id", ast_json_string_create(
1045  ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
1046  ast_log(AST_LOG_WARNING, "Failed to append EID to outgoing event %s\n",
1047  ast_json_string_get(ast_json_object_get(message, "type")));
1048  }
1049 
1050  /* Copy off mutable state with lock held */
1051  ao2_lock(app);
1052  handler = app->handler;
1053  data = ao2_bump(app->data);
1054  ao2_unlock(app);
1055  /* Name is immutable; no need to copy */
1056 
1057  if (handler) {
1058  handler(data, app->name, message);
1059  } else {
1060  ast_verb(3,
1061  "Inactive Stasis app '%s' missed message\n", app->name);
1062  }
1063  ao2_cleanup(data);
1064 }
1065 
1067 {
1068  ao2_lock(app);
1069 
1070  ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
1071  app->handler = NULL;
1072  ao2_cleanup(app->data);
1073  app->data = NULL;
1074 
1075  ao2_unlock(app);
1076 }
1077 
1079 {
1080  ao2_lock(app);
1081 
1083 
1085  app->router = NULL;
1087  app->bridge_router = NULL;
1089  app->endpoint_router = NULL;
1090 
1091  ao2_unlock(app);
1092 }
1093 
1095 {
1096  int ret;
1097 
1098  ao2_lock(app);
1099  ret = app->handler != NULL;
1100  ao2_unlock(app);
1101 
1102  return ret;
1103 }
1104 
1106 {
1107  int ret;
1108 
1109  ao2_lock(app);
1110  ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
1111  ao2_unlock(app);
1112 
1113  return ret;
1114 }
1115 
1117 {
1118  ao2_lock(app);
1119  if (app->handler && app->data) {
1120  struct ast_json *msg;
1121 
1122  ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
1123 
1124  msg = ast_json_pack("{s: s, s: o?, s: s}",
1125  "type", "ApplicationReplaced",
1126  "timestamp", ast_json_timeval(ast_tvnow(), NULL),
1127  "application", app->name);
1128  if (msg) {
1129  /*
1130  * The app must be unlocked before calling 'send' since a handler may
1131  * subsequently attempt to grab the app lock after first obtaining a
1132  * lock for another object, thus causing a deadlock.
1133  */
1134  ao2_unlock(app);
1135  app_send(app, msg);
1136  ao2_lock(app);
1137  ast_json_unref(msg);
1138  if (!app->handler) {
1139  /*
1140  * If the handler disappeared then the app was deactivated. In that
1141  * case don't replace. Re-activation will reset the handler later.
1142  */
1143  ao2_unlock(app);
1144  return;
1145  }
1146  }
1147  } else {
1148  ast_verb(1, "Activating Stasis app '%s'\n", app->name);
1149  }
1150 
1151  app->handler = handler;
1152  ao2_replace(app->data, data);
1153  ao2_unlock(app);
1154 }
1155 
1156 const char *stasis_app_name(const struct stasis_app *app)
1157 {
1158  return app->name;
1159 }
1160 
1161 static int forwards_filter_by_type(void *obj, void *arg, int flags)
1162 {
1163  struct app_forwards *forward = obj;
1164  enum forward_type *forward_type = arg;
1165 
1166  if (forward->forward_type == *forward_type) {
1167  return CMP_MATCH;
1168  }
1169 
1170  return 0;
1171 }
1172 
1173 void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
1174 {
1175  struct ao2_iterator *channels;
1176  struct ao2_iterator *endpoints;
1177  struct ao2_iterator *bridges;
1178  struct app_forwards *forward;
1180 
1181  ast_cli(a->fd, "Name: %s\n"
1182  " Debug: %s\n"
1183  " Subscription Model: %s\n",
1184  app->name,
1185  app->debug ? "Yes" : "No",
1187  "Global Resource Subscription" :
1188  "Application/Explicit Resource Subscription");
1189  ast_cli(a->fd, " Subscriptions: %d\n", ao2_container_count(app->forwards));
1190 
1191  ast_cli(a->fd, " Channels:\n");
1192  forward_type = FORWARD_CHANNEL;
1193  channels = ao2_callback(app->forwards, OBJ_MULTIPLE,
1194  forwards_filter_by_type, &forward_type);
1195  if (channels) {
1196  while ((forward = ao2_iterator_next(channels))) {
1197  ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
1198  ao2_ref(forward, -1);
1199  }
1200  ao2_iterator_destroy(channels);
1201  }
1202 
1203  ast_cli(a->fd, " Bridges:\n");
1204  forward_type = FORWARD_BRIDGE;
1205  bridges = ao2_callback(app->forwards, OBJ_MULTIPLE,
1206  forwards_filter_by_type, &forward_type);
1207  if (bridges) {
1208  while ((forward = ao2_iterator_next(bridges))) {
1209  ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
1210  ao2_ref(forward, -1);
1211  }
1212  ao2_iterator_destroy(bridges);
1213  }
1214 
1215  ast_cli(a->fd, " Endpoints:\n");
1216  forward_type = FORWARD_ENDPOINT;
1217  endpoints = ao2_callback(app->forwards, OBJ_MULTIPLE,
1218  forwards_filter_by_type, &forward_type);
1219  if (endpoints) {
1220  while ((forward = ao2_iterator_next(endpoints))) {
1221  ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
1222  ao2_ref(forward, -1);
1223  }
1224  ao2_iterator_destroy(endpoints);
1225  }
1226 }
1227 
1228 struct ast_json *app_to_json(const struct stasis_app *app)
1229 {
1230  struct ast_json *json;
1231  struct ast_json *channels;
1232  struct ast_json *bridges;
1233  struct ast_json *endpoints;
1234  struct ao2_iterator i;
1235  struct app_forwards *forwards;
1236 
1237  json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1238  "name", app->name,
1239  "channel_ids", "bridge_ids", "endpoint_ids");
1240  if (!json) {
1241  return NULL;
1242  }
1243  channels = ast_json_object_get(json, "channel_ids");
1244  bridges = ast_json_object_get(json, "bridge_ids");
1245  endpoints = ast_json_object_get(json, "endpoint_ids");
1246 
1247  i = ao2_iterator_init(app->forwards, 0);
1248  while ((forwards = ao2_iterator_next(&i))) {
1249  struct ast_json *array = NULL;
1250  int append_res;
1251 
1252  switch (forwards->forward_type) {
1253  case FORWARD_CHANNEL:
1254  array = channels;
1255  break;
1256  case FORWARD_BRIDGE:
1257  array = bridges;
1258  break;
1259  case FORWARD_ENDPOINT:
1260  array = endpoints;
1261  break;
1262  }
1263 
1264  /* If forward_type value is unexpected this will safely return an error. */
1265  append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
1266  ao2_ref(forwards, -1);
1267 
1268  if (append_res != 0) {
1269  ast_log(LOG_ERROR, "Error building response\n");
1271  ast_json_unref(json);
1272 
1273  return NULL;
1274  }
1275  }
1277 
1278  return json;
1279 }
1280 
1282 {
1283  struct app_forwards *forwards;
1284 
1285  if (!app) {
1286  return -1;
1287  }
1288 
1289  ao2_lock(app->forwards);
1290  /* If subscribed to all, don't subscribe again */
1291  forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1292  if (forwards) {
1293  ao2_unlock(app->forwards);
1294  ao2_ref(forwards, -1);
1295 
1296  return 0;
1297  }
1298 
1299  forwards = ao2_find(app->forwards,
1300  chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
1302  if (!forwards) {
1303  int res;
1304 
1305  /* Forwards not found, create one */
1306  forwards = forwards_create_channel(app, chan);
1307  if (!forwards) {
1308  ao2_unlock(app->forwards);
1309 
1310  return -1;
1311  }
1312 
1313  res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1314  if (!res) {
1315  ao2_unlock(app->forwards);
1316  ao2_ref(forwards, -1);
1317 
1318  return -1;
1319  }
1320  }
1321 
1322  ++forwards->interested;
1323  ast_debug(3, "Channel '%s' is %d interested in %s\n",
1324  chan ? ast_channel_uniqueid(chan) : "ALL",
1325  forwards->interested,
1326  app->name);
1327 
1328  ao2_unlock(app->forwards);
1329  ao2_ref(forwards, -1);
1330 
1331  return 0;
1332 }
1333 
/*! \brief Event-source adapter: forwards the untyped \a obj to app_subscribe_channel(). */
static int subscribe_channel(struct stasis_app *app, void *obj)
{
	struct ast_channel *chan = obj;

	return app_subscribe_channel(app, chan);
}
1338 
1339 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
1340 {
1341  struct app_forwards *forwards;
1342 
1343  if (!id) {
1344  if (!strcmp(kind, "bridge")) {
1345  id = BRIDGE_ALL;
1346  } else if (!strcmp(kind, "channel")) {
1347  id = CHANNEL_ALL;
1348  } else if (!strcmp(kind, "endpoint")) {
1349  id = ENDPOINT_ALL;
1350  } else {
1351  ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
1352  return -1;
1353  }
1354  }
1355 
1356  ao2_lock(app->forwards);
1357  forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1358  if (!forwards) {
1359  ao2_unlock(app->forwards);
1360  ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
1361  return -1;
1362  }
1363  forwards->interested--;
1364 
1365  ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
1366  if (forwards->interested == 0 || terminate) {
1367  /* No one is interested any more; unsubscribe */
1368  ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
1369  forwards_unsubscribe(forwards);
1370  ao2_find(app->forwards, forwards,
1372  OBJ_NODATA);
1373 
1374  if (!strcmp(kind, "endpoint")) {
1376  }
1377  }
1378  ao2_unlock(app->forwards);
1379  ao2_ref(forwards, -1);
1380 
1381  return 0;
1382 }
1383 
1385 {
1386  if (!app) {
1387  return -1;
1388  }
1389 
1390  return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
1391 }
1392 
/*!
 * \brief Cancel the subscription an app has for a channel, by channel id.
 *
 * \retval -1 if \a app is NULL; otherwise the result of unsubscribe() for
 *         kind "channel" without forced termination.
 */
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
	return app ? unsubscribe(app, "channel", channel_id, 0) : -1;
}
1401 
1402 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
1403 {
1404  struct app_forwards *forwards;
1405 
1406  if (ast_strlen_zero(channel_id)) {
1407  channel_id = CHANNEL_ALL;
1408  }
1409  forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1410  ao2_cleanup(forwards);
1411 
1412  return forwards != NULL;
1413 }
1414 
/*! \brief Event-source lookup: resolve \a id via ast_channel_get_by_name(); \a app is unused. */
static void *channel_find(const struct stasis_app *app, const char *id)
{
	struct ast_channel *chan = ast_channel_get_by_name(id);

	return chan;
}
1419 
1421  .scheme = "channel:",
1422  .find = channel_find,
1423  .subscribe = subscribe_channel,
1424  .unsubscribe = app_unsubscribe_channel_id,
1425  .is_subscribed = app_is_subscribed_channel_id
1426 };
1427 
1428 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1429 {
1430  struct app_forwards *forwards;
1431 
1432  if (!app) {
1433  return -1;
1434  }
1435 
1436  ao2_lock(app->forwards);
1437  /* If subscribed to all, don't subscribe again */
1438  forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1439  if (forwards) {
1440  ao2_unlock(app->forwards);
1441  ao2_ref(forwards, -1);
1442 
1443  return 0;
1444  }
1445 
1446  forwards = ao2_find(app->forwards,
1447  bridge ? bridge->uniqueid : BRIDGE_ALL,
1449  if (!forwards) {
1450  int res;
1451 
1452  /* Forwards not found, create one */
1453  forwards = forwards_create_bridge(app, bridge);
1454  if (!forwards) {
1455  ao2_unlock(app->forwards);
1456 
1457  return -1;
1458  }
1459 
1460  res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1461  if (!res) {
1462  ao2_unlock(app->forwards);
1463  ao2_ref(forwards, -1);
1464 
1465  return -1;
1466  }
1467  }
1468 
1469  ++forwards->interested;
1470  ast_debug(3, "Bridge '%s' is %d interested in %s\n",
1471  bridge ? bridge->uniqueid : "ALL",
1472  forwards->interested,
1473  app->name);
1474 
1475  ao2_unlock(app->forwards);
1476  ao2_ref(forwards, -1);
1477 
1478  return 0;
1479 }
1480 
/*! \brief Event-source adapter: forwards the untyped \a obj to app_subscribe_bridge(). */
static int subscribe_bridge(struct stasis_app *app, void *obj)
{
	struct ast_bridge *bridge = obj;

	return app_subscribe_bridge(app, bridge);
}
1485 
1486 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
1487 {
1488  if (!app) {
1489  return -1;
1490  }
1491 
1492  return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
1493 }
1494 
/*!
 * \brief Cancel the subscription an app has for a bridge, by bridge id.
 *
 * \retval -1 if \a app is NULL; otherwise the result of unsubscribe() for
 *         kind "bridge" without forced termination.
 */
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
	return app ? unsubscribe(app, "bridge", bridge_id, 0) : -1;
}
1503 
1504 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
1505 {
1506  struct app_forwards *forwards;
1507 
1508  if (ast_strlen_zero(bridge_id)) {
1509  bridge_id = BRIDGE_ALL;
1510  }
1511 
1512  forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1513  ao2_cleanup(forwards);
1514 
1515  return forwards != NULL;
1516 }
1517 
/*! \brief Event-source lookup: resolve \a id via stasis_app_bridge_find_by_id(); \a app is unused. */
static void *bridge_find(const struct stasis_app *app, const char *id)
{
	struct ast_bridge *bridge = stasis_app_bridge_find_by_id(id);

	return bridge;
}
1522 
1524  .scheme = "bridge:",
1525  .find = bridge_find,
1526  .subscribe = subscribe_bridge,
1527  .unsubscribe = app_unsubscribe_bridge_id,
1528  .is_subscribed = app_is_subscribed_bridge_id
1529 };
1530 
1531 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
1532 {
1533  struct app_forwards *forwards;
1534 
1535  if (!app) {
1536  return -1;
1537  }
1538 
1539  ao2_lock(app->forwards);
1540  /* If subscribed to all, don't subscribe again */
1542  if (forwards) {
1543  ao2_unlock(app->forwards);
1544  ao2_ref(forwards, -1);
1545 
1546  return 0;
1547  }
1548 
1549  forwards = ao2_find(app->forwards,
1550  endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
1552  if (!forwards) {
1553  int res;
1554 
1555  /* Forwards not found, create one */
1556  forwards = forwards_create_endpoint(app, endpoint);
1557  if (!forwards) {
1558  ao2_unlock(app->forwards);
1559 
1560  return -1;
1561  }
1562 
1563  res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1564  if (!res) {
1565  ao2_unlock(app->forwards);
1566  ao2_ref(forwards, -1);
1567 
1568  return -1;
1569  }
1570 
1571  /* Subscribe for messages */
1573  }
1574 
1575  ++forwards->interested;
1576  ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
1577  endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
1578  forwards->interested,
1579  app->name);
1580 
1581  ao2_unlock(app->forwards);
1582  ao2_ref(forwards, -1);
1583 
1584  return 0;
1585 }
1586 
/*! \brief Event-source adapter: forwards the untyped \a obj to app_subscribe_endpoint(). */
static int subscribe_endpoint(struct stasis_app *app, void *obj)
{
	struct ast_endpoint *endpoint = obj;

	return app_subscribe_endpoint(app, endpoint);
}
1591 
/*!
 * \brief Cancel the subscription an app has for an endpoint, by endpoint id.
 *
 * \retval -1 if \a app is NULL; otherwise the result of unsubscribe() for
 *         kind "endpoint" without forced termination.
 */
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
	return app ? unsubscribe(app, "endpoint", endpoint_id, 0) : -1;
}
1600 
1601 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
1602 {
1603  struct app_forwards *forwards;
1604 
1605  if (ast_strlen_zero(endpoint_id)) {
1606  endpoint_id = ENDPOINT_ALL;
1607  }
1608  forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1609  ao2_cleanup(forwards);
1610 
1611  return forwards != NULL;
1612 }
1613 
/*! \brief Event-source lookup: resolve \a id via ast_endpoint_find_by_id(); \a app is unused. */
static void *endpoint_find(const struct stasis_app *app, const char *id)
{
	struct ast_endpoint *endpoint = ast_endpoint_find_by_id(id);

	return endpoint;
}
1618 
1620  .scheme = "endpoint:",
1621  .find = endpoint_find,
1622  .subscribe = subscribe_endpoint,
1623  .unsubscribe = app_unsubscribe_endpoint_id,
1624  .is_subscribed = app_is_subscribed_endpoint_id
1625 };
1626 
1628 {
1629  stasis_app_register_event_source(&channel_event_source);
1630  stasis_app_register_event_source(&bridge_event_source);
1631  stasis_app_register_event_source(&endpoint_event_source);
1632 }
1633 
1635 {
1636  stasis_app_unregister_event_source(&endpoint_event_source);
1637  stasis_app_unregister_event_source(&bridge_event_source);
1638  stasis_app_unregister_event_source(&channel_event_source);
1639 }
1640 
1642 {
1643  if (!app || !json) {
1644  return json;
1645  }
1646 
1647  ast_json_object_set(json, "events_allowed", app->events_allowed ?
1649  ast_json_object_set(json, "events_disallowed", app->events_disallowed ?
1651 
1652  return json;
1653 }
1654 
1655 static int app_event_filter_set(struct stasis_app *app, struct ast_json **member,
1656  struct ast_json *filter, const char *filter_type)
1657 {
1658  if (filter && ast_json_typeof(filter) == AST_JSON_OBJECT) {
1659  if (!ast_json_object_size(filter)) {
1660  /* If no filters are specified then reset this filter type */
1661  filter = NULL;
1662  } else {
1663  /* Otherwise try to get the filter array for this type */
1664  filter = ast_json_object_get(filter, filter_type);
1665  if (!filter) {
1666  /* A filter type exists, but not this one, so don't update */
1667  return 0;
1668  }
1669  }
1670  }
1671 
1672  /* At this point the filter object should be an array */
1673  if (filter && ast_json_typeof(filter) != AST_JSON_ARRAY) {
1674  ast_log(LOG_ERROR, "Invalid json type event filter - app: %s, filter: %s\n",
1675  app->name, filter_type);
1676  return -1;
1677  }
1678 
1679  if (filter) {
1680  /* Confirm that at least the type names are specified */
1681  struct ast_json *obj;
1682  int i;
1683 
1684  for (i = 0; i < ast_json_array_size(filter) &&
1685  (obj = ast_json_array_get(filter, i)); ++i) {
1686 
1687  if (ast_strlen_zero(ast_json_object_string_get(obj, "type"))) {
1688  ast_log(LOG_ERROR, "Filter event must have a type - app: %s, "
1689  "filter: %s\n", app->name, filter_type);
1690  return -1;
1691  }
1692  }
1693  }
1694 
1695  ao2_lock(app);
1696  ast_json_unref(*member);
1697  *member = filter ? ast_json_ref(filter) : NULL;
1698  ao2_unlock(app);
1699 
1700  return 0;
1701 }
1702 
1704 {
1705  return app_event_filter_set(app, &app->events_allowed, filter, "allowed");
1706 }
1707 
1709 {
1710  return app_event_filter_set(app, &app->events_disallowed, filter, "disallowed");
1711 }
1712 
/*!
 * \brief Set the application's event type filter.
 *
 * The disallowed filter is applied first; the allowed filter is only applied
 * when that succeeded.
 *
 * \retval 0 when both filters were set successfully.
 * \retval non-zero when either setter reported a failure.
 */
int stasis_app_event_filter_set(struct stasis_app *app, struct ast_json *filter)
{
	return app_events_disallowed_set(app, filter) || app_events_allowed_set(app, filter);
}
1717 
/*!
 * \brief Check whether an event's "type" appears in a filter array.
 *
 * \param array Filter array to search; may be NULL.
 * \param event Event whose "type" string is compared.
 * \param empty Value to return when \a array is NULL or has no entries.
 *
 * \retval 1 if an entry's "type" equals the event's "type".
 * \retval 0 if no entry matched.
 * \return \a empty when there is nothing to search.
 */
static int app_event_filter_matched(struct ast_json *array, struct ast_json *event, int empty)
{
	struct ast_json *obj;
	int i;

	if (!array || !ast_json_array_size(array)) {
		return empty;
	}

	for (i = 0; i < ast_json_array_size(array) &&
			(obj = ast_json_array_get(array, i)); ++i) {

		if (ast_strings_equal(ast_json_object_string_get(obj, "type"),
				ast_json_object_string_get(event, "type"))) {
			return 1;
		}
	}

	return 0;
}
1738 
1740 {
1741  struct stasis_app *app = stasis_app_get_by_name(app_name);
1742  int res;
1743 
1744  if (!app) {
1745  return 0;
1746  }
1747 
1748  ao2_lock(app);
1749  res = !app_event_filter_matched(app->events_disallowed, event, 0) &&
1750  app_event_filter_matched(app->events_allowed, event, 1);
1751  ao2_unlock(app);
1752  ao2_ref(app, -1);
1753 
1754  return res;
1755 }
struct ast_json * events_allowed
struct stasis_message_type * ast_bridge_snapshot_type(void)
Message type for ast_bridge_snapshot.
struct stasis_message_type * ast_blind_transfer_type(void)
Message type for ast_blind_transfer_message.
struct stasis_app * stasis_app_get_by_name(const char *name)
Retrieve a handle to a Stasis application by its name.
Definition: res_stasis.c:1694
static const char type[]
Definition: chan_ooh323.c:109
static struct app_forwards * forwards_create_bridge(struct stasis_app *app, struct ast_bridge *bridge)
const ast_string_field data
struct ao2_container * channels
Definition: bridge.h:339
Main Channel structure associated with a channel.
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
void stasis_app_set_debug_by_name(const char *app_name, int debug)
Enable/disable request/response and event logging on an application.
static void sub_bridge_update_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
struct ast_channel_snapshot_base * base
union ast_attended_transfer_message::@324 dest
Asterisk main include file. File version handling, generic pbx functions.
static struct ast_json * simple_channel_event(const char *type, struct ast_channel_snapshot *snapshot, const struct timeval *tv)
const ast_string_field uniqueid
Definition: bridge.h:409
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
struct stasis_message * old_snapshot
Old value from the cache.
Definition: stasis.h:971
#define ARRAY_LEN(a)
Definition: isdn_lib.c:42
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:591
static int forwards_filter_by_type(void *obj, void *arg, int flags)
static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
CallerID (and other GR30) management and generation Includes code and algorithms from the Zapata libr...
Message representing attended transfer.
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
Definition: astobj2.h:409
stasis_app_subscription_model
size_t ast_json_object_size(struct ast_json *object)
Get size of JSON object.
Definition: json.c:393
#define CHANNEL_ALL
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: main/utils.c:2587
#define ast_channel_unref(c)
Decrease channel reference count.
Definition: channel.h:2981
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
const char * ast_describe_caller_presentation(int data)
Convert caller ID pres value to explanatory string.
Definition: callerid.c:1164
#define ast_test_flag(p, flag)
Definition: utils.h:63
static void sub_channel_update_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
int stasis_app_get_debug_by_name(const char *app_name)
Get debug status of an application.
static void update(int code_size, int y, int wi, int fi, int dq, int sr, int dqsez, struct g726_state *state_ptr)
Definition: codec_g726.c:367
Message published during a blind transfer.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
static struct ast_json * channel_created_event(struct ast_channel_snapshot *snapshot, const struct timeval *tv)
#define OBJ_KEY
Definition: astobj2.h:1155
static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
const char * scheme
The scheme to match against on [un]subscribes.
Definition: stasis_app.h:176
#define OBJ_POINTER
Definition: astobj2.h:1154
static struct ast_json * channel_state_change_event(struct ast_channel_snapshot *snapshot, const struct timeval *tv)
static void app_dtor(void *obj)
#define LOG_WARNING
Definition: logger.h:274
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
void ao2_iterator_cleanup(struct ao2_iterator *iter)
struct stasis_message_type * ast_endpoint_snapshot_type(void)
Message type for ast_endpoint_snapshot.
struct ast_endpoint_snapshot * ast_endpoint_latest_snapshot(const char *tech, const char *resource)
Retrieve the most recent snapshot for the endpoint with the given name.
Structure that contains a snapshot of information about a bridge.
Definition: bridge.h:322
const ast_string_field video_source_id
Definition: bridge.h:336
#define AST_LOG_WARNING
Definition: logger.h:279
void app_deactivate(struct stasis_app *app)
Deactivates an application.
Structure representing a snapshot of channel state.
static int subscribe_channel(struct stasis_app *app, void *obj)
static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void forwards_dtor(void *obj)
Assume that the ao2_container is already locked.
Definition: astobj2.h:1067
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct stasis_topic * ast_endpoint_topic(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
Cancel the subscription an app has for a channel.
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
Definition: astman.c:222
static int app_event_filter_set(struct stasis_app *app, struct ast_json **member, struct ast_json *filter, const char *filter_type)
stasis_app_cb handler
const ast_string_field uniqueid
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
int stasis_app_event_filter_set(struct stasis_app *app, struct ast_json *filter)
Set the application's event type filter.
void app_shutdown(struct stasis_app *app)
Tears down an application.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:150
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define ast_assert(a)
Definition: utils.h:695
#define ao2_link_flags(container, obj, flags)
Definition: astobj2.h:1572
static void endpoint_state_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
#define ao2_unlock(a)
Definition: astobj2.h:730
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
Cancel the subscription an app has for a bridge.
const ast_string_field id
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
struct ast_endpoint * ast_endpoint_find_by_id(const char *id)
Finds the endpoint with the given tech[/resource] id.
char bridge[AST_UUID_STR_LEN]
int ast_channel_snapshot_connected_line_equal(const struct ast_channel_snapshot *old_snapshot, const struct ast_channel_snapshot *new_snapshot)
Compares the connected line info of two snapshots.
static void sub_default_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
#define NULL
Definition: resample.c:96
struct stasis_forward * topic_cached_forward
void stasis_app_set_debug(struct stasis_app *app, int debug)
Enable/disable request/response and event logging on an application.
void ast_cli(int fd, const char *fmt,...)
Definition: clicompat.c:6
int ast_channel_snapshot_cep_equal(const struct ast_channel_snapshot *old_snapshot, const struct ast_channel_snapshot *new_snapshot)
Compares the context, exten and priority of two snapshots.
Structure representing a change of snapshot of channel state.
void ast_free_ptr(void *ptr)
free() wrapper
Definition: astmm.c:1771
struct ast_channel_snapshot_dialplan * dialplan
#define ast_verb(level,...)
Definition: logger.h:463
struct ast_bridge_channel_snapshot_pair to_transferee
int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt)
Subscribe an application to an endpoint for messages.
Definition: messaging.c:493
static struct ast_json * channel_state(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
Handle channel state changes.
int app_is_active(struct stasis_app *app)
Checks whether an app is active.
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
Test if an app is subscribed to a channel.
struct stasis_topic * ast_endpoint_topic_all_cached(void)
Cached topic for all endpoint related messages.
struct ast_json * stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
Add a bridge subscription to an existing channel subscription.
static const char context_name[]
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:404
#define OBJ_PARTIAL_KEY
Definition: astobj2.h:1156
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
static channel_snapshot_monitor channel_monitors[]
static int app_events_allowed_set(struct stasis_app *app, struct ast_json *filter)
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
Test if an app is subscribed to a endpoint.
int ast_context_destroy_by_name(const char *context, const char *registrar)
Destroy a context by name.
Definition: pbx.c:8244
#define ast_strlen_zero(foo)
Definition: strings.h:52
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.
struct stasis_message_router * endpoint_router
#define ao2_bump(obj)
Definition: astobj2.h:491
enum forward_type forward_type
void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
Dump properties of a stasis_app to the CLI.
struct ast_bridge_channel_snapshot_pair to_transfer_target
static int subscribe_endpoint(struct stasis_app *app, void *obj)
struct ao2_container * forwards
struct stasis_topic * ast_channel_topic_all(void)
A topic which publishes the events for all channels.
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:452
#define ast_log
Definition: astobj2.c:42
struct stasis_message_type * ast_bridge_merge_message_type(void)
Message type for ast_bridge_merge_message.
int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
Check if the given event should be filtered.
void stasis_app_unregister_event_sources(void)
Unregister core event sources.
struct ast_context * ast_context_find(const char *name)
Find a context.
Definition: extconf.c:4174
struct ast_json *(* channel_snapshot_monitor)(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
Typedef for callbacks that get called on channel snapshot updates.
struct stasis_app_event_source bridge_event_source
struct ast_channel_snapshot * links[2]
static struct app_forwards * forwards_create_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
const ast_string_field appl
static struct ast_json * channel_callerid(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
struct stasis_app * app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
Create a res_stasis application.
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:268
const int fd
Definition: cli.h:159
static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
Event source information and callbacks.
Definition: stasis_app.h:174
static struct ast_json * simple_bridge_event(const char *type, struct ast_bridge_snapshot *snapshot, const struct timeval *tv)
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
#define ast_json_object_string_get(object, key)
Get a string field from a JSON object.
Definition: json.h:573
static struct ao2_container * endpoints
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
#define ao2_lock(a)
Definition: astobj2.h:718
#define ast_strdupa(s)
duplicate a string in memory from the stack
Definition: astmm.h:300
struct ast_channel_snapshot_hangup * hangup
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:273
struct ast_bridge_snapshot * bridge
struct stasis_topic * ast_app_get_topic(struct stasis_app *app)
Returns the stasis topic for an app.
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
Cancel the subscription an app has for a channel.
static struct channel_usage channels
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:618
struct stasis_app_event_source endpoint_event_source
const char * stasis_app_name(const struct stasis_app *app)
Retrieve an application's name.
struct ast_bridge_snapshot * old_snapshot
Internal API for the Stasis application controller.
int app_is_finished(struct stasis_app *app)
Checks whether a deactivated app has no channels.
Cache update message.
Definition: stasis.h:967
struct ast_json * ast_json_array_create(void)
Create a empty JSON array.
Definition: json.c:352
#define stasis_message_router_create(topic)
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:649
A snapshot of an endpoint's state.
int ast_json_array_append(struct ast_json *array, struct ast_json *value)
Append to an array.
Definition: json.c:368
static void * channel_find(const struct stasis_app *app, const char *id)
struct stasis_topic * ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
const char * ast_channel_uniqueid(const struct ast_channel *chan)
struct ast_channel_snapshot_caller * caller
enum ast_attended_transfer_dest_type dest_type
void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
Unregister an application event source.
Definition: res_stasis.c:1823
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
static void * endpoint_find(const struct stasis_app *app, const char *id)
struct stasis_message * new_snapshot
New value.
Definition: stasis.h:973
static struct ast_json * channel_dialplan(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
static int subscribed
Definition: manager.c:1476
Structure that contains information about a bridge.
Definition: bridge.h:357
static struct ast_json * channel_connected_line(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
static struct ast_json * simple_endpoint_event(const char *type, struct ast_endpoint_snapshot *snapshot, const struct timeval *tv)
void app_send(struct stasis_app *app, struct ast_json *message)
Send a message to the given application.
struct stasis_message_sanitizer * stasis_app_get_sanitizer(void)
Get the Stasis message sanitizer for app_stasis applications.
Definition: res_stasis.c:2264
struct ast_channel_snapshot * transferer
struct ast_channel_snapshot * channel_snapshot
#define LOG_ERROR
Definition: logger.h:285
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
Subscribes an application to a endpoint.
static struct ast_json * channel_destroyed_event(struct ast_channel_snapshot *snapshot, const struct timeval *tv)
int ast_channel_snapshot_caller_id_equal(const struct ast_channel_snapshot *old_snapshot, const struct ast_channel_snapshot *new_snapshot)
Compares the callerid info of two snapshots.
struct stasis_message_router * router
static int array(struct ast_channel *chan, const char *cmd, char *var, const char *value)
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
int global_debug
static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
struct stasis_message_type * ast_endpoint_state_type(void)
Message type for endpoint state changes.
struct ast_bridge * stasis_app_bridge_find_by_id(const char *bridge_id)
Returns the bridge with the given id.
Definition: res_stasis.c:774
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:969
int ast_strings_equal(const char *str1, const char *str2)
Compare strings for equality checking for NULL.
Definition: strings.c:239
static void forwards_unsubscribe(struct app_forwards *forwards)
const char * app_name(struct ast_app *app)
Definition: pbx_app.c:463
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
struct ast_bridge_channel_snapshot_pair threeway
struct stasis_message_type * ast_attended_transfer_type(void)
Message type for ast_attended_transfer_message.
struct ast_json * events_disallowed
static struct ao2_container * bridges
Definition: bridge.c:123
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1176
struct ast_bridge_snapshot * bridge_snapshot
struct stasis_app_event_source channel_event_source
struct stasis_topic * topic
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
struct stasis_forward * topic_forward
const char * ast_cause2str(int state) attribute_pure
Gives the string form of a given cause code.
Definition: channel.c:612
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
#define ast_free(a)
Definition: astmm.h:182
struct ao2_container * stasis_app_get_all(void)
Gets the names of all registered Stasis applications.
Definition: res_stasis.c:1708
static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
Helper function for determining if the application is subscribed to a given entity.
static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
Definition: stasis.c:1548
enum ast_json_type ast_json_typeof(const struct ast_json *value)
Get the type of value.
Definition: json.c:78
struct ast_channel_snapshot * new_snapshot
enum ast_channel_state state
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
const ast_string_field uniqueid
Definition: bridge.h:336
struct ast_bridge_snapshot * new_snapshot
struct stasis_message_type * ast_channel_snapshot_type(void)
Message type for ast_channel_snapshot_update.
struct ast_channel_snapshot * old_snapshot
int stasis_app_get_debug(struct stasis_app *app)
Get debug status of an application.
void(* stasis_app_cb)(void *data, const char *app_name, struct ast_json *message)
Callback for Stasis application handler.
Definition: stasis_app.h:67
struct ast_bridge_snapshot * from
struct stasis_topic * ast_bridge_topic_all(void)
A topic which publishes the events for all bridges.
struct ast_json * stasis_app_event_filter_to_json(struct stasis_app *app, struct ast_json *json)
Convert and add the app's event type filter(s) to the given json object.
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
Cancel the subscription an app has for an endpoint.
static void * bridge_find(const struct stasis_app *app, const char *id)
Message representing the merge of two bridges.
struct ast_json * app_to_json(const struct stasis_app *app)
Create a JSON representation of a stasis_app.
static int subscribe_bridge(struct stasis_app *app, void *obj)
#define ao2_replace(dst, src)
Definition: astobj2.h:517
void stasis_app_set_global_debug(int debug)
Enable/disable request/response and event logging on all applications.
int ast_add_extension(const char *context, int replace, const char *extension, int priority, const char *label, const char *callerid, const char *application, void *data, void(*datad)(void *), const char *registrar)
Add an extension to an extension context.
Definition: pbx.c:6970
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Definition: astobj2.h:1841
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
Standard Command Line Interface.
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:397
int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
Cancel the bridge subscription for an application.
static struct app_forwards * forwards_create(struct stasis_app *app, const char *id)
Internal API for the Stasis application controller.
#define BRIDGE_ALL
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_bridge_snapshot.
struct ast_flags flags
struct ast_channel_snapshot * ast_multi_channel_blob_get_channel(struct ast_multi_channel_blob *obj, const char *role)
Retrieve a channel snapshot associated with a specific role from an ast_multi_channel_blob.
A multi channel blob data structure for multi_channel_blob stasis messages.
void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
Update the handler and data for a res_stasis application.
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic.
void stasis_app_register_event_sources(void)
Register core event sources.
size_t ast_json_array_size(const struct ast_json *array)
Get the size of a JSON array.
Definition: json.c:356
void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id)
Unsubscribe from messages from a particular endpoint.
Definition: messaging.c:423
Stasis out-of-call text message support.
struct stasis_forward * sub
Definition: res_corosync.c:240
struct stasis_message_type * ast_endpoint_contact_state_type(void)
Message type for endpoint contact state changes.
static void sub_endpoint_update_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Abstract JSON element (object, array, string, int, ...).
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
Subscribes an application to a channel.
Forwarding information.
Definition: stasis.c:1531
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Definition: astobj2.h:1358
struct stasis_message_type * ast_channel_dial_type(void)
Message type for when a channel dials another channel.
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
Test if an app is subscribed to a bridge.
forward_type
Stasis Application API. See Stasis Application API for detailed documentation.
Generic container type.
struct stasis_topic * ast_bridge_topic(struct ast_bridge *bridge)
A topic which publishes the events for a particular bridge.
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:709
struct ast_context * ast_context_find_or_create(struct ast_context **extcontexts, struct ast_hashtab *exttable, const char *name, const char *registrar)
Register a new context or find an existing one.
Definition: pbx.c:6198
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
struct ast_channel * ast_channel_get_by_name(const char *name)
Find a channel by name.
Definition: channel.c:1454
static const char app[]
Definition: app_mysql.c:62
enum stasis_app_subscription_model subscription_model
struct ast_json * ast_json_array_get(const struct ast_json *array, size_t index)
Get an element from an array.
Definition: json.c:360
#define AST_JSON_UTF8_VALIDATE(str)
Check str for UTF-8 and replace with an empty string if it fails the check.
Definition: json.h:224
Endpoint abstractions.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
Callback function for checking if channels in a bridge are subscribed to.
static int app_event_filter_matched(struct ast_json *array, struct ast_json *event, int empty)
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_endpoint_snapshot.
Reject duplicate objects in container.
Definition: astobj2.h:1205
const char * ast_endpoint_get_id(const struct ast_endpoint *endpoint)
Gets the tech/resource id of the given endpoint.
void stasis_app_register_event_source(struct stasis_app_event_source *obj)
Register an application event source.
Definition: res_stasis.c:1816
struct stasis_message_router * bridge_router
static struct app_forwards * forwards_create_channel(struct stasis_app *app, struct ast_channel *chan)
static void bridge_merge_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message)
struct ast_bridge_snapshot * to
static int app_events_disallowed_set(struct stasis_app *app, struct ast_json *filter)
#define ENDPOINT_ALL
static struct test_val a