Asterisk - The Open Source Telephony Project  18.5.0
pjsip_transport_management.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 #include "asterisk.h"
20 
21 #include <signal.h>
22 
23 #include <pjsip.h>
24 #include <pjsip_ua.h>
25 
26 #include "asterisk/res_pjsip.h"
27 #include "asterisk/module.h"
28 #include "asterisk/astobj2.h"
30 
31 /*! \brief Number of buckets for monitored transports */
32 #define TRANSPORTS_BUCKETS 127
33 
34 #define IDLE_TIMEOUT (pjsip_cfg()->tsx.td)
35 
36 /*! \brief The keep alive packet to send */
37 static const pj_str_t keepalive_packet = { "\r\n\r\n", 4 };
38 
39 /*! \brief Global container of active transports */
40 static AO2_GLOBAL_OBJ_STATIC(monitored_transports);
41 
42 /*! \brief Scheduler context for timing out connections with no data received */
43 static struct ast_sched_context *sched;
44 
45 /*! \brief Thread keeping things alive */
47 
48 /*! \brief The global interval at which to send keepalives */
49 static unsigned int keepalive_interval;
50 
51 /*! \brief Structure for transport to be monitored */
53  /*! \brief The underlying PJSIP transport */
54  pjsip_transport *transport;
55  /*! \brief Non-zero if a PJSIP request was received */
57 };
58 
60 {
61  pjsip_tpselector selector = {
62  .type = PJSIP_TPSELECTOR_TRANSPORT,
63  .u.transport = monitored->transport,
64  };
65 
66  pjsip_tpmgr_send_raw(pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()),
67  monitored->transport->key.type,
68  &selector,
69  NULL,
70  keepalive_packet.ptr,
71  keepalive_packet.slen,
72  &monitored->transport->key.rem_addr,
73  pj_sockaddr_get_len(&monitored->transport->key.rem_addr),
74  NULL, NULL);
75 }
76 
77 /*! \brief Thread which sends keepalives to all active connection-oriented transports */
78 static void *keepalive_transport_thread(void *data)
79 {
80  struct ao2_container *transports;
81  pj_thread_desc desc = { 0 };
82  pj_thread_t *thread;
83 
84  if (pj_thread_register("Asterisk Keepalive Thread", desc, &thread) != PJ_SUCCESS) {
85  ast_log(LOG_ERROR, "Could not register keepalive thread with PJLIB, keepalives will not occur.\n");
86  return NULL;
87  }
88 
89  transports = ao2_global_obj_ref(monitored_transports);
90  if (!transports) {
91  return NULL;
92  }
93 
94  /*
95  * Once loaded this module just keeps on going as it is unsafe to stop
96  * and change the underlying callback for the transport manager.
97  */
98  while (keepalive_interval) {
99  struct ao2_iterator iter;
100  struct monitored_transport *monitored;
101 
102  sleep(keepalive_interval);
103 
104  /*
105  * We must use the iterator to avoid deadlock between the container lock
106  * and the pjproject transport manager group lock when sending
107  * the keepalive packet.
108  */
109  iter = ao2_iterator_init(transports, 0);
110  for (; (monitored = ao2_iterator_next(&iter)); ao2_ref(monitored, -1)) {
112  }
113  ao2_iterator_destroy(&iter);
114  }
115 
116  ao2_ref(transports, -1);
117  return NULL;
118 }
119 
121 
123 {
124  if (!pj_thread_is_registered()) {
125  pj_thread_t *thread;
126  pj_thread_desc *desc;
127 
128  desc = ast_threadstorage_get(&desc_storage, sizeof(pj_thread_desc));
129  if (!desc) {
130  ast_log(LOG_ERROR, "Could not get thread desc from thread-local storage.\n");
131  return -1;
132  }
133 
134  pj_bzero(*desc, sizeof(*desc));
135 
136  pj_thread_register("Transport Monitor", *desc, &thread);
137  }
138 
139  return 0;
140 }
141 
142 static struct monitored_transport *get_monitored_transport_by_name(const char *obj_name)
143 {
144  struct ao2_container *transports;
145  struct monitored_transport *monitored = NULL;
146 
147  transports = ao2_global_obj_ref(monitored_transports);
148  if (transports) {
149  monitored = ao2_find(transports, obj_name, OBJ_SEARCH_KEY);
150  }
151  ao2_cleanup(transports);
152 
153  /* Caller is responsible for cleaning up reference */
154  return monitored;
155 }
156 
157 static int idle_sched_cb(const void *data)
158 {
159  char *obj_name = (char *) data;
160  struct monitored_transport *monitored;
161 
163  ast_free(obj_name);
164  return 0;
165  }
166 
167  monitored = get_monitored_transport_by_name(obj_name);
168  if (monitored) {
169  if (!monitored->sip_received) {
170  ast_log(LOG_NOTICE, "Shutting down transport '%s' since no request was received in %d seconds\n",
171  monitored->transport->info, IDLE_TIMEOUT / 1000);
172  pjsip_transport_shutdown(monitored->transport);
173  }
174  ao2_ref(monitored, -1);
175  }
176 
177  ast_free(obj_name);
178  return 0;
179 }
180 
181 static int idle_sched_cleanup(const void *data)
182 {
183  char *obj_name = (char *) data;
184  struct monitored_transport *monitored;
185 
187  ast_free(obj_name);
188  return 0;
189  }
190 
191  monitored = get_monitored_transport_by_name(obj_name);
192  if (monitored) {
193  pjsip_transport_shutdown(monitored->transport);
194  ao2_ref(monitored, -1);
195  }
196 
197  ast_free(obj_name);
198  return 0;
199 }
200 
201 /*! \brief Destructor for keepalive transport */
202 static void monitored_transport_destroy(void *obj)
203 {
204  struct monitored_transport *monitored = obj;
205 
206  pjsip_transport_dec_ref(monitored->transport);
207 }
208 
209 /*! \brief Callback invoked when transport changes occur */
210 static void monitored_transport_state_callback(pjsip_transport *transport, pjsip_transport_state state,
211  const pjsip_transport_state_info *info)
212 {
213  struct ao2_container *transports;
214 
215  /* We only care about reliable transports */
216  if (PJSIP_TRANSPORT_IS_RELIABLE(transport)
217  && (transport->dir == PJSIP_TP_DIR_INCOMING || keepalive_interval)
218  && (transports = ao2_global_obj_ref(monitored_transports))) {
219  struct monitored_transport *monitored;
220 
221  switch (state) {
222  case PJSIP_TP_STATE_CONNECTED:
223  monitored = ao2_alloc_options(sizeof(*monitored),
225  if (!monitored) {
226  break;
227  }
228  monitored->transport = transport;
229  pjsip_transport_add_ref(monitored->transport);
230 
231  ao2_link(transports, monitored);
232 
233  if (transport->dir == PJSIP_TP_DIR_INCOMING) {
234  char *obj_name = ast_strdup(transport->obj_name);
235 
236  if (!obj_name
237  || ast_sched_add_variable(sched, IDLE_TIMEOUT, idle_sched_cb, obj_name, 1) < 0) {
238  /* Shut down the transport if anything fails */
239  pjsip_transport_shutdown(transport);
240  ast_free(obj_name);
241  }
242  }
243  ao2_ref(monitored, -1);
244  break;
245  case PJSIP_TP_STATE_SHUTDOWN:
246  case PJSIP_TP_STATE_DISCONNECTED:
247  ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
248  break;
249  default:
250  break;
251  }
252 
253  ao2_ref(transports, -1);
254  }
255 }
256 
259 };
260 
261 /*! \brief Hashing function for monitored transport */
262 static int monitored_transport_hash_fn(const void *obj, int flags)
263 {
264  const struct monitored_transport *object;
265  const char *key;
266 
267  switch (flags & OBJ_SEARCH_MASK) {
268  case OBJ_SEARCH_KEY:
269  key = obj;
270  break;
271  case OBJ_SEARCH_OBJECT:
272  object = obj;
273  key = object->transport->obj_name;
274  break;
275  default:
276  /* Hash can only work on something with a full key. */
277  ast_assert(0);
278  return 0;
279  }
280  return ast_str_hash(key);
281 }
282 
283 /*! \brief Comparison function for monitored transport */
284 static int monitored_transport_cmp_fn(void *obj, void *arg, int flags)
285 {
286  const struct monitored_transport *object_left = obj;
287  const struct monitored_transport *object_right = arg;
288  const char *right_key = arg;
289  int cmp;
290 
291  switch (flags & OBJ_SEARCH_MASK) {
292  case OBJ_SEARCH_OBJECT:
293  right_key = object_right->transport->obj_name;
294  /* Fall through */
295  case OBJ_SEARCH_KEY:
296  cmp = strcmp(object_left->transport->obj_name, right_key);
297  break;
299  /*
300  * We could also use a partial key struct containing a length
301  * so strlen() does not get called for every comparison instead.
302  */
303  cmp = strncmp(object_left->transport->obj_name, right_key, strlen(right_key));
304  break;
305  default:
306  /*
307  * What arg points to is specific to this traversal callback
308  * and has no special meaning to astobj2.
309  */
310  cmp = 0;
311  break;
312  }
313 
314  return !cmp ? CMP_MATCH : 0;
315 }
316 
317 static void keepalive_global_loaded(const char *object_type)
318 {
319  unsigned int new_interval = ast_sip_get_keep_alive_interval();
320 
321  if (new_interval) {
322  keepalive_interval = new_interval;
323  } else if (keepalive_interval) {
324  ast_log(LOG_NOTICE, "Keepalive support can not be disabled once activated.\n");
325  return;
326  } else {
327  /* This will occur if no keepalive interval has been specified at initial start */
328  return;
329  }
330 
332  return;
333  }
334 
336  ast_log(LOG_ERROR, "Could not create thread for sending keepalive messages.\n");
338  keepalive_interval = 0;
339  }
340 }
341 
342 /*! \brief Observer which is used to update our interval when the global setting changes */
345 };
346 
347 /*!
348  * \brief
349  * On incoming TCP connections, when we receive a SIP request, we mark that we have
350  * received a valid SIP request. This way, we will not shut the transport down for
351  * idleness
352  */
353 static pj_bool_t idle_monitor_on_rx_request(pjsip_rx_data *rdata)
354 {
355  struct monitored_transport *idle_trans;
356 
357  idle_trans = get_monitored_transport_by_name(rdata->tp_info.transport->obj_name);
358  if (idle_trans) {
359  idle_trans->sip_received = 1;
360  ao2_ref(idle_trans, -1);
361  }
362 
363  return PJ_FALSE;
364 }
365 
366 static pjsip_module idle_monitor_module = {
367  .name = {"idle monitor module", 19},
368  .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER + 3,
369  .on_rx_request = idle_monitor_on_rx_request,
370 };
371 
373 {
374  struct ao2_container *transports;
375 
378  if (!transports) {
379  ast_log(LOG_ERROR, "Could not create container for transports to perform keepalive on.\n");
381  }
382  ao2_global_obj_replace_unref(monitored_transports, transports);
383  ao2_ref(transports, -1);
384 
385  sched = ast_sched_context_create();
386  if (!sched) {
387  ast_log(LOG_ERROR, "Failed to create keepalive scheduler context.\n");
388  ao2_global_obj_release(monitored_transports);
390  }
391 
392  if (ast_sched_start_thread(sched)) {
393  ast_log(LOG_ERROR, "Failed to start keepalive scheduler thread\n");
395  sched = NULL;
396  ao2_global_obj_release(monitored_transports);
398  }
399 
401 
402  ast_sip_transport_state_register(&monitored_transport_reg);
403 
404  ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
406 
408 }
409 
411 {
412  if (keepalive_interval) {
413  keepalive_interval = 0;
415  pthread_kill(keepalive_thread, SIGURG);
416  pthread_join(keepalive_thread, NULL);
418  }
419  }
420 
421  ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
422 
423  ast_sip_transport_state_unregister(&monitored_transport_reg);
424 
426 
429  sched = NULL;
430 
431  ao2_global_obj_release(monitored_transports);
432 }
int ast_sched_start_thread(struct ast_sched_context *con)
Start a thread for processing scheduler entries.
Definition: sched.c:195
#define AST_THREADSTORAGE(name)
Define a thread storage variable.
Definition: threadstorage.h:84
pthread_t thread
Definition: app_meetme.c:1089
void ast_sched_clean_by_callback(struct ast_sched_context *con, ast_sched_cb match, ast_sched_cb cleanup_cb)
Clean all scheduled events with matching callback.
Definition: sched.c:407
static struct monitored_transport * get_monitored_transport_by_name(const char *obj_name)
Asterisk main include file. File version handling, generic pbx functions.
void * ast_threadstorage_get(struct ast_threadstorage *ts, size_t init_size)
Retrieve thread storage.
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1105
int ast_sip_initialize_transport_management(void)
pjsip_transport * transport
The underlying PJSIP transport.
int sip_received
Non-zero if a PJSIP request was received.
void ast_sip_transport_state_register(struct ast_sip_tpmgr_state_callback *element)
Register a transport state notification callback element.
static const char desc[]
Definition: cdr_mysql.c:73
static unsigned int keepalive_interval
The global interval at which to send keepalives.
int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable) attribute_warn_unused_result
Adds a scheduled event with rescheduling support.
Definition: sched.c:524
#define ao2_global_obj_ref(holder)
Definition: astobj2.h:925
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
static pjsip_module idle_monitor_module
static pthread_t keepalive_thread
Thread keeping things alive.
#define ao2_alloc_options(data_size, destructor_fn, options)
Definition: astobj2.h:406
#define ast_assert(a)
Definition: utils.h:695
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:243
#define NULL
Definition: resample.c:96
static struct ast_sched_context * sched
Scheduler context for timing out connections with no data received.
#define TRANSPORTS_BUCKETS
Number of buckets for monitored transports.
static int idle_sched_cb(const void *data)
static void monitored_transport_destroy(void *obj)
Destructor for keepalive transport.
#define ast_log
Definition: astobj2.c:42
static AO2_GLOBAL_OBJ_STATIC(monitored_transports)
Global container of active transports.
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1120
static int monitored_transport_cmp_fn(void *obj, void *arg, int flags)
Comparison function for monitored transport.
static int idle_sched_cleanup(const void *data)
#define AST_PTHREADT_NULL
Definition: lock.h:66
#define ao2_ref(o, delta)
Definition: astobj2.h:464
void ast_sip_destroy_transport_management(void)
int ast_sip_register_service(pjsip_module *module)
Register a SIP service in Asterisk.
Definition: res_pjsip.c:3315
int ast_sorcery_observer_add(const struct ast_sorcery *sorcery, const char *type, const struct ast_sorcery_observer *callbacks)
Add an observer to a specific object type.
Definition: sorcery.c:2386
struct ast_sched_context * ast_sched_context_create(void)
Create a scheduler context.
Definition: sched.c:236
static void monitored_transport_state_callback(pjsip_transport *transport, pjsip_transport_state state, const pjsip_transport_state_info *info)
Callback invoked when transport changes occur.
#define LOG_ERROR
Definition: logger.h:285
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Definition: astobj2.h:1310
Interface for a sorcery object type observer.
Definition: sorcery.h:332
#define ao2_global_obj_release(holder)
Definition: astobj2.h:865
#define IDLE_TIMEOUT
def info(msg)
#define ao2_iterator_next(iter)
Definition: astobj2.h:1933
#define LOG_NOTICE
Definition: logger.h:263
#define ast_free(a)
Definition: astmm.h:182
#define ast_pthread_create(a, b, c, d)
Definition: utils.h:559
static int idle_sched_init_pj_thread(void)
static const pj_str_t keepalive_packet
The keep alive packet to send.
Module has failed to load, may be in an inconsistent state.
Definition: module.h:78
#define ao2_find(container, arg, flags)
Definition: astobj2.h:1756
void(* loaded)(const char *object_type)
Callback for when an object type is loaded/reloaded.
Definition: sorcery.h:343
static struct ast_threadstorage desc_storage
pjsip_endpoint * ast_sip_get_pjsip_endpoint(void)
Get a pointer to the PJSIP endpoint.
Definition: res_pjsip.c:3718
struct ast_sip_tpmgr_state_callback monitored_transport_reg
#define ao2_global_obj_replace_unref(holder, obj)
Definition: astobj2.h:908
The arg parameter is an object of the same type.
Definition: astobj2.h:1091
void ast_sorcery_observer_remove(const struct ast_sorcery *sorcery, const char *type, const struct ast_sorcery_observer *callbacks)
Remove an observer from a specific object type.
Definition: sorcery.c:2418
unsigned int ast_sip_get_keep_alive_interval(void)
Retrieve the system keep alive interval setting.
struct ast_sorcery * ast_sip_get_sorcery(void)
Get a pointer to the SIP sorcery structure.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1841
#define ao2_cleanup(obj)
Definition: astobj2.h:1958
void ast_sip_unregister_service(pjsip_module *module)
Definition: res_pjsip.c:3331
static int monitored_transport_hash_fn(const void *obj, int flags)
Hashing function for monitored transport.
Structure for transport to be monitored.
void ast_sip_transport_state_unregister(struct ast_sip_tpmgr_state_callback *element)
Unregister a transport state notification callback element.
static void keepalive_transport_send_keepalive(struct monitored_transport *monitored)
Generic container type.
static pj_bool_t idle_monitor_on_rx_request(pjsip_rx_data *rdata)
On incoming TCP connections, when we receive a SIP request, we mark that we have received a valid SIP...
Search option field mask.
Definition: astobj2.h:1076
static void * keepalive_transport_thread(void *data)
Thread which sends keepalives to all active connection-oriented transports.
Asterisk module definitions.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
void ast_sched_context_destroy(struct ast_sched_context *c)
destroys a schedule context
Definition: sched.c:269
static void keepalive_global_loaded(const char *object_type)
void ast_sorcery_reload_object(const struct ast_sorcery *sorcery, const char *type)
Inform any wizards of a specific object type to reload persistent objects.
Definition: sorcery.c:1442
static struct ast_sorcery_observer keepalive_global_observer
Observer which is used to update our interval when the global setting changes.
static force_inline int attribute_pure ast_str_hash(const char *str)
Compute a hash value on a string.
Definition: strings.h:1206
#define ao2_link(container, obj)
Definition: astobj2.h:1549