1 #define DRIPLINE_API_EXPORTS 14 LOGGER(
dlog,
"relayer" );
16 relayer::relayer(
const scarab::param_node& a_config,
const std::string& a_broker_address,
unsigned a_port,
const std::string& a_auth_file ) :
17 core( a_config, a_broker_address, a_port, a_auth_file ),
30 LDEBUG(
dlog,
"Dripline relayer starting" );
31 while( ! is_canceled() )
34 bool t_have_message =
f_queue.timed_wait_and_pop( t_mar );
35 if( ! t_have_message )
continue;
39 std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
40 switch( t_mar->f_message->message_type() )
43 t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr =
core::send( std::static_pointer_cast< dripline::msg_request >( t_mar->f_message ) );
46 t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr =
core::send( std::static_pointer_cast< dripline::msg_alert >( t_mar->f_message ) );
49 throw dripline_error() <<
"Unsupported message type: " << t_mar->f_message->message_type();
52 if( ! t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr->f_successful_send )
54 LERROR(
dlog,
"Message sending failed: " << t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr->f_send_error_message <<
'\n' << *t_mar->f_message );
56 t_mar->f_wait_for_send_pkg->f_condition_var.notify_one();
61 LWARN(
dlog,
"Operating in offline mode; message not sent" );
65 LERROR(
dlog,
"Unable to connect to the broker:\n" << e.what() <<
'\n' << *t_mar->f_message );
69 LERROR(
dlog,
"Dripline error while sending reply:\n" << e.what() <<
'\n' << *t_mar->f_message );
74 LDEBUG(
dlog,
"Exiting the Dripline relayer" );
82 LDEBUG(
dlog,
"Canceling relayer" );
91 LWARN(
dlog,
"Relayer has been canceled; request not sent" );
93 t_return->f_sent_msg_pkg_ptr = std::make_shared< sent_msg_pkg >();
94 t_return->f_sent_msg_pkg_ptr->f_successful_send =
false;
97 LDEBUG(
dlog,
"Sending request to <" << a_request->routing_key() <<
">" );
98 mar_ptr t_mar = std::make_shared< message_and_reply >();
99 std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
101 t_mar->f_wait_for_send_pkg = std::make_shared< wait_for_send_pkg >();
103 return t_mar->f_wait_for_send_pkg;
110 LWARN(
dlog,
"Relayer has been canceled; request not sent" );
112 t_return->f_sent_msg_pkg_ptr = std::make_shared< sent_msg_pkg >();
113 t_return->f_sent_msg_pkg_ptr->f_successful_send =
false;
116 LDEBUG(
dlog,
"Sending alert to <" << a_alert->routing_key() <<
">" );
117 mar_ptr t_mar = std::make_shared< message_and_reply >();
118 std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
120 t_mar->f_wait_for_send_pkg = std::make_shared< wait_for_send_pkg >();
122 return t_mar->f_wait_for_send_pkg;
133 std::unique_lock< std::mutex > t_lock( a_receive_reply->f_mutex );
134 auto t_deadline = std::chrono::system_clock::now() + std::chrono::milliseconds( a_timeout_ms );
135 while( ! a_receive_reply->f_sent_msg_pkg_ptr )
137 std::cv_status t_status = a_receive_reply->f_condition_var.wait_until( t_lock, t_deadline );
138 if( t_status == std::cv_status::timeout )
144 return f_msg_receiver.wait_for_reply( a_receive_reply->f_sent_msg_pkg_ptr, a_chan_valid, a_timeout_ms );
virtual sent_msg_pkg_ptr send(request_ptr_t a_request) const
scarab::concurrent_queue< mar_ptr > f_queue
void do_cancellation(int a_code)
std::shared_ptr< message_and_reply > mar_ptr
reply_ptr_t wait_for_reply(const wait_for_send_pkg_ptr a_receive_reply, int a_timeout_ms=0)
std::shared_ptr< msg_request > request_ptr_t
Dripline-specific errors.
std::shared_ptr< msg_alert > alert_ptr_t
Contains all of the information common to all types of Dripline messages.
void execute_relayer()
Thread execution function: sends any messages that are submitted via the send functions.
relayer(const scarab::param_node &a_config=scarab::param_node(), const std::string &a_broker_address="", unsigned a_port=0, const std::string &a_auth_file="")
static scarab::logger dlog("agent")
Error indicating a problem with the connection to the broker.
std::shared_ptr< msg_reply > reply_ptr_t
std::shared_ptr< wait_for_send_pkg > wait_for_send_pkg_ptr
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
std::shared_ptr< message > message_ptr_t
wait_for_send_pkg_ptr send_async(request_ptr_t a_request) const