77 #define STATE_BUCKETS 57 169 const char *
file,
int line,
const char *func)
212 state->
topic = state_topic;
220 strcpy(proxy->
id,
id);
222 state->id = proxy->
id;
270 #define state_find_or_add(mgr, top, id) __state_find_or_add(mgr, top, id, __FILE__, __LINE__, __PRETTY_FUNCTION__) 273 const char *
file,
int line,
const char *func)
284 state =
state_alloc(manager, state_topic,
id, file, line, func);
298 char *container_name =
313 static void state_prnt_obj(
void *v_obj,
void *where,
ao2_prnt_fn *prnt)
354 char *container_name =
379 topic = state->
topic;
457 ast_debug(3,
"Creating stasis state subscription to id '%s'. Topic: '%s':%p %d\n",
489 return sub->
state->id;
554 return pub->
state->id;
673 ast_debug(5,
"Attempted to remove state for id '%s', but state not found\n",
id);
721 res =
handler(state->id, msg, data);
Managed stasis state event interface.
struct stasis_forward * forward
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
struct stasis_state * state
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
enum sip_cc_notify_state state
static int handle_stasis_state(struct stasis_state *state, on_stasis_state handler, void *data)
#define state_find_or_add(mgr, top, id)
static void state_dtor(void *obj)
Asterisk main include file. File version handling, generic pbx functions.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
struct stasis_topic * topic
static struct stasis_state * __state_find_or_add(struct stasis_state_manager *manager, struct stasis_topic *state_topic, const char *id, const char *file, int line, const char *func)
The arg parameter is a search key, but is not an object.
#define AST_VECTOR_RW(name, type)
Define a vector structure with a read/write lock.
struct ao2_container * observers
Registered global observers.
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed topic named by id, and remove an implicit publisher.
void stasis_state_remove_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Remove an observer (will no longer receive managed state related events).
static void state_proxy_dtor(void *obj)
struct stasis_topic * stasis_state_subscriber_topic(struct stasis_state_subscriber *sub)
Retrieve the subscriber's topic.
struct stasis_state_manager * manager
The manager that owns and handles this state.
struct stasis_state_manager * stasis_state_manager_create(const char *topic_name)
Create a stasis state manager.
struct stasis_state_subscriber * stasis_state_subscribe_pool(struct stasis_state_manager *manager, const char *id, stasis_subscription_cb callback, void *data)
Add a subscriber, and subscribe to its underlying stasis topic.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
void * __ao2_alloc(size_t data_size, ao2_destructor_fn destructor_fn, unsigned int options, const char *tag, const char *file, int line, const char *func) attribute_warn_unused_result
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed state call the given handler.
Assume that the ao2_container is already locked.
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
int(* on_stasis_state)(const char *id, struct stasis_message *msg, void *user_data)
The delegate called for each managed state.
void * stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
Retrieve the last known state stasis message payload for the subscriber.
struct stasis_message * msg
#define ao2_alloc_options(data_size, destructor_fn, options)
#define ao2_link_flags(container, obj, flags)
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
int stasis_state_add_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Add an observer to receive managed state related events.
static struct stasis_topic * manager_topic
A stasis_topic that all topics AMI cares about will be forwarded to.
static void state_proxy_sub_cb(void *obj, void *data)
struct stasis_state_subscriber * stasis_state_add_subscriber(struct stasis_state_manager *manager, const char *id)
Add a subscriber to the managed stasis state for the given id.
const char * stasis_state_subscriber_id(const struct stasis_state_subscriber *sub)
Retrieve the underlying subscribed to state's unique id.
void * stasis_state_unsubscribe(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic and stasis state.
An Entity ID is essentially a MAC address, brief and unique.
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
static const char * state_id_by_topic(struct stasis_topic *manager_topic, const struct stasis_topic *state_topic)
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
void() ao2_prnt_fn(void *where, const char *fmt,...)
Print output.
#define ast_strlen_zero(foo)
void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
Publish to a managed state (topic) using a publisher.
struct stasis_topic * stasis_state_topic(struct stasis_state_manager *manager, const char *id)
Retrieve a managed topic creating one if not currently managed.
struct stasis_topic * all_topic
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
const char * stasis_state_publisher_id(const struct stasis_state_publisher *pub)
Retrieve the publisher's underlying state's unique id.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed, and explicitly subscribed state call the given handler.
static struct stasis_state * state_alloc(struct stasis_state_manager *manager, struct stasis_topic *state_topic, const char *id, const char *file, int line, const char *func)
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
#define ao2_weakproxy_get_object(weakproxy, flags)
#define ao2_ref(o, delta)
void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed topic named by id, and add an implicit subscriber.
struct stasis_topic * stasis_state_all_topic(struct stasis_state_manager *manager)
Retrieve the manager's topic (the topic that all state topics get forwarded to)
struct stasis_state_manager * manager
static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
#define AST_VECTOR(name, type)
Define a vector structure.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
#define ast_alloca(size)
Call __builtin_alloca to ensure we get gcc builtin semantics.
static void publisher_dtor(void *obj)
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
static void subscriber_dtor(void *obj)
struct stasis_subscription * stasis_state_subscriber_subscription(struct stasis_state_subscriber *sub)
Retrieve the stasis topic subscription if available.
#define ao2_unlink(container, obj)
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
#define ao2_t_weakproxy_set_object(weakproxy, obj, flags, tag)
#define ao2_callback_data(container, flags, cb_fn, arg, data)
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
#define stasis_subscribe_pool(topic, callback, data)
struct stasis_subscription * stasis_sub
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
struct stasis_topic * stasis_state_publisher_topic(struct stasis_state_publisher *pub)
Retrieve the publisher's topic.
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
AO2_STRING_FIELD_HASH_FN(stasis_state_proxy, id)
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
struct stasis_forward * stasis_forward_cancel(struct stasis_forward *forward)
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
struct ast_eid ast_eid_default
Global EID.
static void state_find_and_remove_eid(struct stasis_state *state, const struct ast_eid *eid)
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
static void state_manager_dtor(void *obj)
#define ao2_replace(dst, src)
static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
struct stasis_state * state
static void handler(const char *name, int response_code, struct ast_variable *get_params, struct ast_variable *path_vars, struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
struct ast_sorcery_instance_observer observer
struct stasis_forward * sub
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
unsigned int num_subscribers
#define ao2_t_weakproxy_alloc(data_size, destructor_fn, tag)
struct ao2_container * states
struct stasis_state_publisher * stasis_state_add_publisher(struct stasis_state_manager *manager, const char *id)
Add a publisher to the managed state for the given id.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
void * stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic, block until the final message is received, and then unsubscribe from the stasis state.
AO2_STRING_FIELD_CMP_FN(stasis_state_proxy, id)
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
static void state_find_or_add_eid(struct stasis_state *state, const struct ast_eid *eid)