Asterisk - The Open Source Telephony Project  18.5.0
Data Structures | Macros | Enumerations | Functions | Variables
resource_events.c File Reference

/api-docs/events.{format} implementation - WebSocket resource More...

#include "asterisk.h"
#include "resource_events.h"
#include "asterisk/astobj2.h"
#include "asterisk/http_websocket.h"
#include "asterisk/stasis_app.h"
#include "asterisk/vector.h"
Include dependency graph for resource_events.c:

Go to the source code of this file.

Data Structures

struct  event_session
 A wrapper for the ast_ari_websocket_session. More...
 

Macros

#define APPS_NUM_BUCKETS   7
 
#define EVENT_SESSION_NUM_BUCKETS   23
 
#define MESSAGES_INIT_SIZE   23
 

Enumerations

enum  event_session_error_type { ERROR_TYPE_STASIS_REGISTRATION = 1, ERROR_TYPE_OOM = 2, ERROR_TYPE_MISSING_APP_PARAM = 3, ERROR_TYPE_INVALID_APP_PARAM = 4 }
 event_session error types. More...
 

Functions

void ast_ari_events_user_event (struct ast_variable *headers, struct ast_ari_events_user_event_args *args, struct ast_ari_response *response)
 Generate a user event. More...
 
int ast_ari_websocket_events_event_websocket_attempted (struct ast_tcptls_session_instance *ser, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args, const char *session_id)
 WebSocket connection for events. More...
 
void ast_ari_websocket_events_event_websocket_dtor (void)
 WebSocket connection for events. More...
 
void ast_ari_websocket_events_event_websocket_established (struct ast_ari_websocket_session *ws_session, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args)
 WebSocket connection for events. More...
 
int ast_ari_websocket_events_event_websocket_init (void)
 WebSocket connection for events. More...
 
static int event_session_alloc (struct ast_tcptls_session_instance *ser, struct ast_ari_events_event_websocket_args *args, const char *session_id)
 Creates an event_session object and registers its apps with Stasis. More...
 
static int event_session_allocation_error_handler (struct event_session *session, enum event_session_error_type error, struct ast_tcptls_session_instance *ser)
 Handles event_session error processing. More...
 
static void event_session_cleanup (struct event_session *session)
 Processes cleanup actions for an event_session object. More...
 
static int event_session_compare (void *obj, void *arg, int flags)
 AO2 comparison function for event_session objects. More...
 
static void event_session_dtor (void *obj)
 Event session object destructor (event_session). More...
 
static int event_session_hash (const void *obj, const int flags)
 AO2 hash function for event_session objects. More...
 
static void event_session_shutdown (struct event_session *session)
 Explicitly shutdown a session. More...
 
static int event_session_shutdown_cb (void *session, void *arg, int flags)
 
static void event_session_update_websocket (struct event_session *session, struct ast_ari_websocket_session *ws_session)
 Updates the websocket session for an event_session. More...
 
static void stasis_app_message_handler (void *data, const char *app_name, struct ast_json *message)
 Callback handler for Stasis application messages. More...
 

Variables

static struct ao2_container * event_session_registry
 Local registry for created event_session objects. More...
 

Detailed Description

/api-docs/events.{format} implementation - WebSocket resource

Author
David M. Lee, II <dlee@digium.com>

Definition in file resource_events.c.

Macro Definition Documentation

◆ APPS_NUM_BUCKETS

#define APPS_NUM_BUCKETS   7

Number of buckets for a websocket apps container. Remember to keep it a prime number!

Definition at line 42 of file resource_events.c.

Referenced by event_session_alloc().

◆ EVENT_SESSION_NUM_BUCKETS

#define EVENT_SESSION_NUM_BUCKETS   23

Number of buckets for the event session registry. Remember to keep it a prime number!

Definition at line 39 of file resource_events.c.

Referenced by ast_ari_websocket_events_event_websocket_init().

◆ MESSAGES_INIT_SIZE

#define MESSAGES_INIT_SIZE   23

Initial size of a message queue.

Definition at line 45 of file resource_events.c.

Referenced by event_session_alloc().

Enumeration Type Documentation

◆ event_session_error_type

event_session error types.

Enumerator
ERROR_TYPE_STASIS_REGISTRATION 

Stasis failed to register the application.

ERROR_TYPE_OOM 

Insufficient memory to create the event session.

ERROR_TYPE_MISSING_APP_PARAM 

HTTP request was missing an [app] parameter.

ERROR_TYPE_INVALID_APP_PARAM 

HTTP request contained an invalid [app] parameter.

Definition at line 58 of file resource_events.c.

58  {
59  ERROR_TYPE_STASIS_REGISTRATION = 1, /*!< Stasis failed to register the application. */
60  ERROR_TYPE_OOM = 2, /*!< Insufficient memory to create the event
61  session. */
62  ERROR_TYPE_MISSING_APP_PARAM = 3, /*!< HTTP request was missing an [app] parameter. */
63  ERROR_TYPE_INVALID_APP_PARAM = 4, /*!< HTTP request contained an invalid [app]
64  parameter. */
65 };

Function Documentation

◆ ast_ari_events_user_event()

void ast_ari_events_user_event ( struct ast_variable * headers,
struct ast_ari_events_user_event_args * args,
struct ast_ari_response * response 
)

Generate a user event.

Parameters
headers: HTTP headers
args: Swagger parameters
[out] response: HTTP response

Definition at line 532 of file resource_events.c.

References ast_ari_events_user_event_args::application, ast_ari_events_user_event_parse_body(), ast_ari_response_error(), ast_ari_response_no_content(), ast_json_object_get(), ast_strlen_zero, ast_ari_events_user_event_args::event_name, NULL, ast_ari_events_user_event_args::source, ast_ari_events_user_event_args::source_count, STASIS_APP_USER_APP_NOT_FOUND, stasis_app_user_event(), STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME, STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND, STASIS_APP_USER_INTERNAL_ERROR, STASIS_APP_USER_OK, STASIS_APP_USER_USEREVENT_INVALID, and ast_ari_events_user_event_args::variables.

Referenced by ast_ari_events_user_event_cb().

535 {
536  enum stasis_app_user_event_res res;
537  struct ast_json *json_variables = NULL;
538 
539  if (args->variables) {
541  json_variables = ast_json_object_get(args->variables, "variables");
542  }
543 
544  if (ast_strlen_zero(args->application)) {
545  ast_ari_response_error(response, 400, "Bad Request",
546  "Missing parameter application");
547  return;
548  }
549 
551  args->event_name,
552  args->source, args->source_count,
553  json_variables);
554 
555  switch (res) {
556  case STASIS_APP_USER_OK:
557  ast_ari_response_no_content(response);
558  break;
559 
561  ast_ari_response_error(response, 404, "Not Found",
562  "Application not found");
563  break;
564 
566  ast_ari_response_error(response, 422, "Unprocessable Entity",
567  "Event source was not found");
568  break;
569 
571  ast_ari_response_error(response, 400, "Bad Request",
572  "Invalid event source URI scheme");
573  break;
574 
576  ast_ari_response_error(response, 400, "Bad Request",
577  "Invalid userevnet data");
578  break;
579 
581  default:
582  ast_ari_response_error(response, 500, "Internal Server Error",
583  "Error processing request");
584  }
585 }
#define NULL
Definition: resample.c:96
#define ast_strlen_zero(foo)
Definition: strings.h:52
int ast_ari_events_user_event_parse_body(struct ast_json *body, struct ast_ari_events_user_event_args *args)
Body parsing function for /events/user/{eventName}.
void ast_ari_response_error(struct ast_ari_response *response, int response_code, const char *response_text, const char *message_fmt,...)
Fill in an error ast_ari_response.
Definition: res_ari.c:259
void ast_ari_response_no_content(struct ast_ari_response *response)
Fill in a No Content (204) ast_ari_response.
Definition: res_ari.c:284
stasis_app_user_event_res
Return code for stasis_app_user_event.
Definition: stasis_app.h:255
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:397
Abstract JSON element (object, array, string, int, ...).
enum stasis_app_user_event_res stasis_app_user_event(const char *app_name, const char *event_name, const char **source_uris, int sources_count, struct ast_json *json_variables)
Generate a Userevent for stasis app (echo to AMI)
Definition: res_stasis.c:2095

◆ ast_ari_websocket_events_event_websocket_attempted()

int ast_ari_websocket_events_event_websocket_attempted ( struct ast_tcptls_session_instance * ser,
struct ast_variable * headers,
struct ast_ari_events_event_websocket_args * args,
const char *  session_id 
)

WebSocket connection for events.

Parameters
ser: HTTP TCP/TLS Server Session
headers: HTTP headers
args: Swagger parameters
session_id: The id of the current session.
Return values
0: success
non-zero: error

Definition at line 488 of file resource_events.c.

References ast_debug, and event_session_alloc().

Referenced by ast_ari_events_event_websocket_ws_attempted_cb().

491 {
492  ast_debug(3, "/events WebSocket attempted\n");
493 
494  /* Create the event session */
495  return event_session_alloc(ser, args, session_id);
496 }
static int event_session_alloc(struct ast_tcptls_session_instance *ser, struct ast_ari_events_event_websocket_args *args, const char *session_id)
Creates an event_session object and registers its apps with Stasis.
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:452

◆ ast_ari_websocket_events_event_websocket_dtor()

void ast_ari_websocket_events_event_websocket_dtor ( void  )

WebSocket connection for events.

Returns
Nothing

Definition at line 465 of file resource_events.c.

References ao2_callback, ao2_cleanup, event_session_shutdown_cb(), NULL, OBJ_MULTIPLE, and OBJ_NODATA.

Referenced by load_module(), and unload_module().

466 {
468 
471 }
#define ao2_callback(c, flags, cb_fn, arg)
Definition: astobj2.h:1716
#define NULL
Definition: resample.c:96
static int event_session_shutdown_cb(void *session, void *arg, int flags)
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static struct ao2_container * event_session_registry
Local registry for created event_session objects.

◆ ast_ari_websocket_events_event_websocket_established()

void ast_ari_websocket_events_event_websocket_established ( struct ast_ari_websocket_session * session,
struct ast_variable * headers,
struct ast_ari_events_event_websocket_args * args 
)

WebSocket connection for events.

Parameters
session: ARI WebSocket.
headers: HTTP headers.
args: Swagger parameters.
session_id: The id of the current session.

Definition at line 498 of file resource_events.c.

References ao2_find, ao2_ref, ao2_unlink, ast_ari_websocket_session_id(), ast_ari_websocket_session_read(), ast_assert, ast_debug, ast_json_unref(), ast_log, event_session_cleanup(), event_session_update_websocket(), LOG_WARNING, NULL, OBJ_SEARCH_KEY, and session.

Referenced by ast_ari_events_event_websocket_ws_established_cb().

501 {
502  struct event_session *session;
503 
504  struct ast_json *msg;
505  const char *session_id;
506 
507  ast_debug(3, "/events WebSocket established\n");
508 
509  ast_assert(ws_session != NULL);
510 
511  session_id = ast_ari_websocket_session_id(ws_session);
512 
513  /* Find the event_session and update its websocket */
514  session = ao2_find(event_session_registry, session_id, OBJ_SEARCH_KEY);
515  if (session) {
517  event_session_update_websocket(session, ws_session);
518  } else {
520  "Failed to locate an event session for the provided websocket session\n");
521  }
522 
523  /* We don't process any input, but we'll consume it waiting for EOF */
524  while ((msg = ast_ari_websocket_session_read(ws_session))) {
525  ast_json_unref(msg);
526  }
527 
528  event_session_cleanup(session);
529  ao2_ref(session, -1);
530 }
const char * ast_ari_websocket_session_id(const struct ast_ari_websocket_session *session)
Get the Session ID for an ARI WebSocket.
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
#define LOG_WARNING
Definition: logger.h:274
#define ast_assert(a)
Definition: utils.h:695
struct ast_json * ast_ari_websocket_session_read(struct ast_ari_websocket_session *session)
Read a message from an ARI WebSocket.
#define NULL
Definition: resample.c:96
static void event_session_cleanup(struct event_session *session)
Processes cleanup actions for a event_session object.
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:452
#define ast_log
Definition: astobj2.c:42
static struct ast_mansession session
static void event_session_update_websocket(struct event_session *session, struct ast_ari_websocket_session *ws_session)
Updates the websocket session for an event_session.
A wrapper for the /ref ast_ari_websocket_session.
#define ao2_ref(o, delta)
Definition: astobj2.h:464
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
Abstract JSON element (object, array, string, int, ...).
static struct ao2_container * event_session_registry
Local registry for created event_session objects.

◆ ast_ari_websocket_events_event_websocket_init()

int ast_ari_websocket_events_event_websocket_init ( void  )

WebSocket connection for events.

Return values
0: success
-1: error

Definition at line 473 of file resource_events.c.

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ast_log, event_session_compare(), event_session_hash(), EVENT_SESSION_NUM_BUCKETS, LOG_WARNING, and NULL.

Referenced by load_module().

474 {
475  /* Try to instantiate the registry */
478  if (!event_session_registry) {
479  /* This is bad, bad. */
481  "Failed to allocate the local registry for websocket applications\n");
482  return -1;
483  }
484 
485  return 0;
486 }
#define LOG_WARNING
Definition: logger.h:274
#define NULL
Definition: resample.c:96
#define ast_log
Definition: astobj2.c:42
#define EVENT_SESSION_NUM_BUCKETS
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
static int event_session_hash(const void *obj, const int flags)
AO2 hash function for event_session objects.
static int event_session_compare(void *obj, void *arg, int flags)
AO2 comparison function for event_session objects.
static struct ao2_container * event_session_registry
Local registry for created event_session objects.

◆ event_session_alloc()

static int event_session_alloc ( struct ast_tcptls_session_instance * ser,
struct ast_ari_events_event_websocket_args * args,
const char *  session_id 
)
static

Creates an event_session object and registers its apps with Stasis.

Definition at line 388 of file resource_events.c.

References ao2_alloc, ao2_cleanup, ao2_link, ast_ari_events_event_websocket_args::app, app, ast_ari_events_event_websocket_args::app_count, APPS_NUM_BUCKETS, ast_log, ast_str_container_add(), ast_str_container_alloc, ast_strlen_zero, AST_VECTOR_INIT, ERROR_TYPE_INVALID_APP_PARAM, ERROR_TYPE_MISSING_APP_PARAM, ERROR_TYPE_OOM, ERROR_TYPE_STASIS_REGISTRATION, event_session_allocation_error_handler(), event_session_dtor(), handler(), LOG_WARNING, MESSAGES_INIT_SIZE, NULL, RAII_VAR, session, stasis_app_message_handler(), stasis_app_register(), stasis_app_register_all(), and ast_ari_events_event_websocket_args::subscribe_all.

Referenced by ast_ari_websocket_events_event_websocket_attempted().

390 {
392  int (* register_handler)(const char *, stasis_app_cb handler, void *data);
393  size_t size, i;
394 
395  /* The request must have at least one [app] parameter */
396  if (args->app_count == 0) {
399  }
400 
401  size = sizeof(*session) + strlen(session_id) + 1;
402 
403  /* Instantiate the event session */
405  if (!session) {
407  }
408 
409  strncpy(session->session_id, session_id, size - sizeof(*session));
410 
411  /* Instantiate the hash table for Stasis apps */
412  session->websocket_apps =
414 
415  if (!session->websocket_apps) {
417  }
418 
419  /* Instantiate the message queue */
420  if (AST_VECTOR_INIT(&session->message_queue, MESSAGES_INIT_SIZE)) {
422  }
423 
424  /* Register the apps with Stasis */
425  if (args->subscribe_all) {
426  register_handler = &stasis_app_register_all;
427  } else {
428  register_handler = &stasis_app_register;
429  }
430 
431  for (i = 0; i < args->app_count; ++i) {
432  const char *app = args->app[i];
433 
434  if (ast_strlen_zero(app)) {
437  }
438 
439  if (ast_str_container_add(session->websocket_apps, app)) {
441  }
442 
443  if (register_handler(app, stasis_app_message_handler, session)) {
444  ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app);
447  }
448  }
449 
450  /* Add the event session to the local registry */
453  }
454 
455  return 0;
456 }
static int event_session_allocation_error_handler(struct event_session *session, enum event_session_error_type error, struct ast_tcptls_session_instance *ser)
Handles event_session error processing.
static void event_session_dtor(void *obj)
Event session object destructor (event_session).
#define LOG_WARNING
Definition: logger.h:274
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1312
#define MESSAGES_INIT_SIZE
#define NULL
Definition: resample.c:96
#define ast_strlen_zero(foo)
Definition: strings.h:52
#define ast_log
Definition: astobj2.c:42
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
static struct ast_mansession session
A wrapper for the /ref ast_ari_websocket_session.
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application.
Definition: res_stasis.c:1777
void(* stasis_app_cb)(void *data, const char *app_name, struct ast_json *message)
Callback for Stasis application handler.
Definition: stasis_app.h:67
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
static void handler(const char *name, int response_code, struct ast_variable *get_params, struct ast_variable *path_vars, struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
Definition: test_ari.c:59
int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
Register a new Stasis application that receives all Asterisk events.
Definition: res_stasis.c:1782
static struct ao2_container * event_session_registry
Local registry for created event_session objects.
static const char app[]
Definition: app_mysql.c:62
#define APPS_NUM_BUCKETS
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:206
#define ao2_link(container, obj)
Definition: astobj2.h:1549
static void stasis_app_message_handler(void *data, const char *app_name, struct ast_json *message)
Callback handler for Stasis application messages.

◆ event_session_allocation_error_handler()

static int event_session_allocation_error_handler ( struct event_session * session,
enum event_session_error_type  error,
struct ast_tcptls_session_instance * ser 
)
static

Handles event_session error processing.

Definition at line 339 of file resource_events.c.

References ast_http_error(), ERROR_TYPE_INVALID_APP_PARAM, ERROR_TYPE_MISSING_APP_PARAM, ERROR_TYPE_OOM, ERROR_TYPE_STASIS_REGISTRATION, and event_session_cleanup().

Referenced by event_session_alloc().

342 {
343  /* Notify the client */
344  switch (error) {
346  ast_http_error(ser, 500, "Internal Server Error",
347  "Stasis registration failed");
348  break;
349 
350  case ERROR_TYPE_OOM:
351  ast_http_error(ser, 500, "Internal Server Error",
352  "Allocation failed");
353  break;
354 
356  ast_http_error(ser, 400, "Bad Request",
357  "HTTP request is missing param: [app]");
358  break;
359 
361  ast_http_error(ser, 400, "Bad Request",
362  "Invalid application provided in param [app].");
363  break;
364 
365  default:
366  break;
367  }
368 
369  /* Cleanup the session */
370  event_session_cleanup(session);
371  return -1;
372 }
void ast_http_error(struct ast_tcptls_session_instance *ser, int status, const char *title, const char *text)
Send HTTP error message and close socket.
Definition: http.c:648
static void event_session_cleanup(struct event_session *session)
Processes cleanup actions for a event_session object.
int error(const char *format,...)
Definition: utils/frame.c:999

◆ event_session_cleanup()

static void event_session_cleanup ( struct event_session * session)
static

Processes cleanup actions for an event_session object.

Definition at line 297 of file resource_events.c.

References ao2_unlink, and event_session_shutdown().

Referenced by ast_ari_websocket_events_event_websocket_established(), event_session_allocation_error_handler(), and event_session_shutdown_cb().

298 {
299  if (!session) {
300  return;
301  }
302 
303  event_session_shutdown(session);
306  }
307 }
#define ao2_unlink(container, obj)
Definition: astobj2.h:1598
static void event_session_shutdown(struct event_session *session)
Explicitly shutdown a session.
static struct ao2_container * event_session_registry
Local registry for created event_session objects.

◆ event_session_compare()

static int event_session_compare ( void *  obj,
void *  arg,
int  flags 
)
static

AO2 comparison function for event_session objects.

Definition at line 153 of file resource_events.c.

References CMP_MATCH, OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and OBJ_SEARCH_PARTIAL_KEY.

Referenced by ast_ari_websocket_events_event_websocket_init().

154 {
155  const struct event_session *object_left = obj;
156  const struct event_session *object_right = arg;
157  const char *right_key = arg;
158  int cmp = 0;
159 
160  switch (flags & OBJ_SEARCH_MASK) {
161  case OBJ_SEARCH_OBJECT:
162  right_key = object_right->session_id;
163  /* Fall through */
164  case OBJ_SEARCH_KEY:
165  cmp = strcmp(object_left->session_id, right_key);
166  break;
168  cmp = strncmp(object_left->session_id, right_key, strlen(right_key));
169  break;
170  default:
171  break;
172  }
173 
174  return cmp ? 0 : CMP_MATCH;
175 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1120
A wrapper for the /ref ast_ari_websocket_session.
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
Search option field mask.
Definition: astobj2.h:1076

◆ event_session_dtor()

static void event_session_dtor ( void *  obj)
static

Event session object destructor (event_session).

Definition at line 316 of file resource_events.c.

References ast_assert, AST_VECTOR_SIZE, NULL, session, event_session::websocket_apps, and event_session::ws_session.

Referenced by event_session_alloc().

317 {
318 #ifdef AST_DEVMODE /* Avoid unused variable warning */
319  struct event_session *session = obj;
320 #endif
321 
322  /* event_session_shutdown should have been called before now */
323  ast_assert(session->ws_session == NULL);
324  ast_assert(session->websocket_apps == NULL);
325  ast_assert(AST_VECTOR_SIZE(&session->message_queue) == 0);
326 }
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
static struct ast_mansession session
A wrapper for the /ref ast_ari_websocket_session.
struct ao2_container * websocket_apps
struct ast_ari_websocket_session * ws_session
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ event_session_hash()

static int event_session_hash ( const void *  obj,
const int  flags 
)
static

AO2 hash function for event_session objects.

Computes hash value for the given event_session, with respect to the provided search flags.

Definition at line 191 of file resource_events.c.

References ast_assert, ast_str_hash(), OBJ_SEARCH_KEY, OBJ_SEARCH_MASK, OBJ_SEARCH_OBJECT, and session.

Referenced by ast_ari_websocket_events_event_websocket_init().

192 {
193  const struct event_session *session;
194  const char *key;
195 
196  switch (flags & OBJ_SEARCH_MASK) {
197  case OBJ_SEARCH_KEY:
198  key = obj;
199  break;
200  case OBJ_SEARCH_OBJECT:
201  session = obj;
202  key = session->session_id;
203  break;
204  default:
205  /* Hash can only work on something with a full key. */
206  ast_assert(0);
207  return 0;
208  }
209  return ast_str_hash(key);
210 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
#define ast_assert(a)
Definition: utils.h:695
static struct ast_mansession session
A wrapper for the /ref ast_ari_websocket_session.
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
Search option field mask.
Definition: astobj2.h:1076
static force_inline int attribute_pure ast_str_hash(const char *str)
Compute a hash value on a string.
Definition: strings.h:1206

◆ event_session_shutdown()

static void event_session_shutdown ( struct event_session session)
static

Explicitly shutdown a session.

An explicit shutdown is necessary, since the stasis_app has a reference to this session. We also need to be sure to null out the ws_session field, since the websocket is about to go away.

Definition at line 223 of file resource_events.c.

References ao2_cleanup, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, app, ast_json_unref(), AST_VECTOR_FREE, AST_VECTOR_GET, AST_VECTOR_SIZE, lock, NULL, SCOPED_AO2LOCK, stasis_app_unregister(), event_session::websocket_apps, and event_session::ws_session.

Referenced by event_session_cleanup().

224 {
225  struct ao2_iterator i;
226  char *app;
227  int j;
228  SCOPED_AO2LOCK(lock, session);
229 
230  /* Clean up the websocket_apps container */
231  if (session->websocket_apps) {
232  i = ao2_iterator_init(session->websocket_apps, 0);
233  while ((app = ao2_iterator_next(&i))) {
235  ao2_cleanup(app);
236  }
238  ao2_cleanup(session->websocket_apps);
239  session->websocket_apps = NULL;
240  }
241 
242  /* Clean up the message_queue container */
243  for (j = 0; j < AST_VECTOR_SIZE(&session->message_queue); j++) {
244  struct ast_json *msg = AST_VECTOR_GET(&session->message_queue, j);
245  ast_json_unref(msg);
246  }
247  AST_VECTOR_FREE(&session->message_queue);
248 
249  /* Remove the handle to the underlying websocket session */
250  session->ws_session = NULL;
251 }
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define NULL
Definition: resample.c:96
void stasis_app_unregister(const char *app_name)
Unregister a Stasis application.
Definition: res_stasis.c:1787
ast_mutex_t lock
Definition: app_meetme.c:1091
struct ao2_container * websocket_apps
struct ast_ari_websocket_session * ws_session
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:602
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Definition: astobj2.h:1841
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
Abstract JSON element (object, array, string, int, ...).
static const char app[]
Definition: app_mysql.c:62
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ event_session_shutdown_cb()

static int event_session_shutdown_cb ( void *  session,
void *  arg,
int  flags 
)
static

Definition at line 458 of file resource_events.c.

References event_session_cleanup().

Referenced by ast_ari_websocket_events_event_websocket_dtor().

459 {
461 
462  return 0;
463 }
static void event_session_cleanup(struct event_session *session)
Processes cleanup actions for a event_session object.
static struct ast_mansession session

◆ event_session_update_websocket()

static void event_session_update_websocket ( struct event_session * session,
struct ast_ari_websocket_session * ws_session 
)
static

Updates the websocket session for an event_session.

The websocket for the given event_session will be updated to the value of the ws_session argument.

If the value of the ws_session is not NULL and there are messages in the event session's message_queue, the messages are dispatched and removed from the queue.

Definition at line 269 of file resource_events.c.

References ao2_lock, ao2_unlock, ast_ari_websocket_session_write(), ast_assert, ast_json_unref(), AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_RESET, AST_VECTOR_SIZE, NULL, and event_session::ws_session.

Referenced by ast_ari_websocket_events_event_websocket_established().

271 {
272  int i;
273 
274  ast_assert(session != NULL);
275 
276  ao2_lock(session);
277 
278  session->ws_session = ws_session;
279 
280  for (i = 0; i < AST_VECTOR_SIZE(&session->message_queue); i++) {
281  struct ast_json *msg = AST_VECTOR_GET(&session->message_queue, i);
283  ast_json_unref(msg);
284  }
285 
286  AST_VECTOR_RESET(&session->message_queue, AST_VECTOR_ELEM_CLEANUP_NOOP);
287  ao2_unlock(session);
288 }
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
#define NULL
Definition: resample.c:96
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:573
#define ao2_lock(a)
Definition: astobj2.h:718
struct ast_ari_websocket_session * ws_session
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition: vector.h:627
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
Abstract JSON element (object, array, string, int, ...).
int ast_ari_websocket_session_write(struct ast_ari_websocket_session *session, struct ast_json *message)
Send a message to an ARI WebSocket.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ stasis_app_message_handler()

static void stasis_app_message_handler ( void *  data,
const char *  app_name,
struct ast_json *  message 
)
static

Callback handler for Stasis application messages.

Definition at line 79 of file resource_events.c.

References ao2_find, ao2_lock, ao2_unlock, ast_ari_json_format(), ast_ari_websocket_session_get_remote_addr(), ast_ari_websocket_session_write(), ast_assert, ast_json_dump_string_format(), ast_json_free(), ast_json_object_get(), ast_json_object_set(), ast_json_ref(), ast_json_string_create(), ast_json_string_get(), ast_log, ast_sockaddr_stringify(), AST_VECTOR_APPEND, ast_verbose(), LOG_WARNING, NULL, OBJ_NODATA, OBJ_UNLINK, S_OR, session, stasis_app_event_allowed(), stasis_app_get_debug_by_name(), str, event_session::websocket_apps, and event_session::ws_session.

Referenced by event_session_alloc().

81 {
82  struct event_session *session = data;
83  const char *msg_type, *msg_application;
84  int app_debug_enabled;
85 
86  ast_assert(session != NULL);
87 
88  /*
89  * We need to get the debug flag before locking session
90  * to help prevent a deadlock with the apps_registry container.
91  */
92  app_debug_enabled = stasis_app_get_debug_by_name(app_name);
93 
94  ao2_lock(session);
95 
96  msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), "");
97  msg_application = S_OR(
98  ast_json_string_get(ast_json_object_get(message, "application")), "");
99 
100  /* If we've been replaced, remove the application from our local
101  websocket_apps container */
102  if (strcmp(msg_type, "ApplicationReplaced") == 0 &&
103  strcmp(msg_application, app_name) == 0) {
104  ao2_find(session->websocket_apps, msg_application,
105  OBJ_UNLINK | OBJ_NODATA);
106  }
107 
108  /* Now, we need to determine our state to see how we will handle the message */
109  if (ast_json_object_set(message, "application", ast_json_string_create(app_name))) {
110  /* We failed to add an application element to our json message */
111  ast_log(LOG_WARNING,
112  "Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
113  msg_type,
114  msg_application);
115  } else if (!session->ws_session) {
116  /* If the websocket is NULL, the message goes to the queue */
117  if (!AST_VECTOR_APPEND(&session->message_queue, message)) {
118  ast_json_ref(message);
119  }
120  ast_log(LOG_WARNING,
121  "Queued '%s' message for Stasis app '%s'; websocket is not ready\n",
122  msg_type,
123  msg_application);
124  } else if (stasis_app_event_allowed(app_name, message)) {
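125  if (app_debug_enabled) {
126  char *str = ast_json_dump_string_format(message, ast_ari_json_format());
127 
128  ast_verbose("<--- Sending ARI event to %s --->\n%s\n",
129  ast_sockaddr_stringify(ast_ari_websocket_session_get_remote_addr(session->ws_session)),
130  str);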
131  ast_json_free(str);
132  }
133 
134  /* We are ready to publish the message */
135  ast_ari_websocket_session_write(session->ws_session, message);
136  }
137 
138  ao2_unlock(session);
139 }
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
enum ast_json_encoding_format ast_ari_json_format(void)
Configured encoding format for JSON output.
Definition: res_ari.c:806
#define LOG_WARNING
Definition: logger.h:274
void ast_json_free(void *p)
Asterisk's custom JSON allocator. Exposed for use by unit tests.
Definition: json.c:52
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
int stasis_app_event_allowed(const char *app_name, struct ast_json *event)
Check if the given event should be filtered.
struct ast_sockaddr * ast_ari_websocket_session_get_remote_addr(struct ast_ari_websocket_session *session)
Get the remote address from an ARI WebSocket.
#define ast_assert(a)
Definition: utils.h:695
#define ao2_unlock(a)
Definition: astobj2.h:730
void ast_verbose(const char *fmt,...)
Definition: extconf.c:2207
const char * str
Definition: app_jack.c:147
#define NULL
Definition: resample.c:96
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:404
int stasis_app_get_debug_by_name(const char *app_name)
Get debug status of an application.
#define ast_log
Definition: astobj2.c:42
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:268
static struct ast_mansession session
A wrapper for the /ref ast_ari_websocket_session.
#define ao2_lock(a)
Definition: astobj2.h:718
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:273
struct ao2_container * websocket_apps
char * ast_json_dump_string_format(struct ast_json *root, enum ast_json_encoding_format format)
Encode a JSON value to a string.
Definition: json.c:463
struct ast_ari_websocket_session * ws_session
static char * ast_sockaddr_stringify(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() with default format.
Definition: netsock2.h:260
const char * app_name(struct ast_app *app)
Definition: pbx_app.c:463
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:397
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one...
Definition: strings.h:79
int ast_ari_websocket_session_write(struct ast_ari_websocket_session *session, struct ast_json *message)
Send a message to an ARI WebSocket.

Variable Documentation

◆ event_session_registry

struct ao2_container* event_session_registry
static

Local registry for created event_session objects.

Definition at line 68 of file resource_events.c.