Asterisk - The Open Source Telephony Project  18.5.0
stasis_endpoints.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Stasis endpoint API.
22  *
23  * \author David M. Lee, II <[email protected]>
24  */
25 
26 /*** MODULEINFO
27  <support_level>core</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis.h"
35 
36 /*** DOCUMENTATION
37  <managerEvent language="en_US" name="PeerStatus">
38  <managerEventInstance class="EVENT_FLAG_SYSTEM">
39  <synopsis>Raised when the state of a peer changes.</synopsis>
40  <syntax>
41  <parameter name="ChannelType">
42  <para>The channel technology of the peer.</para>
43  </parameter>
44  <parameter name="Peer">
45  <para>The name of the peer (including channel technology).</para>
46  </parameter>
47  <parameter name="PeerStatus">
48  <para>New status of the peer.</para>
49  <enumlist>
50  <enum name="Unknown"/>
51  <enum name="Registered"/>
52  <enum name="Unregistered"/>
53  <enum name="Rejected"/>
54  <enum name="Lagged"/>
55  </enumlist>
56  </parameter>
57  <parameter name="Cause">
58  <para>The reason the status has changed.</para>
59  </parameter>
60  <parameter name="Address">
61  <para>New address of the peer.</para>
62  </parameter>
63  <parameter name="Port">
64  <para>New port for the peer.</para>
65  </parameter>
66  <parameter name="Time">
67  <para>Time it takes to reach the peer and receive a response.</para>
68  </parameter>
69  </syntax>
70  </managerEventInstance>
71  </managerEvent>
72  <managerEvent language="en_US" name="ContactStatus">
73  <managerEventInstance class="EVENT_FLAG_SYSTEM">
74  <synopsis>Raised when the state of a contact changes.</synopsis>
75  <syntax>
76  <parameter name="URI">
77  <para>This contact's URI.</para>
78  </parameter>
79  <parameter name="ContactStatus">
80  <para>New status of the contact.</para>
81  <enumlist>
82  <enum name="Unknown"/>
83  <enum name="Unreachable"/>
84  <enum name="Reachable"/>
85  <enum name="Unqualified"/>
86  <enum name="Removed"/>
87  <enum name="Updated"/>
88  </enumlist>
89  </parameter>
90  <parameter name="AOR">
91  <para>The name of the associated aor.</para>
92  </parameter>
93  <parameter name="EndpointName">
94  <para>The name of the associated endpoint.</para>
95  </parameter>
96  <parameter name="RoundtripUsec">
97  <para>The RTT measured during the last qualify.</para>
98  </parameter>
99  </syntax>
100  </managerEventInstance>
101  </managerEvent>
102 ***/
103 
105 
107 {
108  return endpoint_cache_all;
109 }
110 
112 {
113  return stasis_cp_all_cache(endpoint_cache_all);
114 }
115 
117 {
118  return stasis_cp_all_topic(endpoint_cache_all);
119 }
120 
122 {
123  return stasis_cp_all_topic_cached(endpoint_cache_all);
124 }
125 
127 
129 {
130  struct ast_endpoint_blob *obj = stasis_message_data(msg);
131  RAII_VAR(struct ast_str *, peerstatus_event_string, ast_str_create(64), ast_free);
132  const char *value;
133 
134  /* peer_status is the only *required* thing */
135  if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "peer_status")))) {
136  return NULL;
137  }
138  ast_str_append(&peerstatus_event_string, 0, "PeerStatus: %s\r\n", value);
139 
140  if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "cause")))) {
141  ast_str_append(&peerstatus_event_string, 0, "Cause: %s\r\n", value);
142  }
143  if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "address")))) {
144  ast_str_append(&peerstatus_event_string, 0, "Address: %s\r\n", value);
145  }
146  if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "port")))) {
147  ast_str_append(&peerstatus_event_string, 0, "Port: %s\r\n", value);
148  }
149  if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "time")))) {
150  ast_str_append(&peerstatus_event_string, 0, "Time: %s\r\n", value);
151  }
152 
154  "ChannelType: %s\r\n"
155  "Peer: %s/%s\r\n"
156  "%s",
157  obj->snapshot->tech,
158  obj->snapshot->tech,
159  obj->snapshot->resource,
160  ast_str_buffer(peerstatus_event_string));
161 }
162 
163 static struct ast_json *peerstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
164 {
165  struct ast_endpoint_blob *obj = stasis_message_data(msg);
166  struct ast_json *json_endpoint;
167  struct ast_json *json_peer;
168  struct ast_json *json_final;
169  const struct timeval *tv = stasis_message_timestamp(msg);
170 
171  json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
172  if (!json_endpoint) {
173  return NULL;
174  }
175 
176  json_peer = ast_json_object_create();
177  if (!json_peer) {
178  ast_json_unref(json_endpoint);
179  return NULL;
180  }
181 
182  /* Copy all fields from the blob */
183  ast_json_object_update(json_peer, obj->blob);
184 
185  json_final = ast_json_pack("{s: s, s: o, s: o, s: o }",
186  "type", "PeerStatusChange",
187  "timestamp", ast_json_timeval(*tv, NULL),
188  "endpoint", json_endpoint,
189  "peer", json_peer);
190  if (!json_final) {
191  ast_json_unref(json_endpoint);
192  ast_json_unref(json_peer);
193  }
194 
195  return json_final;
196 }
197 
200  .to_json = peerstatus_to_json,
201 );
202 
204 {
205  struct ast_endpoint_blob *obj = stasis_message_data(msg);
206  RAII_VAR(struct ast_str *, contactstatus_event_string, ast_str_create(64), ast_free);
207  const char *value;
208 
209  if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "uri")))) {
210  return NULL;
211  }
212  ast_str_append(&contactstatus_event_string, 0, "URI: %s\r\n", value);
213 
214  if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "contact_status")))) {
215  return NULL;
216  }
217  ast_str_append(&contactstatus_event_string, 0, "ContactStatus: %s\r\n", value);
218 
219  if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "aor")))) {
220  return NULL;
221  }
222  ast_str_append(&contactstatus_event_string, 0, "AOR: %s\r\n", value);
223 
224  if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "endpoint_name")))) {
225  return NULL;
226  }
227  ast_str_append(&contactstatus_event_string, 0, "EndpointName: %s\r\n", value);
228 
229  if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec")))) {
230  ast_str_append(&contactstatus_event_string, 0, "RoundtripUsec: %s\r\n", value);
231  }
232 
233  return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "ContactStatus",
234  "%s", ast_str_buffer(contactstatus_event_string));
235 }
236 
237 static struct ast_json *contactstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
238 {
239  struct ast_endpoint_blob *obj = stasis_message_data(msg);
240  struct ast_json *json_endpoint;
241  struct ast_json *json_final;
242  const char *rtt;
243  const struct timeval *tv = stasis_message_timestamp(msg);
244 
245  json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
246  if (!json_endpoint) {
247  return NULL;
248  }
249 
250  /* The roundtrip time is optional. */
251  rtt = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec"));
252  if (!ast_strlen_zero(rtt)) {
253  json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s, s: s } } ",
254  "type", "ContactStatusChange",
255  "timestamp", ast_json_timeval(*tv, NULL),
256  "endpoint", json_endpoint,
257  "contact_info",
258  "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
259  "contact_status", ast_json_string_get(ast_json_object_get(obj->blob,
260  "contact_status")),
261  "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")),
262  "roundtrip_usec", rtt);
263  } else {
264  json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s } } ",
265  "type", "ContactStatusChange",
266  "timestamp", ast_json_timeval(*tv, NULL),
267  "endpoint", json_endpoint,
268  "contact_info",
269  "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
270  "contact_status", ast_json_string_get(ast_json_object_get(obj->blob,
271  "contact_status")),
272  "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")));
273  }
274  if (!json_final) {
275  ast_json_unref(json_endpoint);
276  }
277 
278  return json_final;
279 }
280 
283  .to_json = contactstatus_to_json
284 );
285 
286 static void endpoint_blob_dtor(void *obj)
287 {
288  struct ast_endpoint_blob *event = obj;
289  ao2_cleanup(event->snapshot);
290  ast_json_unref(event->blob);
291 }
292 
294  struct stasis_message_type *type, struct ast_json *blob)
295 {
296  struct ast_endpoint_blob *obj;
297  struct stasis_message *msg;
298 
299  if (!type) {
300  return NULL;
301  }
302  if (!blob) {
303  blob = ast_json_null();
304  }
305 
306  if (!(obj = ao2_alloc(sizeof(*obj), endpoint_blob_dtor))) {
307  return NULL;
308  }
309 
310  if (endpoint) {
311  if (!(obj->snapshot = ast_endpoint_snapshot_create(endpoint))) {
312  ao2_ref(obj, -1);
313 
314  return NULL;
315  }
316  }
317 
318  obj->blob = ast_json_ref(blob);
319  msg = stasis_message_create(type, obj);
320  ao2_ref(obj, -1);
321 
322  return msg;
323 }
324 
326  struct ast_json *blob)
327 {
328  struct stasis_message *message;
329 
330  if (!blob) {
331  return;
332  }
333 
334  message = ast_endpoint_blob_create(endpoint, type, blob);
335  if (message) {
336  stasis_publish(ast_endpoint_topic(endpoint), message);
337  ao2_ref(message, -1);
338  }
339 }
340 
342  const char *name)
343 {
344  char *id = NULL;
345  struct stasis_message *msg;
346  struct ast_endpoint_snapshot *snapshot;
347 
348  if (ast_strlen_zero(name)) {
349  ast_asprintf(&id, "%s", tech);
350  } else {
351  ast_asprintf(&id, "%s/%s", tech, name);
352  }
353  if (!id) {
354  return NULL;
355  }
356  ast_tech_to_upper(id);
357 
359  ast_free(id);
360  if (!msg) {
361  return NULL;
362  }
363 
364  snapshot = stasis_message_data(msg);
365  ast_assert(snapshot != NULL);
366 
367  ao2_ref(snapshot, +1);
368  ao2_ref(msg, -1);
369 
370  return snapshot;
371 }
372 
373 /*!
374  * \brief Callback extract a unique identity from a snapshot message.
375  *
376  * This identity is unique to the underlying object of the snapshot, such as the
377  * UniqueId field of a channel.
378  *
379  * \param message Message to extract id from.
380  * \return String representing the snapshot's id.
381  * \return \c NULL if the message_type of the message isn't a handled snapshot.
382  * \since 12
383  */
385 {
386  struct ast_endpoint_snapshot *snapshot;
387 
389  return NULL;
390  }
391 
392  snapshot = stasis_message_data(message);
393 
394  return snapshot->id;
395 }
396 
397 
399  const struct ast_endpoint_snapshot *snapshot,
400  const struct stasis_message_sanitizer *sanitize)
401 {
402  struct ast_json *json;
403  struct ast_json *channel_array;
404  int i;
405 
406  json = ast_json_pack("{s: s, s: s, s: s, s: []}",
407  "technology", snapshot->tech,
408  "resource", snapshot->resource,
409  "state", ast_endpoint_state_to_string(snapshot->state),
410  "channel_ids");
411 
412  if (json == NULL) {
413  return NULL;
414  }
415 
416  if (snapshot->max_channels != -1) {
417  int res = ast_json_object_set(json, "max_channels",
419  if (res != 0) {
420  ast_json_unref(json);
421 
422  return NULL;
423  }
424  }
425 
426  channel_array = ast_json_object_get(json, "channel_ids");
427  ast_assert(channel_array != NULL);
428  for (i = 0; i < snapshot->num_channels; ++i) {
429  int res;
430 
431  if (sanitize && sanitize->channel_id
432  && sanitize->channel_id(snapshot->channel_ids[i])) {
433  continue;
434  }
435 
436  res = ast_json_array_append(channel_array,
437  ast_json_string_create(snapshot->channel_ids[i]));
438  if (res != 0) {
439  ast_json_unref(json);
440 
441  return NULL;
442  }
443  }
444 
445  return json;
446 }
447 
448 static void endpoints_stasis_cleanup(void)
449 {
453 
454  ao2_cleanup(endpoint_cache_all);
455  endpoint_cache_all = NULL;
456 }
457 
459 {
460  int res = 0;
462 
463  endpoint_cache_all = stasis_cp_all_create("endpoint:all",
465  if (!endpoint_cache_all) {
466  return -1;
467  }
468 
472 
473  return res;
474 }
static const char type[]
Definition: chan_ooh323.c:109
Struct containing info for an AMI event to send out.
Definition: manager.h:491
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
Asterisk main include file. File version handling, generic pbx functions.
struct stasis_topic * stasis_cp_all_topic_cached(struct stasis_cp_all *all)
Get the caching topic.
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:591
struct ast_endpoint_snapshot * ast_endpoint_snapshot_create(struct ast_endpoint *endpoint)
Create a snapshot of an endpoint.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1501
int(* channel_id)(const char *channel_id)
Callback which determines whether a channel should be sanitized from a message based on the channel&#39;s...
Definition: stasis.h:210
struct stasis_cp_all * ast_endpoint_cache_all(void)
int ast_json_object_update(struct ast_json *object, struct ast_json *other)
Update object with all of the fields of other.
Definition: json.c:416
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:714
struct stasis_message * ast_endpoint_blob_create(struct ast_endpoint *endpoint, struct stasis_message_type *type, struct ast_json *blob)
Creates a ast_endpoint_blob message.
struct stasis_message_type * ast_endpoint_snapshot_type(void)
Message type for ast_endpoint_snapshot.
struct ast_endpoint_snapshot * ast_endpoint_latest_snapshot(const char *tech, const char *name)
Retrieve the most recent snapshot for the endpoint with the given name.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1523
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct stasis_topic * ast_endpoint_topic(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
Definition: astman.c:222
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1091
#define ast_assert(a)
Definition: utils.h:695
const ast_string_field id
#define NULL
Definition: resample.c:96
int value
Definition: syslog.c:37
static const char * endpoint_snapshot_get_id(struct stasis_message *message)
Callback extract a unique identity from a snapshot message.
STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type)
Blob of data associated with an endpoint.
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:9727
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
struct stasis_topic * ast_endpoint_topic_all_cached(void)
Cached topic for all endpoint related messages.
static struct ast_json * peerstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:404
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:269
#define ast_strlen_zero(foo)
Definition: strings.h:52
struct ast_json * ast_json_null(void)
Get the JSON null value.
Definition: json.c:248
static struct ast_manager_event_blob * contactstatus_to_ami(struct stasis_message *msg)
#define EVENT_FLAG_SYSTEM
Definition: manager.h:71
struct stasis_cache * stasis_cp_all_cache(struct stasis_cp_all *all)
Get the cache.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:911
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:268
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
const ast_string_field resource
#define ao2_ref(o, delta)
Definition: astobj2.h:464
int ast_endpoint_stasis_init(void)
Initialization function for endpoint stasis support.
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:273
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:649
A snapshot of an endpoint&#39;s state.
int ast_json_array_append(struct ast_json *array, struct ast_json *value)
Append to an array.
Definition: json.c:368
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
char * ast_tech_to_upper(char *dev_str)
Convert the tech portion of a device string to upper case.
Definition: strings.h:1183
The descriptor of a dynamic string XXX storage will be optimized later if needed We use the ts field ...
Definition: strings.h:584
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
const ast_string_field tech
struct stasis_message_type * ast_endpoint_state_type(void)
Message type for endpoint state changes.
struct ast_json * blob
static struct stasis_cp_all * endpoint_cache_all
#define ao2_alloc(data_size, destructor_fn)
Definition: astobj2.h:411
static void endpoints_stasis_cleanup(void)
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic&#39;s subscribers.
Definition: stasis.c:1511
static const char name[]
Definition: cdr_mysql.c:74
#define ast_free(a)
Definition: astmm.h:182
static void to_ami(struct ast_sip_subscription *sub, struct ast_str **buf)
struct stasis_topic * ast_endpoint_topic_all(void)
Topic for all endpoint releated messages.
static struct ast_manager_event_blob * peerstatus_to_ami(struct stasis_message *msg)
const char * ast_endpoint_state_to_string(enum ast_endpoint_state state)
Returns a string representation of the given endpoint state.
void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_message_type *type, struct ast_json *blob)
Creates and publishes a ast_endpoint_blob message.
struct ast_endpoint_snapshot * snapshot
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:389
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:397
struct stasis_message_type * ast_endpoint_contact_state_type(void)
Message type for endpoint contact state changes.
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
Definition: stasis_cache.c:686
Abstract JSON element (object, array, string, int, ...).
static struct ast_json * contactstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
struct stasis_topic * stasis_cp_all_topic(struct stasis_cp_all *all)
Get the aggregate topic.
Endpoint abstractions.
static void endpoint_blob_dtor(void *obj)
enum ast_endpoint_state state
struct stasis_cp_all * stasis_cp_all_create(const char *name, snapshot_get_id id_fn)
Create an all instance of the cache pattern.
#define ast_str_create(init_len)
Create a malloc&#39;ed dynamic length string.
Definition: strings.h:620
struct ast_json * ast_json_integer_create(intmax_t value)
Create a JSON integer.
Definition: json.c:317
struct stasis_cache * ast_endpoint_cache(void)
Backend cache for ast_endpoint_topic_all_cached().