Asterisk - The Open Source Telephony Project  18.5.0
Functions
serializer.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

struct ast_serializer_pool * ast_serializer_pool_create (const char *name, unsigned int size, struct ast_threadpool *threadpool, int timeout)
 Create a serializer pool. More...
 
int ast_serializer_pool_destroy (struct ast_serializer_pool *pool)
 Destroy the serializer pool. More...
 
struct ast_taskprocessor * ast_serializer_pool_get (struct ast_serializer_pool *pool)
 Retrieve a serializer from the pool. More...
 
const char * ast_serializer_pool_name (const struct ast_serializer_pool *pool)
 Retrieve the base name of the serializer pool. More...
 
int ast_serializer_pool_set_alerts (struct ast_serializer_pool *pool, long high, long low)
 Set taskprocessor alert levels for the serializers in the pool. More...
 

Function Documentation

◆ ast_serializer_pool_create()

struct ast_serializer_pool* ast_serializer_pool_create ( const char *  name,
unsigned int  size,
struct ast_threadpool *  threadpool,
int  timeout 
)

Create a serializer pool.

Create a serializer pool with an optional shutdown group. If a timeout greater than -1 is specified then a shutdown group is enabled on the pool.

Parameters
name - The base name for the pool; also used when building the taskprocessor name(s)
size - The size of the pool
threadpool - The backing threadpool to use
timeout - The timeout used if using a shutdown group (-1 = disabled)
Return values
A newly allocated serializer pool object, or NULL on error

Definition at line 76 of file serializer.c.

References ast_assert, ast_log, ast_malloc, ast_serializer_pool_destroy(), ast_serializer_shutdown_group_alloc(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_name_append(), ast_threadpool_serializer_group(), AST_VECTOR_APPEND, AST_VECTOR_RW_INIT, LOG_ERROR, NULL, pool, ast_serializer_pool::shutdown_group, ast_serializer_pool::shutdown_group_timeout, and timeout.

Referenced by AST_TEST_DEFINE(), and load_module().

78 {
79  struct ast_serializer_pool *pool;
80  char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
81  size_t idx;
82 
83  ast_assert(size > 0);
84 
85  pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
86  if (!pool) {
87  return NULL;
88  }
89 
90  strcpy(pool->name, name); /* safe */
91 
94 
95  AST_VECTOR_RW_INIT(&pool->serializers, size);
96 
97  for (idx = 0; idx < size; ++idx) {
98  struct ast_taskprocessor *tps;
99 
100  /* Create name with seq number appended. */
101  ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
102 
103  tps = ast_threadpool_serializer_group(tps_name, threadpool, pool->shutdown_group);
104  if (!tps) {
106  ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
107  tps_name);
108  return NULL;
109  }
110 
111  if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
113  ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
114  tps_name);
115  return NULL;
116  }
117  }
118 
119  return pool;
120 }
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
Definition: vector.h:158
struct ast_serializer_shutdown_group * ast_serializer_shutdown_group_alloc(void)
Create a serializer group shutdown control object.
Definition: threadpool.c:1229
static int timeout
Definition: cdr_mysql.c:86
static pj_pool_t * pool
Global memory pool for configuration and timers.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
struct ast_taskprocessor * ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
Serialized execution of tasks within a ast_threadpool.
Definition: threadpool.c:1402
#define ast_assert(a)
Definition: utils.h:695
#define NULL
Definition: resample.c:96
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
Destroy the serializer pool.
Definition: serializer.c:39
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:60
#define ast_log
Definition: astobj2.c:42
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:193
#define LOG_ERROR
Definition: logger.h:285
struct ast_serializer_shutdown_group * shutdown_group
Definition: serializer.c:30
static const char name[]
Definition: cdr_mysql.c:74
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
Append the next sequence number to the given string, and copy into the buffer.

◆ ast_serializer_pool_destroy()

int ast_serializer_pool_destroy ( struct ast_serializer_pool *  pool)

Destroy the serializer pool.

Attempt to destroy the serializer pool. If a shutdown group has been enabled and the wait for its threads to complete times out, this function returns the number of remaining threads and the pool is not destroyed. Otherwise the pool is destroyed and 0 is returned.

Parameters
pool - The pool to destroy

Definition at line 39 of file serializer.c.

References ao2_ref, ast_debug, ast_free, ast_log, ast_serializer_shutdown_group_join(), ast_taskprocessor_unreference(), AST_VECTOR_RESET, AST_VECTOR_RW_FREE, AST_VECTOR_RW_UNLOCK, AST_VECTOR_RW_WRLOCK, LOG_WARNING, NULL, ast_serializer_pool::shutdown_group, and ast_serializer_pool::shutdown_group_timeout.

Referenced by ast_serializer_pool_create(), AST_TEST_DEFINE(), load_module(), and unload_module().

40 {
41  if (!pool) {
42  return 0;
43  }
44 
45  /* Clear out the serializers */
46  AST_VECTOR_RW_WRLOCK(&pool->serializers);
48  AST_VECTOR_RW_UNLOCK(&pool->serializers);
49 
50  /* If using a shutdown group then wait for all threads to complete */
51  if (pool->shutdown_group) {
52  int remaining;
53 
54  ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);
55 
58 
59  if (remaining) {
60  /* If we've timed out don't fully cleanup yet */
61  ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
62  "'%d' dependencies still processing.\n", pool->name, remaining);
63  return remaining;
64  }
65 
66  ao2_ref(pool->shutdown_group, -1);
67  pool->shutdown_group = NULL;
68  }
69 
70  AST_VECTOR_RW_FREE(&pool->serializers);
71  ast_free(pool);
72 
73  return 0;
74 }
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:900
#define LOG_WARNING
Definition: logger.h:274
#define NULL
Definition: resample.c:96
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
Wait for the serializers in the group to shutdown with timeout.
Definition: threadpool.c:1241
#define ast_debug(level,...)
Log a DEBUG message.
Definition: logger.h:452
#define ast_log
Definition: astobj2.c:42
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
Definition: vector.h:202
#define ao2_ref(o, delta)
Definition: astobj2.h:464
struct ast_serializer_shutdown_group * shutdown_group
Definition: serializer.c:30
#define ast_free(a)
Definition: astmm.h:182
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
Definition: vector.h:890
#define AST_VECTOR_RESET(vec, cleanup)
Reset vector.
Definition: vector.h:627
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.

◆ ast_serializer_pool_get()

struct ast_taskprocessor* ast_serializer_pool_get ( struct ast_serializer_pool *  pool)

Retrieve a serializer from the pool.

Parameters
pool - The pool object
Return values
A serializer/taskprocessor

Definition at line 127 of file serializer.c.

References ast_taskprocessor_size(), AST_VECTOR_GET, AST_VECTOR_RW_RDLOCK, AST_VECTOR_RW_UNLOCK, AST_VECTOR_SIZE, and NULL.

Referenced by ast_sip_push_task(), ast_sip_push_task_wait_serializer(), AST_TEST_DEFINE(), load_module(), mwi_startup_event_cb(), mwi_stasis_cb(), send_contact_notify(), and send_notify().

128 {
129  struct ast_taskprocessor *res;
130  size_t idx;
131 
132  if (!pool) {
133  return NULL;
134  }
135 
136  AST_VECTOR_RW_RDLOCK(&pool->serializers);
137  if (AST_VECTOR_SIZE(&pool->serializers) == 0) {
138  AST_VECTOR_RW_UNLOCK(&pool->serializers);
139  return NULL;
140  }
141 
142  res = AST_VECTOR_GET(&pool->serializers, 0);
143 
144  /* Choose the taskprocessor with the smallest queue */
145  for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
146  struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
148  res = cur;
149  }
150  }
151 
152  AST_VECTOR_RW_UNLOCK(&pool->serializers);
153  return res;
154 }
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
Definition: vector.h:900
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
Definition: vector.h:880
#define NULL
Definition: resample.c:96
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611

◆ ast_serializer_pool_name()

const char* ast_serializer_pool_name ( const struct ast_serializer_pool *  pool)

Retrieve the base name of the serializer pool.

Parameters
pool - The pool object
Return values
The base name given to the pool

Definition at line 122 of file serializer.c.

Referenced by AST_TEST_DEFINE().

123 {
124  return pool->name;
125 }

◆ ast_serializer_pool_set_alerts()

int ast_serializer_pool_set_alerts ( struct ast_serializer_pool *  pool,
long  high,
long  low 
)

Set taskprocessor alert levels for the serializers in the pool.

Parameters
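pool - The pool whose serializers get the alert levels
high - The taskprocessor high water alert trigger level
low - The taskprocessor low water clear alert level
Return values
0 on success, or -1 on error.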

Definition at line 156 of file serializer.c.

References ast_log, AST_LOG_WARNING, ast_taskprocessor_alert_set_levels(), AST_TASKPROCESSOR_HIGH_WATER_LEVEL, ast_taskprocessor_name(), AST_VECTOR_GET, AST_VECTOR_SIZE, ast_taskprocessor::tps_queue_high, and ast_taskprocessor::tps_queue_low.

Referenced by AST_TEST_DEFINE(), and global_loaded().

157 {
158  size_t idx;
159  long tps_queue_high;
160  long tps_queue_low;
161 
162  if (!pool) {
163  return 0;
164  }
165 
166  tps_queue_high = high;
167  if (tps_queue_high <= 0) {
168  ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
169  "trigger level '%ld'\n", pool->name, tps_queue_high);
170  tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
171  }
172 
173  tps_queue_low = low;
174  if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
175  ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
176  "level '%ld'\n", pool->name, tps_queue_low);
177  tps_queue_low = -1;
178  }
179 
180  for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
181  struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
182  if (ast_taskprocessor_alert_set_levels(cur, tps_queue_low, tps_queue_high)) {
183  ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
185  }
186  }
187 
188  return 0;
189 }
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
Definition: taskprocessor.h:63
#define AST_LOG_WARNING
Definition: logger.h:279
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
long tps_queue_high
Taskprocessor high water alert trigger level.
Definition: taskprocessor.c:78
#define ast_log
Definition: astobj2.c:42
long tps_queue_low
Taskprocessor low water clear alert level.
Definition: taskprocessor.c:76
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:682
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:611