Asterisk - The Open Source Telephony Project  18.5.0
threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <[email protected]>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 
20 #include "asterisk.h"
21 
22 #include "asterisk/threadpool.h"
23 #include "asterisk/taskprocessor.h"
24 #include "asterisk/astobj2.h"
25 #include "asterisk/utils.h"
26 
27 /* Needs to stay prime if increased */
28 #define THREAD_BUCKETS 89
29 
30 /*!
31  * \brief An opaque threadpool structure
32  *
33  * A threadpool is a collection of threads that execute
34  * tasks from a common queue.
35  */
36 struct ast_threadpool {
37  /*! Threadpool listener */
38  struct ast_threadpool_listener *listener;
39  /*!
40  * \brief The container of active threads.
41  * Active threads are those that are currently running tasks
42  */
43  struct ao2_container *active_threads;
44  /*!
45  * \brief The container of idle threads.
46  * Idle threads are those that are currently waiting to run tasks
47  */
48  struct ao2_container *idle_threads;
49  /*!
50  * \brief The container of zombie threads.
51  * Zombie threads may be running tasks, but they are scheduled to die soon
52  */
53  struct ao2_container *zombie_threads;
54  /*!
55  * \brief The main taskprocessor
56  *
57  * Tasks that are queued in this taskprocessor are
58  * doled out to the worker threads. Worker threads that
59  * execute tasks from the threadpool are executing tasks
60  * in this taskprocessor.
61  *
62  * The threadpool itself is actually the private data for
63  * this taskprocessor's listener. This way, as taskprocessor
64  * changes occur, the threadpool can alert its listeners
65  * appropriately.
66  */
67  struct ast_taskprocessor *tps;
68  /*!
69  * \brief The control taskprocessor
70  *
71  * This is a standard taskprocessor that uses the default
72  * taskprocessor listener. In other words, all tasks queued to
73  * this taskprocessor have a single thread that executes the
74  * tasks.
75  *
76  * All tasks that modify the state of the threadpool and all tasks
77  * that call out to threadpool listeners are pushed to this
78  * taskprocessor.
79  *
80  * For instance, when the threadpool changes sizes, a task is put
81  * into this taskprocessor to do so. When it comes time to tell the
82  * threadpool listener that worker threads have changed state,
83  * the task is placed in this taskprocessor.
84  *
85  * This is done for three main reasons
86  * 1) It ensures that listeners are given an accurate portrayal
87  * of the threadpool's current state. In other words, when a listener
88  * gets told a count of active, idle and zombie threads, it does not
89  * need to worry that internal state of the threadpool might be different
90  * from what it has been told.
91  * 2) It minimizes the locking required in both the threadpool and in
92  * threadpool listener's callbacks.
93  * 3) It ensures that listener callbacks are called in the same order
94  * that the threadpool had its state change.
95  */
96  struct ast_taskprocessor *control_tps;
97  /*! True if the threadpool is in the process of shutting down */
98  int shutting_down;
99  /*! Threadpool-specific options */
100  struct ast_threadpool_options options;
101 };
102 
103 /*!
104  * \brief listener for a threadpool
105  *
106  * The listener is notified of changes in a threadpool. It can
107  * react by doing things like increasing the number of threads
108  * in the pool
109  */
110 struct ast_threadpool_listener {
111  /*! Callbacks called by the threadpool */
112  const struct ast_threadpool_listener_callbacks *callbacks;
113  /*! User data for the listener */
114  void *user_data;
115 };
116 
117 /*!
118  * \brief states for worker threads
119  */
120 enum worker_state {
121  /*! The worker is either active or idle */
122  ALIVE,
123  /*!
124  * The worker has been asked to shut down but
125  * may still be in the process of executing tasks.
126  * This transition happens when the threadpool needs
127  * to shrink and needs to kill active threads in order
128  * to do so.
129  */
130  ZOMBIE,
131  /*!
132  * The worker has been asked to shut down. Typically
133  * only idle threads go to this state directly, but
134  * active threads may go straight to this state when
135  * the threadpool is shut down.
136  */
137  DEAD,
138 };
139 
140 /*!
141  * A thread that executes threadpool tasks
142  */
143 struct worker_thread {
144  /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
145  int id;
146  /*! Condition used in conjunction with state changes */
147  ast_cond_t cond;
148  /*! Lock used alongside the condition for state changes */
149  ast_mutex_t lock;
150  /*! The actual thread that is executing tasks */
151  pthread_t thread;
152  /*! A pointer to the threadpool. Needed to be able to execute tasks */
153  struct ast_threadpool *pool;
154  /*! The current state of the worker thread */
155  enum worker_state state;
156  /*! A boolean used to determine if an idle thread should become active */
157  int wake_up;
158  /*! Options for this threadpool */
159  struct ast_threadpool_options options;
160 };
161 
162 /* Worker thread forward declarations. See definitions for documentation */
163 static int worker_thread_hash(const void *obj, int flags);
164 static int worker_thread_cmp(void *obj, void *arg, int flags);
165 static void worker_thread_destroy(void *obj);
166 static void worker_active(struct worker_thread *worker);
167 static void *worker_start(void *arg);
168 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
169 static int worker_thread_start(struct worker_thread *worker);
170 static int worker_idle(struct worker_thread *worker);
171 static int worker_set_state(struct worker_thread *worker, enum worker_state state);
172 static void worker_shutdown(struct worker_thread *worker);
173 
174 /*!
175  * \brief Notify the threadpool listener that the state has changed.
176  *
177  * This notifies the threadpool listener via its state_changed callback.
178  * \param pool The threadpool whose state has changed
179  */
180 static void threadpool_send_state_changed(struct ast_threadpool *pool)
181 {
182  int active_size = ao2_container_count(pool->active_threads);
183  int idle_size = ao2_container_count(pool->idle_threads);
184 
185  if (pool->listener && pool->listener->callbacks->state_changed) {
186  pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
187  }
188 }
189 
190 /*!
191  * \brief Struct used for queued operations involving worker state changes
192  */
193 struct thread_worker_pair {
194  /*! Threadpool that contains the worker whose state has changed */
195  struct ast_threadpool *pool;
196  /*! Worker whose state has changed */
197  struct worker_thread *worker;
198 };
199 
200 /*!
201  * \brief Destructor for thread_worker_pair
202  */
203 static void thread_worker_pair_free(struct thread_worker_pair *pair)
204 {
205  ao2_ref(pair->worker, -1);
206  ast_free(pair);
207 }
208 
209 /*!
210  * \brief Allocate and initialize a thread_worker_pair
211  * \param pool Threadpool to assign to the thread_worker_pair
212  * \param worker Worker thread to assign to the thread_worker_pair
213  */
214 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
215  struct worker_thread *worker)
216 {
217  struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
218  if (!pair) {
219  return NULL;
220  }
221  pair->pool = pool;
222  ao2_ref(worker, +1);
223  pair->worker = worker;
224 
225  return pair;
226 }
227 
228 /*!
229  * \brief Move a worker thread from the active container to the idle container.
230  *
231  * This function is called from the threadpool's control taskprocessor thread.
232  * \param data A thread_worker_pair containing the threadpool and the worker to move.
233  * \return 0
234  */
235 static int queued_active_thread_idle(void *data)
236 {
237  struct thread_worker_pair *pair = data;
238 
239  ao2_link(pair->pool->idle_threads, pair->worker);
240  ao2_unlink(pair->pool->active_threads, pair->worker);
241 
242  threadpool_send_state_changed(pair->pool);
243 
244  thread_worker_pair_free(pair);
245  return 0;
246 }
247 
248 /*!
249  * \brief Queue a task to move a thread from the active list to the idle list
250  *
251  * This is called by a worker thread when it runs out of tasks to perform and
252  * goes idle.
253  * \param pool The threadpool to which the worker belongs
254  * \param worker The worker thread that has gone idle
255  */
256 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
257  struct worker_thread *worker)
258 {
259  struct thread_worker_pair *pair;
260  SCOPED_AO2LOCK(lock, pool);
261 
262  if (pool->shutting_down) {
263  return;
264  }
265 
266  pair = thread_worker_pair_alloc(pool, worker);
267  if (!pair) {
268  return;
269  }
270 
271  if (ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair)) {
272  thread_worker_pair_free(pair);
273  }
274 }
275 
276 /*!
277  * \brief Kill a zombie thread
278  *
279  * This runs from the threadpool's control taskprocessor thread.
280  *
281  * \param data A thread_worker_pair containing the threadpool and the zombie thread
282  * \return 0
283  */
284 static int queued_zombie_thread_dead(void *data)
285 {
286  struct thread_worker_pair *pair = data;
287 
288  ao2_unlink(pair->pool->zombie_threads, pair->worker);
289  threadpool_send_state_changed(pair->pool);
290 
291  thread_worker_pair_free(pair);
292  return 0;
293 }
294 
295 /*!
296  * \brief Queue a task to kill a zombie thread
297  *
298  * This is called by a worker thread when it acknowledges that it is time for
299  * it to die.
300  */
301 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
302  struct worker_thread *worker)
303 {
304  struct thread_worker_pair *pair;
305  SCOPED_AO2LOCK(lock, pool);
306 
307  if (pool->shutting_down) {
308  return;
309  }
310 
311  pair = thread_worker_pair_alloc(pool, worker);
312  if (!pair) {
313  return;
314  }
315 
316  if (ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair)) {
317  thread_worker_pair_free(pair);
318  }
319 }
320 
321 static int queued_idle_thread_dead(void *data)
322 {
323  struct thread_worker_pair *pair = data;
324 
325  ao2_unlink(pair->pool->idle_threads, pair->worker);
326  threadpool_send_state_changed(pair->pool);
327 
328  thread_worker_pair_free(pair);
329  return 0;
330 }
331 
332 static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
333  struct worker_thread *worker)
334 {
335  struct thread_worker_pair *pair;
336  SCOPED_AO2LOCK(lock, pool);
337 
338  if (pool->shutting_down) {
339  return;
340  }
341 
342  pair = thread_worker_pair_alloc(pool, worker);
343  if (!pair) {
344  return;
345  }
346 
347  if (ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair)) {
348  thread_worker_pair_free(pair);
349  }
350 }
351 
352 /*!
353  * \brief Execute a task in the threadpool
354  *
355  * This is the function that worker threads call in order to execute tasks
356  * in the threadpool
357  *
358  * \param pool The pool to which the tasks belong.
359  * \retval 0 Either the pool has been shut down or there are no tasks.
360  * \retval 1 There are still tasks remaining in the pool.
361  */
362 static int threadpool_execute(struct ast_threadpool *pool)
363 {
364  ao2_lock(pool);
365  if (!pool->shutting_down) {
366  ao2_unlock(pool);
367  return ast_taskprocessor_execute(pool->tps);
368  }
369  ao2_unlock(pool);
370  return 0;
371 }
372 
373 /*!
374  * \brief Destroy a threadpool's components.
375  *
376  * This is the destructor called automatically when the threadpool's
377  * reference count reaches zero. This is not to be confused with
378  * threadpool_destroy.
379  *
380  * By the time this actually gets called, most of the cleanup has already
381  * been done in the pool. The only thing left to do is to release the
382  * final reference to the threadpool listener.
383  *
384  * \param obj The pool to destroy
385  */
386 static void threadpool_destructor(void *obj)
387 {
388  struct ast_threadpool *pool = obj;
389  ao2_cleanup(pool->listener);
390 }
391 
392 /*!
393  * \brief Allocate a threadpool
394  *
395  * This is implemented as a taskprocessor listener's alloc callback. This
396  * is because the threadpool exists as the private data on a taskprocessor
397  * listener.
398  *
399  * \param name The name of the threadpool.
400  * \param options The options the threadpool uses.
401  * \retval NULL Could not initialize threadpool properly
402  * \retval non-NULL The newly-allocated threadpool
403  */
404 static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
405 {
406  RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
407  struct ast_str *control_tps_name;
408 
409  pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
410  control_tps_name = ast_str_create(64);
411  if (!pool || !control_tps_name) {
412  ast_free(control_tps_name);
413  return NULL;
414  }
415 
416  ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
417 
418  pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
419  ast_free(control_tps_name);
420  if (!pool->control_tps) {
421  return NULL;
422  }
423  pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
424  THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
425  if (!pool->active_threads) {
426  return NULL;
427  }
428  pool->idle_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
429  THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
430  if (!pool->idle_threads) {
431  return NULL;
432  }
433  pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
434  THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
435  if (!pool->zombie_threads) {
436  return NULL;
437  }
438  pool->options = *options;
439 
440  ao2_ref(pool, +1);
441  return pool;
442 }
443 
444 static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
445 {
446  return 0;
447 }
448 
449 /*!
450  * \brief helper used for queued task when tasks are pushed
451  */
452 struct task_pushed_data {
453  /*! Pool into which a task was pushed */
454  struct ast_threadpool *pool;
455  /*! Indicator of whether the pool had no tasks prior to the new task being added */
456  int was_empty;
457 };
458 
459 /*!
460  * \brief Allocate and initialize a task_pushed_data
461  * \param pool The threadpool to set in the task_pushed_data
462  * \param was_empty The was_empty value to set in the task_pushed_data
463  * \retval NULL Unable to allocate task_pushed_data
464  * \retval non-NULL The newly-allocated task_pushed_data
465  */
466 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
467  int was_empty)
468 {
469  struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
470 
471  if (!tpd) {
472  return NULL;
473  }
474  tpd->pool = pool;
475  tpd->was_empty = was_empty;
476  return tpd;
477 }
478 
479 /*!
480  * \brief Activate idle threads
481  *
482  * This function always returns CMP_MATCH because all workers that this
483  * function acts on need to be seen as matches so they are unlinked from the
484  * list of idle threads.
485  *
486  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
487  * \param obj The worker to activate
488  * \param arg The pool where the worker belongs
489  * \retval CMP_MATCH
490  */
491 static int activate_thread(void *obj, void *arg, int flags)
492 {
493  struct worker_thread *worker = obj;
494  struct ast_threadpool *pool = arg;
495 
496  if (!ao2_link(pool->active_threads, worker)) {
497  /* If we can't link the idle thread into the active container, then
498  * we'll just leave the thread idle and not wake it up.
499  */
500  ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
501  worker->id);
502  return 0;
503  }
504 
505  if (worker_set_state(worker, ALIVE)) {
506  ast_debug(1, "Failed to activate thread %d. It is dead\n",
507  worker->id);
508  /* The worker thread will no longer exist in the active threads or
509  * idle threads container after this.
510  */
511  ao2_unlink(pool->active_threads, worker);
512  }
513 
514  return CMP_MATCH;
515 }
516 
517 /*!
518  * \brief Add threads to the threadpool
519  *
520  * This function is called from the threadpool's control taskprocessor thread.
521  * \param pool The pool that is expanding
522  * \param delta The number of threads to add to the pool
523  */
524 static void grow(struct ast_threadpool *pool, int delta)
525 {
526  int i;
527 
528  int current_size = ao2_container_count(pool->active_threads) +
529  ao2_container_count(pool->idle_threads);
530 
531  if (pool->options.max_size && current_size + delta > pool->options.max_size) {
532  delta = pool->options.max_size - current_size;
533  }
534 
535  ast_debug(3, "Increasing threadpool %s's size by %d\n",
536  ast_taskprocessor_name(pool->tps), delta);
537 
538  for (i = 0; i < delta; ++i) {
539  struct worker_thread *worker = worker_thread_alloc(pool);
540  if (!worker) {
541  return;
542  }
543  if (ao2_link(pool->idle_threads, worker)) {
544  if (worker_thread_start(worker)) {
545  ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
546  ao2_unlink(pool->active_threads, worker);
547  }
548  } else {
549  ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
550  }
551  ao2_ref(worker, -1);
552  }
553 }
554 
555 /*!
556  * \brief Queued task called when tasks are pushed into the threadpool
557  *
558  * This function first calls into the threadpool's listener to let it know
559  * that a task has been pushed. It then wakes up all idle threads and moves
560  * them into the active thread container.
561  * \param data A task_pushed_data
562  * \return 0
563  */
564 static int queued_task_pushed(void *data)
565 {
566  struct task_pushed_data *tpd = data;
567  struct ast_threadpool *pool = tpd->pool;
568  int was_empty = tpd->was_empty;
569  unsigned int existing_active;
570 
571  ast_free(tpd);
572 
573  if (pool->listener && pool->listener->callbacks->task_pushed) {
574  pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
575  }
576 
577  existing_active = ao2_container_count(pool->active_threads);
578 
579  /* The first pass transitions any existing idle threads to be active, and
580  * will also remove any worker threads that have recently entered the dead
581  * state.
582  */
583  ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
584  activate_thread, pool);
585 
586  /* If no idle threads could be transitioned to active grow the pool as permitted. */
587  if (ao2_container_count(pool->active_threads) == existing_active) {
588  if (!pool->options.auto_increment) {
589  return 0;
590  }
591  grow(pool, pool->options.auto_increment);
592  /* An optional second pass transitions any newly added threads. */
593  ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
594  activate_thread, pool);
595  }
596 
597  threadpool_send_state_changed(pool);
598  return 0;
599 }
600 
601 /*!
602  * \brief Taskprocessor listener callback called when a task is added
603  *
604  * The threadpool uses this opportunity to queue a task on its control taskprocessor
605  * in order to activate idle threads and notify the threadpool listener that the
606  * task has been pushed.
607  * \param listener The taskprocessor listener. The threadpool is the listener's private data
608  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
609  */
610 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
611  int was_empty)
612 {
613  struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
614  struct task_pushed_data *tpd;
615  SCOPED_AO2LOCK(lock, pool);
616 
617  if (pool->shutting_down) {
618  return;
619  }
620 
621  tpd = task_pushed_data_alloc(pool, was_empty);
622  if (!tpd) {
623  return;
624  }
625 
626  if (ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd)) {
627  ast_free(tpd);
628  }
629 }
630 
631 /*!
632  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
633  *
634  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
635  * \param data The pool that has become empty
636  * \return 0
637  */
638 static int queued_emptied(void *data)
639 {
640  struct ast_threadpool *pool = data;
641 
642  /* We already checked for existence of this callback when this was queued */
643  pool->listener->callbacks->emptied(pool, pool->listener);
644  return 0;
645 }
646 
647 /*!
648  * \brief Taskprocessor listener emptied callback
649  *
650  * The threadpool queues a task to let the threadpool listener know that
651  * the threadpool no longer contains any tasks.
652  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
653  */
654 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
655 {
656  struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
657  SCOPED_AO2LOCK(lock, pool);
658 
659  if (pool->shutting_down) {
660  return;
661  }
662 
663  if (pool->listener && pool->listener->callbacks->emptied) {
664  if (ast_taskprocessor_push(pool->control_tps, queued_emptied, pool)) {
665  /* Nothing to do here but we need the check to keep the compiler happy. */
666  }
667  }
668 }
669 
670 /*!
671  * \brief Taskprocessor listener shutdown callback
672  *
673  * The threadpool will shut down and destroy all of its worker threads when
674  * this is called back. By the time this gets called, the taskprocessor's
675  * control taskprocessor has already been destroyed. Therefore there is no risk
676  * in outright destroying the worker threads here.
677  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
678  */
679 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
680 {
681  struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
682 
683  if (pool->listener && pool->listener->callbacks->shutdown) {
684  pool->listener->callbacks->shutdown(pool->listener);
685  }
686  ao2_cleanup(pool->active_threads);
687  ao2_cleanup(pool->idle_threads);
688  ao2_cleanup(pool->zombie_threads);
689  ao2_cleanup(pool);
690 }
691 
692 /*!
693  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
694  */
695 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
696  .start = threadpool_tps_start,
697  .task_pushed = threadpool_tps_task_pushed,
698  .emptied = threadpool_tps_emptied,
699  .shutdown = threadpool_tps_shutdown,
700 };
701 
702 /*!
703  * \brief ao2 callback to kill a set number of threads.
704  *
705  * Threads will be unlinked from the container as long as the
706  * counter has not reached zero. The counter is decremented with
707  * each thread that is removed.
708  * \param obj The worker thread up for possible destruction
709  * \param arg The counter
710  * \param flags Unused
711  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
712  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
713  */
714 static int kill_threads(void *obj, void *arg, int flags)
715 {
716  int *num_to_kill = arg;
717 
718  if (*num_to_kill > 0) {
719  --(*num_to_kill);
720  return CMP_MATCH;
721  } else {
722  return CMP_STOP;
723  }
724 }
725 
726 /*!
727  * \brief ao2 callback to zombify a set number of threads.
728  *
729  * Threads will be zombified as long as the counter has not reached
730  * zero. The counter is decremented with each thread that is zombified.
731  *
732  * Zombifying a thread involves removing it from its current container,
733  * adding it to the zombie container, and changing the state of the
734  * worker to a zombie
735  *
736  * This callback is called from the threadpool control taskprocessor thread.
737  *
738  * \param obj The worker thread that may be zombified
739  * \param arg The pool to which the worker belongs
740  * \param data The counter
741  * \param flags Unused
742  * \retval CMP_MATCH The zombified thread should be removed from its current container
743  * \retval CMP_STOP Stop attempting to zombify threads
744  */
745 static int zombify_threads(void *obj, void *arg, void *data, int flags)
746 {
747  struct worker_thread *worker = obj;
748  struct ast_threadpool *pool = arg;
749  int *num_to_zombify = data;
750 
751  if ((*num_to_zombify)-- > 0) {
752  if (!ao2_link(pool->zombie_threads, worker)) {
753  ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
754  return 0;
755  }
756  worker_set_state(worker, ZOMBIE);
757  return CMP_MATCH;
758  } else {
759  return CMP_STOP;
760  }
761 }
762 
763 /*!
764  * \brief Remove threads from the threadpool
765  *
766  * The preference is to kill idle threads. However, if there are
767  * more threads to remove than there are idle threads, then active
768  * threads will be zombified instead.
769  *
770  * This function is called from the threadpool control taskprocessor thread.
771  *
772  * \param pool The threadpool to remove threads from
773  * \param delta The number of threads to remove
774  */
775 static void shrink(struct ast_threadpool *pool, int delta)
776 {
777  /*
778  * Preference is to kill idle threads, but
779  * we'll move on to deactivating active threads
780  * if we have to
781  */
782  int idle_threads = ao2_container_count(pool->idle_threads);
783  int idle_threads_to_kill = MIN(delta, idle_threads);
784  int active_threads_to_zombify = delta - idle_threads_to_kill;
785 
786  ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
787  ast_taskprocessor_name(pool->tps));
788 
789  ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
790  kill_threads, &idle_threads_to_kill);
791 
792  ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
793  ast_taskprocessor_name(pool->tps));
794 
795  ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
796  zombify_threads, pool, &active_threads_to_zombify);
797 }
798 
799 /*!
800  * \brief Helper struct used for queued operations that change the size of the threadpool
801  */
802 struct set_size_data {
803  /*! The pool whose size is to change */
804  struct ast_threadpool *pool;
805  /*! The requested new size of the pool */
806  unsigned int size;
807 };
808 
809 /*!
810  * \brief Allocate and initialize a set_size_data
811  * \param pool The pool for the set_size_data
812  * \param size The size to store in the set_size_data
813  */
814 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
815  unsigned int size)
816 {
817  struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
818  if (!ssd) {
819  return NULL;
820  }
821 
822  ssd->pool = pool;
823  ssd->size = size;
824  return ssd;
825 }
826 
827 /*!
828  * \brief Change the size of the threadpool
829  *
830  * This can either result in shrinking or growing the threadpool depending
831  * on the new desired size and the current size.
832  *
833  * This function is run from the threadpool control taskprocessor thread
834  *
835  * \param data A set_size_data used for determining how to act
836  * \return 0
837  */
838 static int queued_set_size(void *data)
839 {
840  struct set_size_data *ssd = data;
841  struct ast_threadpool *pool = ssd->pool;
842  unsigned int num_threads = ssd->size;
843 
844  /* We don't count zombie threads as being "live" when potentially resizing */
845  unsigned int current_size = ao2_container_count(pool->active_threads) +
846  ao2_container_count(pool->idle_threads);
847 
848  ast_free(ssd);
849 
850  if (current_size == num_threads) {
851  ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
852  num_threads, current_size);
853  return 0;
854  }
855 
856  if (current_size < num_threads) {
857  ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
858  activate_thread, pool);
859 
860  /* As the above may have altered the number of current threads update it */
861  current_size = ao2_container_count(pool->active_threads) +
862  ao2_container_count(pool->idle_threads);
863  grow(pool, num_threads - current_size);
864  ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
865  activate_thread, pool);
866  } else {
867  shrink(pool, current_size - num_threads);
868  }
869 
870  threadpool_send_state_changed(pool);
871  return 0;
872 }
873 
874 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
875 {
876  struct set_size_data *ssd;
877  SCOPED_AO2LOCK(lock, pool);
878 
879  if (pool->shutting_down) {
880  return;
881  }
882 
883  ssd = set_size_data_alloc(pool, size);
884  if (!ssd) {
885  return;
886  }
887 
888  if (ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd)) {
889  ast_free(ssd);
890  }
891 }
892 
893 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
894  const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
895 {
896  struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
897  if (!listener) {
898  return NULL;
899  }
900  listener->callbacks = callbacks;
901  listener->user_data = user_data;
902  return listener;
903 }
904 
905 void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
906 {
907  return listener->user_data;
908 }
909 
910 struct pool_options_pair {
911  struct ast_threadpool *pool;
912  struct ast_threadpool_options options;
913 };
914 
915 struct ast_threadpool *ast_threadpool_create(const char *name,
916  struct ast_threadpool_listener *listener,
917  const struct ast_threadpool_options *options)
918 {
919  struct ast_taskprocessor *tps;
920  RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
921  RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
922  char *fullname;
923 
924  pool = threadpool_alloc(name, options);
925  if (!pool) {
926  return NULL;
927  }
928 
929  tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
930  if (!tps_listener) {
931  return NULL;
932  }
933 
934  if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
935  ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
936  return NULL;
937  }
938 
939  fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
940  sprintf(fullname, "%s/pool", name); /* Safe */
941  tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
942  if (!tps) {
943  return NULL;
944  }
945 
946  pool->tps = tps;
947  if (listener) {
948  ao2_ref(listener, +1);
949  pool->listener = listener;
950  }
951  ast_threadpool_set_size(pool, pool->options.initial_size);
952  ao2_ref(pool, +1);
953  return pool;
954 }
955 
956 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
957 {
958  SCOPED_AO2LOCK(lock, pool);
959  if (!pool->shutting_down) {
960  return ast_taskprocessor_push(pool->tps, task, data);
961  }
962  return -1;
963 }
964 
965 void ast_threadpool_shutdown(struct ast_threadpool *pool)
966 {
967  if (!pool) {
968  return;
969  }
970  /* Shut down the taskprocessors and everything else just
971  * takes care of itself via the taskprocessor callbacks
972  */
973  ao2_lock(pool);
974  pool->shutting_down = 1;
975  ao2_unlock(pool);
976  ast_taskprocessor_unreference(pool->control_tps);
977  ast_taskprocessor_unreference(pool->tps);
978 }
979 
980 /*!
981  * A monotonically increasing integer used for worker
982  * thread identification.
983  */
984 static int worker_id_counter;
985 
986 static int worker_thread_hash(const void *obj, int flags)
987 {
988  const struct worker_thread *worker = obj;
989 
990  return worker->id;
991 }
992 
993 static int worker_thread_cmp(void *obj, void *arg, int flags)
994 {
995  struct worker_thread *worker1 = obj;
996  struct worker_thread *worker2 = arg;
997 
998  return worker1->id == worker2->id ? CMP_MATCH : 0;
999 }
1000 
1001 /*!
1002  * \brief shut a worker thread down
1003  *
1004  * Set the worker dead and then wait for its thread
1005  * to finish executing.
1006  *
1007  * \param worker The worker thread to shut down
1008  */
1009 static void worker_shutdown(struct worker_thread *worker)
1010 {
1011  worker_set_state(worker, DEAD);
1012  if (worker->thread != AST_PTHREADT_NULL) {
1013  pthread_join(worker->thread, NULL);
1014  worker->thread = AST_PTHREADT_NULL;
1015  }
1016 }
1017 
1018 /*!
1019  * \brief Worker thread destructor
1020  *
1021  * Called automatically when refcount reaches 0. Shuts
1022  * down the worker thread and destroys its component
1023  * parts
1024  */
1025 static void worker_thread_destroy(void *obj)
1026 {
1027  struct worker_thread *worker = obj;
1028  ast_debug(3, "Destroying worker thread %d\n", worker->id);
1029  worker_shutdown(worker);
1030  ast_mutex_destroy(&worker->lock);
1031  ast_cond_destroy(&worker->cond);
1032 }
1033 
1034 /*!
1035  * \brief start point for worker threads
1036  *
1037  * Worker threads start in the active state but may
1038  * immediately go idle if there is no work to be
1039  * done
1040  *
1041  * \param arg The worker thread
1042  * \retval NULL
1043  */
1044 static void *worker_start(void *arg)
1045 {
1046  struct worker_thread *worker = arg;
1047  enum worker_state saved_state;
1048 
1049  if (worker->options.thread_start) {
1050  worker->options.thread_start();
1051  }
1052 
1053  ast_mutex_lock(&worker->lock);
1054  while (worker_idle(worker)) {
1055  ast_mutex_unlock(&worker->lock);
1056  worker_active(worker);
1057  ast_mutex_lock(&worker->lock);
1058  if (worker->state != ALIVE) {
1059  break;
1060  }
1061  threadpool_active_thread_idle(worker->pool, worker);
1062  }
1063  saved_state = worker->state;
1064  ast_mutex_unlock(&worker->lock);
1065 
1066  /* Reaching this portion means the thread is
1067  * on death's door. It may have been killed while
1068  * it was idle, in which case it can just die
1069  * peacefully. If it's a zombie, though, then
1070  * it needs to let the pool know so
1071  * that the thread can be removed from the
1072  * list of zombie threads.
1073  */
1074  if (saved_state == ZOMBIE) {
1075  threadpool_zombie_thread_dead(worker->pool, worker);
1076  }
1077 
1078  if (worker->options.thread_end) {
1079  worker->options.thread_end();
1080  }
1081  return NULL;
1082 }
1083 
1084 /*!
1085  * \brief Allocate and initialize a new worker thread
1086  *
1087  * This will create, initialize, and start the thread.
1088  *
1089  * \param pool The threadpool to which the worker will be added
1090  * \retval NULL Failed to allocate or start the worker thread
1091  * \retval non-NULL The newly-created worker thread
1092  */
1093 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
1094 {
1095  struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
1096  if (!worker) {
1097  return NULL;
1098  }
1099  worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
1100  ast_mutex_init(&worker->lock);
1101  ast_cond_init(&worker->cond, NULL);
1102  worker->pool = pool;
1103  worker->thread = AST_PTHREADT_NULL;
1104  worker->state = ALIVE;
1105  worker->options = pool->options;
1106  return worker;
1107 }
1108 
1109 static int worker_thread_start(struct worker_thread *worker)
1110 {
1111  return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
1112 }
1113 
1114 /*!
1115  * \brief Active loop for worker threads
1116  *
1117  * The worker will stay in this loop for its lifetime,
1118  * executing tasks as they become available. If there
1119  * are no tasks currently available, then the thread
1120  * will go idle.
1121  *
1122  * \param worker The worker thread executing tasks.
1123  */
1124 static void worker_active(struct worker_thread *worker)
1125 {
1126  int alive;
1127 
1128  /* The following is equivalent to
1129  *
1130  * while (threadpool_execute(worker->pool));
1131  *
1132  * However, reviewers have suggested in the past
1133  * doing that can cause optimizers to (wrongly)
1134  * optimize the code away.
1135  */
1136  do {
1137  alive = threadpool_execute(worker->pool);
1138  } while (alive);
1139 }
1140 
1141 /*!
1142  * \brief Idle function for worker threads
1143  *
1144  * The worker waits here until it gets told by the threadpool
1145  * to wake up.
1146  *
1147  * worker is locked before entering this function.
1148  *
1149  * \param worker The idle worker
1150  * \retval 0 The thread is being woken up so that it can conclude.
1151  * \retval non-zero The thread is being woken up to do more work.
1152  */
1153 static int worker_idle(struct worker_thread *worker)
1154 {
1155  struct timeval start = ast_tvnow();
1156  struct timespec end = {
1157  .tv_sec = start.tv_sec + worker->options.idle_timeout,
1158  .tv_nsec = start.tv_usec * 1000,
1159  };
1160  while (!worker->wake_up) {
1161  if (worker->options.idle_timeout <= 0) {
1162  ast_cond_wait(&worker->cond, &worker->lock);
1163  } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
1164  break;
1165  }
1166  }
1167 
1168  if (!worker->wake_up) {
1169  ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
1170  threadpool_idle_thread_dead(worker->pool, worker);
1171  worker->state = DEAD;
1172  }
1173  worker->wake_up = 0;
1174  return worker->state == ALIVE;
1175 }
1176 
1177 /*!
1178  * \brief Change a worker's state
1179  *
1180  * The threadpool calls into this function in order to let a worker know
1181  * how it should proceed.
1182  *
1183  * \retval -1 failure (state transition not permitted)
1184  * \retval 0 success
1185  */
1186 static int worker_set_state(struct worker_thread *worker, enum worker_state state)
1187 {
1188  SCOPED_MUTEX(lock, &worker->lock);
1189 
1190  switch (state) {
1191  case ALIVE:
1192  /* This can occur due to a race condition between being told to go active
1193  * and an idle timeout happening.
1194  */
1195  if (worker->state == DEAD) {
1196  return -1;
1197  }
1198  ast_assert(worker->state != ZOMBIE);
1199  break;
1200  case DEAD:
1201  break;
1202  case ZOMBIE:
1203  ast_assert(worker->state != DEAD);
1204  break;
1205  }
1206 
1207  worker->state = state;
1208  worker->wake_up = 1;
1209  ast_cond_signal(&worker->cond);
1210 
1211  return 0;
1212 }
1213 
1214 /*! Serializer group shutdown control object. */
1215 struct ast_serializer_shutdown_group {
1216  /*! Shutdown thread waits on this conditional. */
1217  ast_cond_t cond;
1218  /*! Count of serializers needing to shutdown. */
1219  int count;
1220 };
1221 
1222 static void serializer_shutdown_group_dtor(void *vdoomed)
1223 {
1224  struct ast_serializer_shutdown_group *doomed = vdoomed;
1225 
1226  ast_cond_destroy(&doomed->cond);
1227 }
1228 
1229 struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
1230 {
1231  struct ast_serializer_shutdown_group *shutdown_group;
1232 
1233  shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
1234  if (!shutdown_group) {
1235  return NULL;
1236  }
1237  ast_cond_init(&shutdown_group->cond, NULL);
1238  return shutdown_group;
1239 }
1240 
1241 int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
1242 {
1243  int remaining;
1244  ast_mutex_t *lock;
1245 
1246  if (!shutdown_group) {
1247  return 0;
1248  }
1249 
1250  lock = ao2_object_get_lockaddr(shutdown_group);
1251  ast_assert(lock != NULL);
1252 
1253  ao2_lock(shutdown_group);
1254  if (timeout) {
1255  struct timeval start;
1256  struct timespec end;
1257 
1258  start = ast_tvnow();
1259  end.tv_sec = start.tv_sec + timeout;
1260  end.tv_nsec = start.tv_usec * 1000;
1261  while (shutdown_group->count) {
1262  if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
1263  /* Error or timed out waiting for the count to reach zero. */
1264  break;
1265  }
1266  }
1267  } else {
1268  while (shutdown_group->count) {
1269  if (ast_cond_wait(&shutdown_group->cond, lock)) {
1270  /* Error */
1271  break;
1272  }
1273  }
1274  }
1275  remaining = shutdown_group->count;
1276  ao2_unlock(shutdown_group);
1277  return remaining;
1278 }
1279 
1280 /*!
1281  * \internal
1282  * \brief Increment the number of serializer members in the group.
1283  * \since 13.5.0
1284  *
1285  * \param shutdown_group Group shutdown controller.
1286  *
1287  * \return Nothing
1288  */
1289 static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
1290 {
1291  ao2_lock(shutdown_group);
1292  ++shutdown_group->count;
1293  ao2_unlock(shutdown_group);
1294 }
1295 
1296 /*!
1297  * \internal
1298  * \brief Decrement the number of serializer members in the group.
1299  * \since 13.5.0
1300  *
1301  * \param shutdown_group Group shutdown controller.
1302  *
1303  * \return Nothing
1304  */
1305 static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
1306 {
1307  ao2_lock(shutdown_group);
1308  --shutdown_group->count;
1309  if (!shutdown_group->count) {
1310  ast_cond_signal(&shutdown_group->cond);
1311  }
1312  ao2_unlock(shutdown_group);
1313 }
1314 
1315 struct serializer {
1316  /*! Threadpool the serializer will use to process the jobs. */
1317  struct ast_threadpool *pool;
1318  /*! Which group will wait for this serializer to shutdown. */
1319  struct ast_serializer_shutdown_group *shutdown_group;
1320 };
1321 
1322 static void serializer_dtor(void *obj)
1323 {
1324  struct serializer *ser = obj;
1325 
1326  ao2_cleanup(ser->pool);
1327  ser->pool = NULL;
1328  ao2_cleanup(ser->shutdown_group);
1329  ser->shutdown_group = NULL;
1330 }
1331 
1332 static struct serializer *serializer_create(struct ast_threadpool *pool,
1333  struct ast_serializer_shutdown_group *shutdown_group)
1334 {
1335  struct serializer *ser;
1336 
1337  ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1338  if (!ser) {
1339  return NULL;
1340  }
1341  ao2_ref(pool, +1);
1342  ser->pool = pool;
1343  ser->shutdown_group = ao2_bump(shutdown_group);
1344  return ser;
1345 }
1346 
1347 AST_THREADSTORAGE_RAW(current_serializer);
1348 
1349 static int execute_tasks(void *data)
1350 {
1351  struct ast_taskprocessor *tps = data;
1352 
1353  ast_threadstorage_set_ptr(&current_serializer, tps);
1354  while (ast_taskprocessor_execute(tps)) {
1355  /* No-op */
1356  }
1357  ast_threadstorage_set_ptr(&current_serializer, NULL);
1358 
1359  ast_taskprocessor_unreference(tps);
1360  return 0;
1361 }
1362 
1363 static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
1364 {
1365  if (was_empty) {
1366  struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
1367  struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
1368 
1369  if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
1370  ast_taskprocessor_unreference(tps);
1371  }
1372  }
1373 }
1374 
1375 static int serializer_start(struct ast_taskprocessor_listener *listener)
1376 {
1377  /* No-op */
1378  return 0;
1379 }
1380 
1381 static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
1382 {
1383  struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
1384 
1385  if (ser->shutdown_group) {
1386  serializer_shutdown_group_dec(ser->shutdown_group);
1387  }
1388  ao2_cleanup(ser);
1389 }
1390 
1391 static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
1392  .task_pushed = serializer_task_pushed,
1393  .start = serializer_start,
1394  .shutdown = serializer_shutdown,
1395 };
1396 
1397 struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
1398 {
1399  return ast_threadstorage_get_ptr(&current_serializer);
1400 }
1401 
1402 struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
1403  struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
1404 {
1405  struct serializer *ser;
1406  struct ast_taskprocessor_listener *listener;
1407  struct ast_taskprocessor *tps;
1408 
1409  ser = serializer_create(pool, shutdown_group);
1410  if (!ser) {
1411  return NULL;
1412  }
1413 
1414  listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
1415  if (!listener) {
1416  ao2_ref(ser, -1);
1417  return NULL;
1418  }
1419 
1420  tps = ast_taskprocessor_create_with_listener(name, listener);
1421  if (!tps) {
1422  /* ser ref transferred to listener but not cleaned without tps */
1423  ao2_ref(ser, -1);
1424  } else if (shutdown_group) {
1425  serializer_shutdown_group_inc(shutdown_group);
1426  }
1427 
1428  ao2_ref(listener, -1);
1429  return tps;
1430 }
1431 
1432 struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
1433 {
1434  return ast_threadpool_serializer_group(name, pool, NULL);
1435 }
1436 
1437 long ast_threadpool_queue_size(struct ast_threadpool *pool)
1438 {
1439  return ast_taskprocessor_size(pool->tps);
1440 }
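
The listing above defines the public threadpool API (ast_threadpool_create, ast_threadpool_push, ast_threadpool_shutdown, ast_threadpool_queue_size) and the serializer API layered on top of it (ast_threadpool_serializer, ast_threadpool_serializer_group). The sketch below shows one plausible way a module might use that API; it is not part of threadpool.c, and the option values and the names example_task and example_usage are illustrative assumptions.

#include "asterisk.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/logger.h"

/* A task is any function of the form int task(void *data). */
static int example_task(void *data)
{
	ast_log(LOG_NOTICE, "Running a task on a pool thread\n");
	return 0;
}

static int example_usage(void)
{
	struct ast_threadpool_options options = {
		.version = AST_THREADPOOL_OPTIONS_VERSION,
		.initial_size = 5,    /* start with five worker threads */
		.auto_increment = 1,  /* grow by one thread when tasks back up */
		.idle_timeout = 60,   /* idle workers die after 60 seconds */
		.max_size = 50,       /* never grow beyond 50 workers */
	};
	struct ast_threadpool *pool;
	struct ast_taskprocessor *serializer;

	/* A threadpool listener is optional; pass NULL for basic use. */
	pool = ast_threadpool_create("example", NULL, &options);
	if (!pool) {
		return -1;
	}

	/* Tasks pushed directly to the pool may run in parallel. */
	if (ast_threadpool_push(pool, example_task, NULL)) {
		ast_log(LOG_WARNING, "Failed to push task to the pool\n");
	}

	/* Tasks pushed to a serializer run one at a time, in order,
	 * on threads borrowed from the pool. */
	serializer = ast_threadpool_serializer("example/serializer", pool);
	if (serializer) {
		if (ast_taskprocessor_push(serializer, example_task, NULL)) {
			ast_log(LOG_WARNING, "Failed to push task to the serializer\n");
		}
		/* Drop our reference once we no longer need to push to it. */
		ast_taskprocessor_unreference(serializer);
	}

	/* Shut down the pool; the taskprocessor callbacks clean up the rest. */
	ast_threadpool_shutdown(pool);
	return 0;
}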