8 #define DRIPLINE_API_EXPORTS 21 LOGGER(
dlog,
"listener" );
27 f_listen_timeout_ms( 1000 ),
32 cancelable(
std::move(a_orig) ),
33 f_channel(
std::move(a_orig.f_channel) ),
34 f_consumer_tag(
std::move(a_orig.f_consumer_tag) ),
35 f_listen_timeout_ms(
std::move(a_orig.f_listen_timeout_ms) ),
36 f_listener_thread(
std::move(a_orig.f_listener_thread) )
44 cancelable::operator=( std::move(a_orig) );
45 f_channel = std::move(a_orig.f_channel);
46 f_consumer_tag = std::move(a_orig.f_consumer_tag);
47 f_consumer_tag = std::move(a_orig.f_listen_timeout_ms);
48 f_listener_thread = std::move(a_orig.f_listener_thread);
55 f_endpoint( a_endpoint_ptr )
61 f_endpoint(
std::move(a_orig.f_endpoint) )
70 f_endpoint = std::move(a_orig.f_endpoint);
76 LINFO(
dlog,
"Listening for incoming messages on <" << f_endpoint->name() <<
">" );
78 while( ! f_canceled.load() )
84 if( f_canceled.load() )
86 LDEBUG(
dlog,
"Service canceled" );
90 if( ! t_envelope && t_channel_valid )
98 if( ! t_channel_valid )
100 LERROR(
dlog,
"Channel is no longer valid for <" << f_endpoint->name() <<
">" );
104 if( f_canceled.load() )
106 LDEBUG(
dlog,
"Listener <" << f_endpoint->name() <<
"> canceled" );
117 f_endpoint->sort_message( a_message );
122 LERROR(
dlog,
"<" << f_endpoint->name() <<
">: Dripline exception caught while handling message: " << e.what() );
127 LERROR(
dlog,
"<" << f_endpoint->name() <<
">: AMQP exception caught while handling message: (" << e.reply_code() <<
") " << e.reply_text() );
132 LERROR(
dlog,
"<" << f_endpoint->name() <<
">: AMQP Library Exception caught while handling message: (" << e.ErrorCode() <<
") " << e.what() );
135 catch( std::exception& e )
137 LERROR(
dlog,
"<" << f_endpoint->name() <<
">: Standard exception caught while handling message: " << e.what() );
listener_receiver & operator=(const listener_receiver &)=delete
static bool listen_for_message(amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
return: if false, channel is no longer useable; if true, may be reused
Dripline-specific errors.
void handle_message_chunk(amqp_envelope_ptr a_envelope)
virtual ~endpoint_listener_receiver()
Convenience class to bring together listener and concurrent_receiver.
Decorator class for a plain endpoint: adds listener_receiver capabilities.
A listener is a class capable of listening for AMQP messages on an AMQP channel. This class provides ...
std::shared_ptr< endpoint > endpoint_ptr_t
virtual void submit_message(message_ptr_t a_message)
Direct submission of messages to the endpoint.
static scarab::logger dlog("agent")
AmqpClient::AmqpException amqp_exception
virtual bool listen_on_queue()
Listens for AMQP messages and then passes them to be handled as Dripline message chunks.
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
endpoint_listener_receiver & operator=(const endpoint_listener_receiver &)=delete
listener & operator=(const listener &)=delete
AmqpClient::AmqpLibraryException amqp_lib_exception
std::shared_ptr< message > message_ptr_t
endpoint_listener_receiver(endpoint_ptr_t a_endpoint_ptr)