![]() |
Dripline-Cpp
v2.4.2
Dripline Implementation in C++
|
Listens for messages sent to a particular set of keys and prints them. More...
#include <monitor.hh>
Public Types | |
typedef std::vector< std::string > | keys_t |
Public Member Functions | |
monitor (const scarab::param_node &a_config=scarab::param_node()) | |
monitor (const monitor &)=delete | |
monitor (monitor &&a_orig) | |
virtual | ~monitor () |
monitor & | operator= (const monitor &)=delete |
monitor & | operator= (monitor &&a_orig) |
snake_case_mv_accessible (status, status) | |
snake_case_mv_referrable (std::string, name) | |
Name for this monitor; automatically set to monitor_[uuid] More... | |
snake_case_mv_accessible (bool, json_print) | |
snake_case_mv_accessible (bool, pretty_print) | |
Flag to indicate whether JSON should be printed with extra whitespace for human readability. More... | |
snake_case_mv_referrable (keys_t, requests_keys) | |
Set of request keys to be listened for. More... | |
snake_case_mv_referrable (keys_t, alerts_keys) | |
Set of alerts keys to be listened for. More... | |
bool | start () |
Opens the AMQP connection, binds keys, and starts consuming. More... | |
bool | listen () |
Starts actively listening for and handling messages (blocking). More... | |
bool | stop () |
Stops listening for messages and closes the AMQP connection. More... | |
virtual bool | listen_on_queue () |
Waits for a single AMQP message and processes it. More... | |
virtual void | submit_message (message_ptr_t a_message) |
![]() | |
core (const scarab::param_node &a_config=scarab::param_node(), const std::string &a_broker_address="", unsigned a_port=0, const std::string &a_auth_file="", const bool a_make_connection=true) | |
core (const bool a_make_connection, const scarab::param_node &a_config=scarab::param_node()) | |
core (const core &a_orig) | |
core (core &&a_orig) | |
virtual | ~core () |
core & | operator= (const core &a_orig) |
core & | operator= (core &&a_orig) |
virtual sent_msg_pkg_ptr | send (request_ptr_t a_request) const |
virtual sent_msg_pkg_ptr | send (reply_ptr_t a_reply) const |
virtual sent_msg_pkg_ptr | send (alert_ptr_t a_alert) const |
snake_case_mv_referrable (std::string, address) | |
snake_case_mv_accessible (unsigned, port) | |
snake_case_mv_referrable (std::string, username) | |
snake_case_mv_referrable (std::string, password) | |
snake_case_mv_referrable (std::string, requests_exchange) | |
snake_case_mv_referrable (std::string, alerts_exchange) | |
snake_case_mv_referrable (std::string, heartbeat_routing_key) | |
snake_case_mv_accessible (unsigned, max_payload_size) | |
snake_case_mv_accessible (bool, make_connection) | |
![]() | |
listener_receiver () | |
listener_receiver (const listener_receiver &)=delete | |
listener_receiver (listener_receiver &&a_orig) | |
listener_receiver & | operator= (const listener_receiver &)=delete |
listener_receiver & | operator= (listener_receiver &&a_orig) |
![]() | |
listener () | |
listener (const listener &)=delete | |
listener (listener &&a_orig) | |
virtual | ~listener () |
listener & | operator= (const listener &)=delete |
listener & | operator= (listener &&a_orig) |
snake_case_mv_referrable (amqp_channel_ptr, channel) | |
snake_case_mv_referrable (std::string, consumer_tag) | |
snake_case_mv_accessible (unsigned, listen_timeout_ms) | |
snake_case_mv_referrable (std::thread, listener_thread) | |
![]() | |
concurrent_receiver () | |
concurrent_receiver (const concurrent_receiver &)=delete | |
concurrent_receiver (concurrent_receiver &&a_orig) | |
virtual | ~concurrent_receiver () |
concurrent_receiver & | operator= (const concurrent_receiver &)=delete |
concurrent_receiver & | operator= (concurrent_receiver &&a_orig) |
virtual void | process_message (message_ptr_t a_message) |
Deposits the message in the concurrent queue (called by the listener) More... | |
void | execute () |
Handles messages that appear in the concurrent queue by calling submit_message() . More... | |
![]() | |
receiver () | |
receiver (const receiver &a_orig)=delete | |
receiver (receiver &&a_orig) | |
virtual | ~receiver () |
receiver & | operator= (const receiver &a_orig)=delete |
receiver & | operator= (receiver &&a_orig) |
void | handle_message_chunk (amqp_envelope_ptr a_envelope) |
void | wait_for_message (incoming_message_pack &a_pack, const std::string &a_message_id) |
void | process_message_pack (incoming_message_pack &a_pack, const std::string &a_message_id) |
Converts a message pack into a Dripline message, and then submits the message for processing. More... | |
snake_case_mv_referrable (incoming_message_map, incoming_messages) | |
Stores the incomplete messages. More... | |
snake_case_mv_accessible (unsigned, single_message_wait_ms) | |
Wait time for message chunks from a single message. More... | |
reply_ptr_t | wait_for_reply (const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0) |
reply_ptr_t | wait_for_reply (const sent_msg_pkg_ptr a_receive_reply, bool &a_chan_valid, int a_timeout_ms=0) |
Protected Types | |
enum | status { nothing = 0, channel_created = 10, exchange_declared = 20, queue_declared = 30, queue_bound = 40, consuming = 50, listening = 60 } |
Protected Member Functions | |
bool | bind_keys () |
![]() | |
sent_msg_pkg_ptr | do_send (message_ptr_t a_message, const std::string &a_exchange, bool a_expect_reply) const |
amqp_channel_ptr | send_withreply (message_ptr_t a_message, std::string &a_reply_consumer_tag, const std::string &a_exchange) const |
bool | send_noreply (message_ptr_t a_message, const std::string &a_exchange) const |
amqp_channel_ptr | open_channel () const |
![]() | |
snake_case_mv_referrable (scarab::concurrent_queue< message_ptr_t >, message_queue) | |
snake_case_mv_referrable (std::thread, receiver_thread) | |
![]() | |
reply_ptr_t | process_received_reply (incoming_message_pack &a_pack, const std::string &a_message_id) |
Additional Inherited Members | |
![]() | |
static bool | listen_for_message (amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true) |
Returns false if the channel is no longer usable; returns true if it may be reused. More... | |
![]() | |
static bool | s_offline = false |
![]() | |
static bool | setup_exchange (amqp_channel_ptr a_channel, const std::string &a_exchange) |
static bool | setup_queue (amqp_channel_ptr a_channel, const std::string &a_queue_name) |
static bool | bind_key (amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key) |
static std::string | start_consuming (amqp_channel_ptr a_channel, const std::string &a_queue_name) |
static bool | stop_consuming (amqp_channel_ptr a_channel, std::string &a_consumer_tag) |
static bool | remove_queue (amqp_channel_ptr a_channel, const std::string &a_queue_name) |
Listens for messages sent to a particular set of keys and prints them.
The monitor is initially configured with a set of keys to listen for. Keys can be specified for the alerts exchange and for the requests exchange.
Keys can be passed to the monitor in the configuration scarab::param_node object. Alerts keys can be supplied as `alerts-keys` followed by an array of keys, or as `alerts-key` followed by a single key. Requests keys can be supplied as `requests-keys` followed by an array of keys, or as `requests-key` followed by a single key.
All AMQP-standard notation for keys, including wildcards, is allowed.
When activated, the alerts keys are bound to the alerts exchange, and the requests keys are bound to the requests exchange. The monitor then waits to receive a message. When a message is seen, it prints it to stdout.
Definition at line 39 of file monitor.hh.
typedef std::vector< std::string > keys_t |
Definition at line 75 of file monitor.hh.
|
strong protected |
Enumerator | |
---|---|
nothing | |
channel_created | |
exchange_declared | |
queue_declared | |
queue_bound | |
consuming | |
listening |
Definition at line 44 of file monitor.hh.
monitor | ( | const scarab::param_node & | a_config = scarab::param_node() | ) |
Definition at line 23 of file monitor.cc.
Definition at line 71 of file monitor.cc.
|
virtual |
Definition at line 87 of file monitor.cc.
|
protected |
Definition at line 215 of file monitor.cc.
bool listen | ( | ) |
Starts actively listening for and handling messages (blocking).
Definition at line 149 of file monitor.cc.
|
virtual |
Waits for a single AMQP message and processes it.
Implements listener.
Definition at line 232 of file monitor.cc.
Definition at line 97 of file monitor.cc.
snake_case_mv_accessible | ( | bool | , |
json_print | |||
) |
Flag to indicate whether syntactically-correct JSON should be printed, or whether the default style (similar to but not exactly JSON) should be used.
snake_case_mv_accessible | ( | bool | , |
pretty_print | |||
) |
Flag to indicate whether JSON should be printed with extra whitespace for human readability.
snake_case_mv_referrable | ( | std::string | , |
name | |||
) |
Name for this monitor; automatically set to monitor_[uuid]
snake_case_mv_referrable | ( | keys_t | , |
requests_keys | |||
) |
Set of request keys to be listened for.
snake_case_mv_referrable | ( | keys_t | , |
alerts_keys | |||
) |
Set of alerts keys to be listened for.
bool start | ( | ) |
Opens the AMQP connection, binds keys, and starts consuming.
Definition at line 110 of file monitor.cc.
bool stop | ( | ) |
Stops listening for messages and closes the AMQP connection.
Definition at line 190 of file monitor.cc.
|
virtual |
Handles a single Dripline message by printing it to stdout. Printing is done via a prog-level message in the logger.
Implements concurrent_receiver.
Definition at line 270 of file monitor.cc.