35 #include <corosync/cpg.h> 36 #include <corosync/cfg.h> 55 #define COROSYNC_POLL_TIMEOUT (10 * 1000) 89 #define COROSYNC_IPC_BUFFER_SIZE (8192 * 128) 92 #define corosync_pthread_create_background(a, b, c, d) \ 93 ast_pthread_create_stack(a, b, c, d, \ 94 (AST_BACKGROUND_STACKSIZE + (3 * COROSYNC_IPC_BUFFER_SIZE)), \ 95 __FILE__, __FUNCTION__, __LINE__, #c) 145 cmp = (left->
id == *
id);
148 cmp = (left->
id == right->
id);
187 if (!payload->
event) {
213 if (!corosync_ping_message_type()) {
228 ao2_t_ref(payload, -1,
"Destroy payload on off nominal");
234 ao2_t_ref(payload, -1,
"Hand ref to stasis");
235 ao2_t_ref(message, -1,
"Hand ref to stasis");
261 .publish_default = 1,
262 .subscribe_default = 1,
264 .message_type_fn = corosync_ping_message_type,
267 .publish_default = 1,
268 .subscribe_default = 1,
280 .alert_pipe = { -1, -1 },
286 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK 287 static void cfg_state_track_cb(
288 corosync_cfg_state_notification_buffer_t *notification_buffer,
293 corosync_cfg_shutdown_flags_t flags);
296 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK 297 .corosync_cfg_state_track_callback = cfg_state_track_cb,
318 joined ?
"joined" :
"left");
398 unsigned int new_msgs;
399 unsigned int old_msgs;
414 if (new_msgs > INT_MAX) {
418 if (old_msgs > INT_MAX) {
423 (
int)old_msgs,
NULL, event_eid)) {
427 mailbox, context, eid);
436 unsigned int cachable;
458 static void cpg_deliver_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
459 uint32_t nodeid, uint32_t pid,
void *msg,
size_t msg_len);
461 static void cpg_confchg_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
462 const struct cpg_address *member_list,
size_t member_list_entries,
463 const struct cpg_address *left_list,
size_t left_list_entries,
464 const struct cpg_address *joined_list,
size_t joined_list_entries);
471 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK 472 static void cfg_state_track_cb(
473 corosync_cfg_state_notification_buffer_t *notification_buffer,
480 corosync_cfg_shutdown_flags_t flags)
484 static void cpg_deliver_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
485 uint32_t nodeid, uint32_t pid,
void *msg,
size_t msg_len)
493 ast_debug(1,
"Ignoring event that's too small. %u < %u\n",
494 (
unsigned int) msg_len,
513 publish_handler =
event_types[event_type].publish_to_stasis;
528 memcpy(event, msg, msg_len);
538 ast_debug(5,
"Publishing event %s (%u) to stasis\n",
540 publish_handler(event);
549 iov.iov_base = (
void *)event;
552 ast_debug(5,
"Publishing event %s (%u) to corosync\n",
558 ast_debug(5,
"publish_event_to_corosync rdlock\n");
559 if ((cs_err = cpg_mcast_joined(
cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
564 ast_debug(5,
"publish_event_to_corosync unlock\n");
594 ast_log(
LOG_NOTICE,
"Sending event PING from this server with EID: '%s'\n", buf);
647 static void cpg_confchg_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
648 const struct cpg_address *member_list,
size_t member_list_entries,
649 const struct cpg_address *left_list,
size_t left_list_entries,
650 const struct cpg_address *joined_list,
size_t joined_list_entries)
655 for (i = 0; i < left_list_entries; i++) {
656 const struct cpg_address *cpg_node = &left_list[i];
692 ao2_t_ref(messages, -1,
"Dispose of flushed cache");
701 if (!joined_list_entries) {
732 ao2_t_ref(messages, -1,
"Dispose of dumped cache");
740 unsigned int node_id;
742 corosync_cfg_node_address_t corosync_addr;
750 ast_debug(5,
"send_cluster_notify rdlock\n");
752 if ((cs_err = corosync_cfg_local_get(
cfg_handle, &node_id)) != CS_OK) {
753 ast_log(
LOG_WARNING,
"Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
757 if (((cs_err = corosync_cfg_get_node_addrs(
cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
758 ast_log(
LOG_WARNING,
"Failed to get local Corosync address. Not informing cluster of existance.\n");
763 ast_debug(5,
"send_cluster_notify unlock\n");
766 sa = (
struct sockaddr *)corosync_addr.address;
767 sa_len = (
size_t)corosync_addr.address_length;
768 if ((res = getnameinfo(sa, sa_len, buf,
sizeof(buf),
NULL, 0, NI_NUMERICHOST))) {
769 ast_log(
LOG_WARNING,
"Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
770 gai_strerror(res), res);
785 struct pollfd pfd[3] = {
786 { .events = POLLIN, },
787 { .events = POLLIN, },
788 { .events = POLLIN, },
792 ast_debug(5,
"dispatch_thread_handler rdlock\n");
793 if ((cs_err = cpg_fd_get(
cpg_handle, &pfd[0].fd)) != CS_OK) {
796 ast_debug(5,
"dispatch_thread_handler unlock\n");
800 if ((cs_err = corosync_cfg_fd_get(
cfg_handle, &pfd[1].fd)) != CS_OK) {
803 ast_debug(5,
"dispatch_thread_handler unlock\n");
809 ast_debug(5,
"dispatch_thread_handler unlock\n");
811 ast_log(
LOG_ERROR,
"Failed to get fd: initiliazing CPG. This module is now broken.\n");
825 if (res == -1 &&
errno != EINTR &&
errno != EAGAIN) {
827 cs_err = CS_ERR_BAD_HANDLE;
828 }
else if (res == 0) {
829 unsigned int local_nodeid;
832 ast_debug(5,
"dispatch_thread_handler rdlock\n");
833 if ((cs_err = cpg_local_get(
cpg_handle, &local_nodeid)) == CS_OK) {
834 struct cpg_name name;
835 struct cpg_address address[CPG_MEMBERS_MAX];
836 int entries = CPG_MEMBERS_MAX;
839 name.length = strlen(name.value);
840 if ((cs_err = cpg_membership_get(
cpg_handle, &name, address, &entries)) == CS_OK) {
844 ast_debug(1,
"CPG group has %i node membership\n", entries);
845 for (i = 0; (i < entries) && !found; i++) {
846 if (address[i].nodeid == local_nodeid)
852 cs_err = CS_ERR_BAD_HANDLE;
857 cs_err = CS_ERR_BAD_HANDLE;
862 cs_err = CS_ERR_BAD_HANDLE;
865 ast_debug(5,
"dispatch_thread_handler unlock\n");
869 cs_err = CS_ERR_BAD_HANDLE;
873 ast_debug(5,
"dispatch_thread_handler rdlock\n");
874 if (pfd[0].revents & POLLIN) {
875 if ((cs_err = cpg_dispatch(
cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
880 if (pfd[1].revents & POLLIN) {
881 if ((cs_err = corosync_cfg_dispatch(
cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
886 ast_debug(5,
"dispatch_thread_handler unlock\n");
891 if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
898 struct cpg_name name;
899 ast_debug(5,
"dispatch_thread_handler wrlock\n");
913 ast_debug(5,
"dispatch_thread_handler unlock\n");
921 ast_debug(5,
"dispatch_thread_handler unlock\n");
926 if ((cs_err = cpg_fd_get(
cpg_handle, &pfd[0].fd)) != CS_OK) {
929 ast_debug(5,
"dispatch_thread_handler unlock\n");
934 if ((cs_err = corosync_cfg_fd_get(
cfg_handle, &pfd[1].fd)) != CS_OK) {
937 ast_debug(5,
"dispatch_thread_handler unlock\n");
943 name.length = strlen(name.value);
944 if ((cs_err = cpg_join(
cpg_handle, &name)) != CS_OK) {
947 ast_debug(5,
"dispatch_thread_handler unlock\n");
953 ast_debug(5,
"dispatch_thread_handler unlock\n");
957 ast_log(
LOG_NOTICE,
"Failed to recover from corosync failure: initializing CPG.\n");
968 cpg_iteration_handle_t cpg_iter;
969 struct cpg_iteration_description_t cpg_desc;
974 e->
command =
"corosync show members";
976 "Usage: corosync show members\n" 977 " Show corosync cluster members\n";
989 ast_debug(5,
"corosync_show_members rdlock\n");
990 cs_err = cpg_iteration_initialize(
cpg_handle, CPG_ITERATION_ALL,
NULL, &cpg_iter);
992 if (cs_err != CS_OK) {
993 ast_cli(a->
fd,
"Failed to initialize CPG iterator: %u.\n", cs_err);
994 cpg_iteration_finalize(cpg_iter);
996 ast_debug(5,
"corosync_show_members unlock\n");
1001 "=============================================================\n" 1002 "=== Cluster members =========================================\n" 1003 "=============================================================\n" 1006 for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
1008 cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
1009 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK 1010 corosync_cfg_node_address_t addrs[8];
1016 ast_cli(a->
fd,
"=== --> Group: %s\n", cpg_desc.group.value);
1018 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK 1024 cs_err = corosync_cfg_get_node_addrs(
cfg_handle, cpg_desc.nodeid,
1026 if (cs_err != CS_OK) {
1031 for (j = 0; j < num_addrs; j++) {
1032 struct sockaddr *sa = (
struct sockaddr *) addrs[j].
address;
1033 size_t sa_len = (size_t) addrs[j].address_length;
1036 getnameinfo(sa, sa_len, buf,
sizeof(buf),
NULL, 0, NI_NUMERICHOST);
1038 ast_cli(a->
fd,
"=== --> Address %u: %s\n", j + 1, buf);
1041 ast_cli(a->
fd,
"=== --> Nodeid: %"PRIu32
"\n", cpg_desc.nodeid);
1046 "=============================================================\n" 1049 cpg_iteration_finalize(cpg_iter);
1051 ast_debug(5,
"corosync_show_members unlock\n");
1053 ast_cli(a->
fd,
"Failed to initialize CPG iterator: initializing CPG.\n");
1067 "Usage: corosync ping\n" 1068 " Send a test ping to the cluster.\n" 1069 "A NOTICE will be in the log for every ping received\n" 1070 "on a server.\n If you send a ping, you should see a NOTICE\n" 1071 "in the log for every server in the cluster.\n";
1100 e->
command =
"corosync show config";
1102 "Usage: corosync show config\n" 1103 " Show configuration loaded from res_corosync.conf\n";
1115 "=============================================================\n" 1116 "=== res_corosync config =====================================\n" 1117 "=============================================================\n" 1121 ast_debug(5,
"corosync_show_config rdlock\n");
1124 ast_cli(a->
fd,
"=== ==> Publishing Event Type: %s\n",
1128 ast_cli(a->
fd,
"=== ==> Subscribing to Event Type: %s\n",
1133 ast_debug(5,
"corosync_show_config unlock\n");
1136 "=============================================================\n" 1184 ast_debug(5,
"load_general_config wrlock\n");
1192 if (!strcasecmp(v->
name,
"publish_event")) {
1194 }
else if (!strcasecmp(v->
name,
"subscribe_event")) {
1217 ast_debug(5,
"load_general_config unlock\n");
1224 static const char filename[] =
"res_corosync.conf";
1226 const char *cat =
NULL;
1237 if (!strcasecmp(cat,
"general")) {
1254 if (stasis_router) {
1263 ast_debug(5,
"cleanup_module wrlock\n");
1273 ast_debug(5,
"cleanup_module unlock\n");
1281 ao2_t_ref(messages, -1,
"Dispose of flushed cache");
1286 stasis_router =
NULL;
1289 if (corosync_aggregate_topic) {
1290 ao2_t_ref(corosync_aggregate_topic, -1,
"Dispose of topic on cleanup");
1291 corosync_aggregate_topic =
NULL;
1297 char meepmeep =
'x';
1318 ast_debug(5,
"cleanup_module wrlock\n");
1330 ast_debug(5,
"cleanup_module unlock\n");
1339 struct cpg_name name;
1353 if (!corosync_aggregate_topic) {
1359 if (!stasis_router) {
1394 name.length = strlen(name.value);
1396 if ((cs_err = cpg_join(
cpg_handle, &name)) != CS_OK) {
static char * ast_sockaddr_stringify_addr(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() to return an address only.
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
#define ao2_t_ref(o, delta, tag)
Reference/unreference an object and return the old refcount.
struct ast_variable * next
static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
enum sip_cc_notify_state state
#define ast_rwlock_rdlock(a)
#define AST_CLI_DEFINE(fn, txt,...)
ast_device_state
Device States.
static struct corosync_node * corosync_node_alloc(struct ast_event *event)
static void * dispatch_thread_handler(void *data)
Asterisk main include file. File version handling, generic pbx functions.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
#define AST_RWLOCK_DEFINE_STATIC(rwlock)
static cpg_handle_t cpg_handle
static void publish_to_corosync(struct stasis_message *message)
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
static char * corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define ao2_t_alloc(data_size, destructor_fn, debug_msg)
int ast_sockaddr_parse(struct ast_sockaddr *addr, const char *str, int flags)
Parse an IPv4 or IPv6 address string.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
#define COROSYNC_POLL_TIMEOUT
Timeout for Corosync's poll process.
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
The arg parameter is a search key, but is not an object.
struct ast_json_payload * ast_json_payload_create(struct ast_json *json)
Create an ao2 object to pass json blobs as data payloads for stasis.
struct ast_variable * ast_variable_browse(const struct ast_config *config, const char *category_name)
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
int ast_carefulwrite(int fd, char *s, int len, int timeoutms)
Try to write string, but wait no more than ms milliseconds before timing out.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
struct stasis_cache * ast_mwi_state_cache(void)
Backend cache for ast_mwi_topic_cached().
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
static int dump_cache_cb(void *obj, void *arg, int flags)
static void corosync_ping_payload_dtor(void *obj)
Destructor for the corosync_ping_payload wrapper object.
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
descriptor for a cli entry.
static ast_rwlock_t event_types_lock
static struct ast_cli_entry corosync_cli[]
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
static int clear_node_cache(void *obj, void *arg, int flags)
#define ao2_callback(c, flags, cb_fn, arg)
#define CONFIG_STATUS_FILEINVALID
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed...
static void send_cluster_notify(void)
Informs the cluster of our EID and our IP addresses.
Structure for variables, used for configurations and for channel variables.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Assume that the ao2_container is already locked.
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Number of new messages Used by: AST_EVENT_MWI Payload type: UINT.
Number of old messages Used by: AST_EVENT_MWI Payload type: UINT.
#define ao2_alloc_options(data_size, destructor_fn, options)
#define ao2_link_flags(container, obj, flags)
static void publish_cluster_discovery_to_stasis(struct ast_event *event)
Publish a received cluster discovery ast_event to Stasis Message Bus API.
struct stasis_topic * ast_mwi_topic_all(void)
Get the Stasis Message Bus API topic for MWI messages.
char * ast_category_browse(struct ast_config *config, const char *prev_name)
Browse categories.
void ast_cli(int fd, const char *fmt,...)
#define ast_rwlock_unlock(a)
static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
Publish cluster discovery to Stasis Message Bus API.
Socket address structure.
An Entity ID is essentially a MAC address, brief and unique.
unsigned char subscribe_default
static int unload_module(void)
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
int args
This gets set in ast_cli_register()
static int load_general_config(struct ast_config *cfg)
static char * corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
#define ast_strlen_zero(foo)
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
#define ast_rwlock_tryrdlock(a)
Configuration File Parser.
static char mailbox[AST_MAX_MAILBOX_UNIQUEID]
struct stasis_message_type * ast_device_state_message_type(void)
Get the Stasis message type for device state messages.
static void publish_corosync_ping_to_stasis(struct ast_event *event)
Publish a Corosync ping to Stasis Message Bus API.
#define ast_debug(level,...)
Log a DEBUG message.
Context IE Used by AST_EVENT_MWI Payload type: str.
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
#define ast_config_load(filename, flags)
Load a config file.
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
#define AST_PTHREADT_NULL
#define ast_poll(a, b, c)
static struct ast_event * corosync_ping_to_event(struct stasis_message *message)
Convert a Corosync PING to an ast_event.
#define ao2_ref(o, delta)
void ast_config_destroy(struct ast_config *config)
Destroys a config.
#define ast_malloc(len)
A wrapper for malloc()
static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name, uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
static struct @457 event_types[]
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
static void publish_device_state_to_stasis(struct ast_event *event)
Publish a received device state ast_event to Stasis Message Bus API.
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Event non-cachability flag Used by: All events Payload type: UINT.
#define stasis_message_router_create(topic)
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
static int corosync_node_joined
Join to corosync.
static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
int ast_publish_mwi_state_full(const char *mailbox, const char *context, int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
Publish a MWI state update via stasis with all parameters.
static cpg_callbacks_t cpg_callbacks
int ast_publish_device_state_full(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, struct ast_eid *eid)
Publish a device state update with EID.
A payload wrapper around a corosync ping event.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
struct stasis_cache * ast_device_state_cache(void)
Backend cache for ast_device_state_topic_cached()
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
#define corosync_pthread_create_background(a, b, c, d)
Version of pthread_create to ensure stack is large enough.
static void cleanup_module(void)
struct stasis_topic * ast_device_state_topic_all(void)
Get the Stasis topic for device state messages.
struct stasis_topic * ast_system_topic(void)
A Stasis Message Bus API topic which publishes messages regarding system changes. ...
struct stasis_topic *(* topic_fn)(void)
static ast_rwlock_t init_cpg_lock
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the Generic event system representation of the message.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
size_t ast_event_minimum_length(void)
Get the minimum length of an ast_event.
static corosync_cfg_callbacks_t cfg_callbacks
struct stasis_message_type * ast_cluster_discovery_type(void)
A stasis_message_type for Cluster discovery.
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type,.to_event=corosync_ping_to_event,)
struct stasis_message_type * ast_mwi_state_type(void)
Get the Stasis Message Bus API message type for MWI messages.
Module has failed to load, may be in an inconsistent state.
#define ao2_find(container, arg, flags)
An API for managing task processing threads that can be shared across modules.
static int load_module(void)
Structure used to handle boolean flags.
Support for logging to various files, console and syslog Configuration in file logger.conf.
static void publish_mwi_to_stasis(struct ast_event *event)
Publish a received MWI ast_event to Stasis Message Bus API.
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
#define CONFIG_STATUS_FILEMISSING
static corosync_cfg_handle_t cfg_handle
#define ast_rwlock_wrlock(a)
struct ast_eid ast_eid_default
Global EID.
The arg parameter is an object of the same type.
void ast_event_destroy(struct ast_event *event)
Destroy an event.
Standard Command Line Interface.
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
AST_MODULE_INFO_STANDARD_EXTENDED(ASTERISK_GPL_KEY, "Corosync")
#define ast_rwlock_trywrlock(a)
static int set_event(const char *event_type, int pubsub)
static struct stasis_message_router * stasis_router
Our Stasis Message Bus API message router.
struct stasis_forward * sub
Abstract JSON element (object, array, string, int, ...).
int error(const char *format,...)
static int load_config(unsigned int reload)
size_t ast_event_get_size(const struct ast_event *event)
Get the size of an event.
static struct stasis_topic * corosync_aggregate_topic
The internal topic used for message forwarding and pings.
const char * ast_event_get_type_name(const struct ast_event *event)
Get the string representation of the type of the given event.
int ast_eid_is_empty(const struct ast_eid *eid)
Check if EID is empty.
static void publish_event_to_corosync(struct ast_event *event)
Search option field mask.
static char context[AST_MAX_CONTEXT]
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Generic State IE Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: UINT The actual state values dep...
const char * ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a string payload.
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
unsigned char publish_default
#define ASTERISK_GPL_KEY
The text the key() function should return.
static int corosync_node_hash_fn(const void *obj, const int flags)
Asterisk module definitions.
struct stasis_message_type *(* message_type_fn)(void)
void(* publish_to_stasis)(struct ast_event *)
Device Name Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: STR.
static struct @458 dispatch_thread
struct stasis_cache *(* cache_fn)(void)
Cluster node ID Used by: Corosync Payload type: UINT.
static char * corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
static struct stasis_topic * corosync_topic(void)
Internal accessor for our topic.
static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle, corosync_cfg_shutdown_flags_t flags)
static struct ao2_container * nodes
All the nodes that we're aware of.