// File-scope preamble: the export macro, the authentication header, and the
// LOGGER invocation that sets up `dlog` under the name "amqp" (LOGGER's
// expansion is defined elsewhere).
8 #define DRIPLINE_API_EXPORTS 15 #include "authentication.hh" 22 LOGGER(
dlog,
"amqp" );
// Fragment of a routine whose header is not visible in this excerpt: it logs
// that message consumption is being stopped, then reports exceptions raised
// while canceling the channel.
30 LDEBUG(
dlog,
"Stopping consuming messages" );
// Handler path for an AMQP exception `e`: logs its reply code and reply text.
35 LERROR(
dlog,
"AMQP exception caught while canceling the channel: (" << e.reply_code() <<
") " << e.reply_text() );
// Handler path for an AMQP library exception `e`: logs its error code and what().
39 LERROR(
dlog,
"AMQP library exception caught while canceling the channel: (" << e.ErrorCode() <<
") " << e.what() );
// Constructs the AMQP settings holder.
//
// Settings start from the initializer-list defaults below and are then
// overridden, in this order, by: the authentication file, the entries of
// a_config (when it is not empty), and finally the explicit
// a_broker_address / a_port arguments.
//
// a_config           configuration node; also consulted for "auth-file" when a_auth_file is empty
// a_broker_address   broker address; ignored when empty
// a_port             broker port; ignored when 0
// a_auth_file        authentication file path; may be empty
// a_make_connection  initial value of f_make_connection (a_config may override it)
//
// Throws dripline_error if the authentication file cannot be loaded, has no
// "amqp" entry, or that entry lacks "username" or "password".
46 core::core(
const scarab::param_node& a_config,
const std::string& a_broker_address,
unsigned a_port,
const std::string& a_auth_file,
const bool a_make_connection ) :
47 f_address(
"localhost" ),
49 f_username(
"guest" ),
50 f_password(
"guest" ),
51 f_requests_exchange(
"requests" ),
52 f_alerts_exchange(
"alerts" ),
53 f_heartbeat_routing_key(
"heartbeat" ),
55 f_make_connection( a_make_connection )
// The explicit argument wins; otherwise fall back to the "auth-file" config entry.
58 std::string t_auth_file( a_auth_file );
59 if( t_auth_file.empty() ) t_auth_file = a_config.get_value(
"auth-file",
"" );
// Authentication information is only read when a file path was found.
62 if( ! t_auth_file.empty() )
64 LDEBUG(
dlog,
"Using authentication file <" << t_auth_file <<
">" );
66 scarab::authentication t_auth( t_auth_file );
67 if( ! t_auth.get_is_loaded() )
// Report t_auth_file (the path actually used): a_auth_file is empty when the
// path came from a_config, which would make the message useless.
69 throw dripline_error() <<
"Authentication file <" << t_auth_file <<
"> could not be loaded";
72 if( ! t_auth.has(
"amqp" ) )
// Same reasoning as above: name the file that was actually inspected.
74 throw dripline_error() <<
"No \"amqp\" authentication information present in <" << t_auth_file <<
">";
77 const scarab::param_node& t_amqp_auth = t_auth[
"amqp"].as_node();
// Both credentials are mandatory.
78 if( ! t_amqp_auth.has(
"username" ) || ! t_amqp_auth.has(
"password" ) )
80 throw dripline_error() <<
"AMQP authentication is not available or is not complete";
82 f_username = t_amqp_auth[
"username"]().as_string();
83 f_password = t_amqp_auth[
"password"]().as_string();
// The broker address and port are optional in the authentication file.
86 if( t_amqp_auth.has(
"broker" ) )
88 f_address = t_amqp_auth[
"broker"]().as_string();
90 if( t_amqp_auth.has(
"broker-port" ) )
92 f_port = t_amqp_auth[
"broker-port"]().as_uint();
// Config entries override what has been set so far; each current value is
// passed as the default so that absent keys leave the setting unchanged.
97 if( ! a_config.empty() )
99 f_address = a_config.get_value(
"broker", f_address );
100 f_port = a_config.get_value(
"broker-port", f_port );
101 f_requests_exchange = a_config.get_value(
"requests-exchange", f_requests_exchange );
102 f_alerts_exchange = a_config.get_value(
"alerts-exchange", f_alerts_exchange );
103 f_heartbeat_routing_key = a_config.get_value(
"heartbeat-routing-key", f_heartbeat_routing_key );
104 f_max_payload_size = a_config.get_value(
"max-payload-size", f_max_payload_size );
105 f_make_connection = a_config.get_value(
"make-connection", f_make_connection );
// Explicit constructor arguments have the final say.
109 if( ! a_broker_address.empty() ) f_address = a_broker_address;
110 if( a_port != 0 ) f_port = a_port;
// Constructor overload that takes the make-connection flag first, followed by
// the configuration node. Its initializer list is not visible in this excerpt;
// the visible body statement stores a_make_connection in f_make_connection.
113 core::core(
const bool a_make_connection,
const scarab::param_node& a_config ) :
117 f_make_connection = a_make_connection;
// Initializer-list entries (constructor header not visible in this excerpt)
// that copy each setting from a_orig into the new object.
121 f_address( a_orig.f_address ),
122 f_port( a_orig.f_port ),
123 f_username( a_orig.f_username ),
124 f_password( a_orig.f_password ),
125 f_requests_exchange( a_orig.f_requests_exchange ),
126 f_alerts_exchange( a_orig.f_alerts_exchange ),
127 f_heartbeat_routing_key( a_orig.f_heartbeat_routing_key ),
128 f_max_payload_size( a_orig.f_max_payload_size ),
129 f_make_connection( a_orig.f_make_connection )
// Initializer-list entries (constructor header not visible in this excerpt)
// that transfer settings out of a_orig: most members go through std::move,
// while f_port and f_max_payload_size are copied directly.
133 f_address(
std::move(a_orig.f_address) ),
134 f_port( a_orig.f_port ),
135 f_username(
std::move(a_orig.f_username) ),
136 f_password(
std::move(a_orig.f_password) ),
137 f_requests_exchange(
std::move(a_orig.f_requests_exchange) ),
138 f_alerts_exchange(
std::move(a_orig.f_alerts_exchange) ),
139 f_heartbeat_routing_key(
std::move(a_orig.f_heartbeat_routing_key) ),
140 f_max_payload_size( a_orig.f_max_payload_size ),
// NOTE(review): f_make_connection is initialized from a `const bool` in the
// main constructor; if it is a plain bool, std::move here is just a copy.
141 f_make_connection(
std::move(a_orig.f_make_connection) )
// Body of an assignment from a_orig (operator header not visible in this
// excerpt): every setting is copied member by member.
152 f_address = a_orig.f_address;
153 f_port = a_orig.f_port;
154 f_username = a_orig.f_username;
155 f_password = a_orig.f_password;
156 f_requests_exchange = a_orig.f_requests_exchange;
157 f_alerts_exchange = a_orig.f_alerts_exchange;
158 f_heartbeat_routing_key = a_orig.f_heartbeat_routing_key;
159 f_max_payload_size = a_orig.f_max_payload_size;
160 f_make_connection = a_orig.f_make_connection;
// Body of a moving assignment from a_orig (operator header not visible in this
// excerpt): most members go through std::move, while f_port and
// f_max_payload_size are copied directly.
166 f_address = std::move( a_orig.f_address );
167 f_port = a_orig.f_port;
169 f_username = std::move( a_orig.f_username );
170 f_password = std::move( a_orig.f_password );
171 f_requests_exchange = std::move( a_orig.f_requests_exchange );
172 f_alerts_exchange = std::move( a_orig.f_alerts_exchange );
173 f_heartbeat_routing_key = std::move( a_orig.f_heartbeat_routing_key );
174 f_max_payload_size = a_orig.f_max_payload_size;
// NOTE(review): if f_make_connection is a plain bool, std::move is just a copy here.
176 f_make_connection = std::move( a_orig.f_make_connection );
// Request-sending fragment (header not visible in this excerpt): logs the
// routing key, then hands the request, upcast to `message`, to do_send on
// f_requests_exchange with `true` as the last argument.
182 LDEBUG(
dlog,
"Sending request with routing key <" << a_request->routing_key() <<
">" );
183 return do_send( std::static_pointer_cast< message >( a_request ), f_requests_exchange,
true );
// Reply-sending fragment (header not visible in this excerpt): logs the
// routing key, then hands the reply, upcast to `message`, to do_send on
// f_requests_exchange with `false` as the last argument.
188 LDEBUG(
dlog,
"Sending reply with routing key <" << a_reply->routing_key() <<
">" );
189 return do_send( std::static_pointer_cast< message >( a_reply ), f_requests_exchange,
false );
// Alert-sending fragment (header not visible in this excerpt): logs the
// routing key, then hands the alert, upcast to `message`, to do_send on
// f_alerts_exchange with `false` as the last argument.
194 LDEBUG(
dlog,
"Sending alert with routing key <" << a_alert->routing_key() <<
">" );
195 return do_send( std::static_pointer_cast< message >( a_alert ), f_alerts_exchange,
false );
// Fragment of the message-sending routine; its header and several statements
// (including the ones that create t_channel, t_receive_reply and
// t_amqp_messages) are not visible in this excerpt.
//
// Helper that builds a multi-line description (broker, port, routing key);
// it is appended to the error messages produced below.
207 auto t_diagnostic_string_maker = [a_message,
this]() -> std::string {
208 return std::string(
"Broker: ") + f_address +
"\nPort: " +
std::to_string(f_port) +
"\nRouting Key: " + a_message->routing_key();
// Connection-disabled / offline test; the branch body is not visible here.
211 if ( ! f_make_connection || core::s_offline )
// Error for a channel that could not be opened (triggering condition not visible here).
220 throw connection_error() <<
"Unable to open channel to send message\n" << t_diagnostic_string_maker();
// Error for an exchange that could not be set up (triggering condition not visible here).
225 throw dripline_error() <<
"Unable to setup the exchange <" << a_exchange <<
"> to send message\n" << t_diagnostic_string_maker();
// Lock the reply package's mutex.
230 std::unique_lock< std::mutex > t_rr_lock( t_receive_reply->f_mutex );
// Store the channel in the reply package.
234 t_receive_reply->f_channel = t_channel;
// Declare a queue with an empty name; the returned string is used as the
// queue name, as the binding key on a_exchange, and as the message's reply-to.
237 std::string t_reply_to = t_channel->DeclareQueue(
"" );
238 t_channel->BindQueue( t_reply_to, a_exchange, t_reply_to );
240 a_message->reply_to() = t_reply_to;
// Start consuming on that queue and keep the consumer tag in the reply package.
243 t_receive_reply->f_consumer_tag = t_channel->BasicConsume( t_reply_to );
244 LDEBUG(
dlog,
"Reply-to for request: " << t_reply_to );
245 LDEBUG(
dlog,
"Consumer tag for reply: " << t_receive_reply->f_consumer_tag );
// Nothing to publish means the conversion to AMQP message(s) failed.
250 if( t_amqp_messages.empty() )
252 throw dripline_error() <<
"Unable to convert the dripline::message object to AMQP message(s) to be sent\n" << t_diagnostic_string_maker();
257 LDEBUG(
dlog,
"Sending message to <" << a_message->routing_key() <<
">" );
// Publish one AMQP message (t_amqp_message; the enclosing iteration is not visible here).
// NOTE(review): the 4th/5th arguments are presumably SimpleAmqpClient's mandatory/immediate flags — confirm.
263 t_channel->BasicPublish( a_exchange, a_message->routing_key(), t_amqp_message, a_message->is_request(), false );
265 LDEBUG(
dlog,
"Message sent in " << t_amqp_messages.size() <<
" chunks" );
// Record success on the reply package and clear any previous error text.
266 t_receive_reply->f_successful_send =
true;
267 t_receive_reply->f_send_error_message.clear();
// A closed connection is logged and rethrown as connection_error.
269 catch( AmqpClient::ConnectionClosedException& e )
271 LERROR(
dlog,
"Unable to send message because the connection is closed: " << e.what() );
272 throw connection_error() <<
"Unable to send message because the connection is closed: " << e.what() <<
'\n' << t_diagnostic_string_maker();
// The remaining handlers do not throw: they log, mark the send as failed, and
// store the error text (plus diagnostics) in the reply package.
274 catch( AmqpClient::AmqpLibraryException& e )
276 LERROR(
dlog,
"AMQP error while sending message: " << e.what() );
277 t_receive_reply->f_successful_send =
false;
278 t_receive_reply->f_send_error_message = std::string(
"AMQP error while sending message: ") + std::string(e.what()) +
'\n' + t_diagnostic_string_maker();
280 catch( AmqpClient::MessageReturnedException& e )
282 LERROR(
dlog,
"Message was returned: " << e.what() );
283 t_receive_reply->f_successful_send =
false;
284 t_receive_reply->f_send_error_message = std::string(
"Message was returned: ") + std::string(e.what()) +
'\n' + t_diagnostic_string_maker();
286 catch( std::exception& e )
288 LERROR(
dlog,
"Error while sending message: " << e.what() );
289 t_receive_reply->f_successful_send =
false;
290 t_receive_reply->f_send_error_message = std::string(
"Error while sending message: ") + std::string(e.what()) +
'\n' + t_diagnostic_string_maker();
// Hand the reply package back to the caller.
293 return t_receive_reply;
// Fragment of the channel-opening routine (header not visible in this
// excerpt): creates an AMQP channel to f_address:f_port using the stored
// credentials, and logs any exception raised while doing so.
//
// Connection-disabled / offline test; the branch body is not visible here.
298 if ( ! f_make_connection || core::s_offline )
// Log the endpoint being contacted.
305 LDEBUG(
dlog,
"Opening AMQP connection and creating channel to " << f_address <<
":" << f_port );
// Log only the user name: the password must never be written to the log.
306 LDEBUG(
dlog,
"Using broker authentication for user: " << f_username );
307 return AmqpClient::Channel::Create( f_address, f_port, f_username, f_password );
// Handler path for an AMQP exception `e`: logs its reply code and reply text.
311 LERROR(
dlog,
"AMQP exception caught while opening channel: (" << e.reply_code() <<
") " << e.reply_text() );
// Handler path for an AMQP library exception `e`: logs its error code and what().
316 LERROR(
dlog,
"AMQP Library Exception caught while creating channel: (" << e.ErrorCode() <<
") " << e.what() );
// Error code -9 gets an extra hint about broker reachability.
317 if( e.ErrorCode() == -9 )
319 LERROR(
dlog,
"This error means the client could not connect to the broker.\n\t" <<
320 "Check that you have the address and port correct, and that the broker is running.")
// Any other standard exception is logged as well.
324 catch( std::exception& e )
326 LERROR(
dlog,
"Standard exception caught while creating channel: " << e.what() );
// Exchange-declaration fragment (header not visible in this excerpt): declares
// a_exchange as a topic exchange on a_channel and logs AMQP failures.
340 LDEBUG(
dlog,
"Declaring exchange <" << a_exchange <<
">" );
// NOTE(review): the three `false` flags are presumably SimpleAmqpClient's
// passive, durable and auto_delete parameters — confirm against the library.
341 a_channel->DeclareExchange( a_exchange, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC,
false,
false,
false );
// Handler path for an AMQP exception `e`: logs its reply code and reply text.
346 LERROR(
dlog,
"AMQP exception caught while declaring exchange: (" << e.reply_code() <<
") " << e.reply_text() );
// Handler path for an AMQP library exception `e`: logs its error code and what().
351 LERROR(
dlog,
"AMQP library exception caught while declaring exchange: (" << e.ErrorCode() <<
") " << e.what() );
// Queue-declaration fragment (header not visible in this excerpt): declares
// a_queue_name on a_channel and logs AMQP failures.
365 LDEBUG(
dlog,
"Declaring queue <" << a_queue_name <<
">" );
// NOTE(review): the flags are presumably SimpleAmqpClient's passive=false,
// durable=false, exclusive=true, auto_delete=true — confirm against the library.
366 a_channel->DeclareQueue( a_queue_name,
false,
false,
true,
true );
// Handler path for an AMQP exception `e`: logs its reply code and reply text.
371 LERROR(
dlog,
"AMQP exception caught while declaring queue: (" << e.reply_code() <<
") " << e.reply_text() );
// Handler path for an AMQP library exception `e`: logs its error code and what().
376 LERROR(
dlog,
"AMQP library exception caught while declaring queue: (" << e.ErrorCode() <<
") " << e.what() );
// Binds a_routing_key to a_queue_name over a_exchange on a_channel, logging
// AMQP failures. Returns bool; the return statements are not visible in this
// excerpt.
382 bool core::bind_key(
amqp_channel_ptr a_channel,
const std::string& a_exchange,
const std::string& a_queue_name,
const std::string& a_routing_key )
391 LDEBUG(
dlog,
"Binding key <" << a_routing_key <<
"> to queue <" << a_queue_name <<
"> over exchange <" << a_exchange <<
">" );
392 a_channel->BindQueue( a_queue_name, a_exchange, a_routing_key );
// Handler path for an AMQP exception `e`: logs its reply code and reply text.
// NOTE(review): this message says "declaring binding key" while the library-
// exception message below says "binding key"; consider aligning the wording.
398 LERROR(
dlog,
"AMQP exception caught while declaring binding key <" << a_routing_key <<
">: (" << e.reply_code() <<
") " << e.reply_text() );
// Handler path for an AMQP library exception `e`: logs its error code and what().
403 LERROR(
dlog,
"AMQP library exception caught while binding key <" << a_routing_key <<
">: (" << e.ErrorCode() <<
") " << e.what() );
// Consume-start fragment (header not visible in this excerpt): begins
// consuming on a_queue_name and returns what BasicConsume returns; every
// visible failure path returns an empty string instead.
//
// Early exit with an empty string; its guarding condition is not visible here.
412 return std::string();
417 LDEBUG(
dlog,
"Starting to consume messages on queue <" << a_queue_name <<
">" );
// An empty consumer tag is passed in.
// NOTE(review): `true, false` are presumably SimpleAmqpClient's no_local and
// no_ack parameters — confirm against the library.
419 return a_channel->BasicConsume( a_queue_name,
"",
true,
false );
// Handler path for an AMQP exception `e`: log it and return an empty string.
423 LERROR(
dlog,
"AMQP exception caught while starting consuming messages on <" << a_queue_name <<
">: (" << e.reply_code() <<
") " << e.reply_text() );
424 return std::string();
// Handler path for an AMQP library exception `e`: log it and return an empty string.
428 LERROR(
dlog,
"AMQP library exception caught while starting consuming messages on <" << a_queue_name <<
">: (" << e.ErrorCode() <<
") " << e.what() );
429 return std::string();
// Consume-stop fragment (header not visible in this excerpt): cancels the
// consumer identified by a_consumer_tag on a_channel, clears the tag, and logs
// any exception raised along the way.
442 LDEBUG(
dlog,
"Stopping consuming messages for consumer <" << a_consumer_tag <<
">" );
443 a_channel->BasicCancel( a_consumer_tag );
// The tag is cleared only after BasicCancel has returned, so the handlers
// below can still print it when BasicCancel throws.
444 a_consumer_tag.clear();
// Handler path for an AMQP exception `e`: logs its reply code and reply text.
449 LERROR(
dlog,
"AMQP exception caught while stopping consuming messages on <" << a_consumer_tag <<
">: (" << e.reply_code() <<
") " << e.reply_text() );
// Handler path for an AMQP library exception `e`: logs its error code and what().
454 LERROR(
dlog,
"AMQP library exception caught while stopping consuming messages on <" << a_consumer_tag <<
">: (" << e.ErrorCode() <<
") " << e.what() );
// An unknown consumer tag is reported as fatal.
457 catch( AmqpClient::ConsumerTagNotFoundException& e )
459 LERROR(
dlog,
"Fatal AMQP exception encountered while stopping consuming messages on <" << a_consumer_tag <<
">: " << e.what() );
// Any other standard exception is logged as well.
462 catch( std::exception& e )
464 LERROR(
dlog,
"Standard exception caught while stopping consuming messages on <" << a_consumer_tag <<
">: " << e.what() );
// Queue-removal fragment (header not visible in this excerpt): deletes
// a_queue_name on a_channel and logs any exception raised.
478 LDEBUG(
dlog,
"Deleting queue <" << a_queue_name <<
">" );
// NOTE(review): `false` is presumably SimpleAmqpClient's if_unused parameter —
// confirm against the library.
479 a_channel->DeleteQueue( a_queue_name,
false );
// A closed connection is reported as fatal.
482 catch( AmqpClient::ConnectionClosedException& e )
484 LERROR(
dlog,
"Fatal AMQP exception encountered removing queue <" << a_queue_name <<
">: " << e.what() );
// Handler path for an AMQP library exception `e`: logs its error code and what().
489 LERROR(
dlog,
"AMQP library exception caught while removing queue <" << a_queue_name <<
">: (" << e.ErrorCode() <<
") " << e.what() );
// Any other standard exception is logged as well.
492 catch( std::exception& e )
494 LERROR(
dlog,
"Standard exception caught while removing queue <" << a_queue_name <<
">: " << e.what() );
// Message-listening fragment (header not visible in this excerpt): fetches the
// next message for a_consumer_tag into a_envelope, optionally acknowledges it,
// and logs any exception raised.
//
// With a positive timeout, the overload that fills a_envelope and takes the
// timeout is used.
510 if( a_timeout_ms > 0 )
512 a_channel->BasicConsumeMessage( a_consumer_tag, a_envelope, a_timeout_ms );
// Otherwise (the branch keyword is not visible here) the overload without a
// timeout is used and its result is assigned to a_envelope.
516 a_envelope = a_channel->BasicConsumeMessage( a_consumer_tag );
// Acknowledge only when an envelope was received and the caller asked for it.
518 if( a_envelope && a_do_ack ) a_channel->BasicAck( a_envelope );
// Closed connection and cancelled consumer are both logged as fatal.
521 catch( AmqpClient::ConnectionClosedException& e )
523 LERROR(
dlog,
"Fatal AMQP exception encountered: " << e.what() );
526 catch( AmqpClient::ConsumerCancelledException& e )
528 LERROR(
dlog,
"Fatal AMQP exception encountered: " << e.what() );
// Other AMQP exceptions: soft errors are logged as warnings, the rest as errors.
531 catch( AmqpClient::AmqpException& e )
533 if( e.is_soft_error() )
535 LWARN(
dlog,
"Non-fatal AMQP exception encountered: " << e.reply_text() );
538 LERROR(
dlog,
"Fatal AMQP exception encountered: " << e.reply_text() );
// Any other standard exception is logged with its what() text.
541 catch( std::exception& e )
543 LERROR(
dlog,
"Standard exception caught: " << e.what() );
// Logged when the caught exception has no known type (handler header not visible here).
548 LERROR(
dlog,
"Unknown exception caught" );
virtual sent_msg_pkg_ptr send(request_ptr_t a_request) const
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
static bool listen_for_message(amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
Returns false if the channel is no longer usable; returns true if the channel may be reused.
std::shared_ptr< msg_request > request_ptr_t
Dripline-specific errors.
std::shared_ptr< msg_alert > alert_ptr_t
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
sent_msg_pkg_ptr do_send(message_ptr_t a_message, const std::string &a_exchange, bool a_expect_reply) const
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
amqp_channel_ptr f_channel
std::vector< amqp_message_ptr > amqp_split_message_ptrs
amqp_channel_ptr open_channel() const
std::string f_consumer_tag
static scarab::logger dlog("agent")
Error indicating a problem with the connection to the broker.
AmqpClient::Channel::ptr_t amqp_channel_ptr
std::string to_string(op_t an_op)
Gives the human-readable version of a message operation.
AmqpClient::AmqpException amqp_exception
std::shared_ptr< msg_reply > reply_ptr_t
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
AmqpClient::AmqpLibraryException amqp_lib_exception
core(const scarab::param_node &a_config=scarab::param_node(), const std::string &a_broker_address="", unsigned a_port=0, const std::string &a_auth_file="", const bool a_make_connection=true)
std::shared_ptr< message > message_ptr_t
#define DL_MAX_PAYLOAD_SIZE
core & operator=(const core &a_orig)