8 #define DRIPLINE_API_EXPORTS 17 #include "signal_handler.hh" 19 LOGGER(
dlog,
"receiver" );
42 a_orig.f_chunks_received = 0;
43 a_orig.f_processing.store(
false );
49 f_incoming_messages(),
50 f_single_message_wait_ms( 1000 )
55 f_incoming_messages(
std::move(a_orig.f_incoming_messages) ),
56 f_single_message_wait_ms( a_orig.f_single_message_wait_ms )
58 a_orig.f_single_message_wait_ms = 1000;
66 scarab::cancelable::operator=( std::move(a_orig) );
67 f_incoming_messages = std::move(a_orig.f_incoming_messages);
68 f_single_message_wait_ms = a_orig.f_single_message_wait_ms;
69 a_orig.f_single_message_wait_ms = 1000;
78 LDEBUG(
dlog,
"Received a message chunk <" << t_message->MessageId() );
81 if( incoming_messages().count( std::get<0>(t_parsed_message_id) ) == 0 )
84 LDEBUG(
dlog,
"This is the first chunk for this message; creating new message pack" );
88 t_pack.
f_messages.resize( std::get<2>(t_parsed_message_id) );
90 t_pack.
f_messages[std::get<1>(t_parsed_message_id)] = t_message;
97 LDEBUG(
dlog,
"Single-chunk message being sent directly to processing" );
103 t_pack.
f_thread = std::thread([
this, &t_pack, &t_parsed_message_id](){
wait_for_message(t_pack, std::get<0>(t_parsed_message_id)); });
110 LDEBUG(
dlog,
"This is not the first chunk for this message; adding to message pack" );
114 LWARN(
dlog,
"Message <" << std::get<0>(t_parsed_message_id) <<
"> is already being processed\n" <<
115 "Just received chunk " << std::get<1>(t_parsed_message_id) <<
" of " << std::get<2>(t_parsed_message_id) );
120 std::unique_lock< std::mutex > t_lock( t_pack.
f_mutex );
121 if( t_pack.
f_messages[std::get<1>(t_parsed_message_id)] )
123 LWARN(
dlog,
"Received duplicate message chunk for message <" << std::get<0>(t_parsed_message_id) <<
">; chunk " << std::get<1>(t_parsed_message_id) );
128 t_pack.
f_messages[std::get<1>(t_parsed_message_id)] = t_message;
132 t_pack.
f_conv.notify_one();
139 LERROR(
dlog,
"Dripline exception caught while handling message chunk: " << e.what() );
141 catch( std::exception& e )
143 LERROR(
dlog,
"Standard exception caught while handling message chunk: " << e.what() );
151 std::unique_lock< std::mutex > t_lock( a_pack.
f_mutex );
163 auto t_now = std::chrono::system_clock::now();
165 while( a_pack.
f_conv.wait_until( t_lock, t_now + std::chrono::milliseconds(f_single_message_wait_ms) ) == std::cv_status::no_timeout )
178 LWARN(
dlog,
"Timed out; message may be incomplete" );
192 incoming_messages().erase( a_message_id );
202 LERROR(
dlog,
"Dripline exception caught while processing message pack: " << e.what() );
204 catch( std::exception& e )
206 LERROR(
dlog,
"Standard exception caught while processing message pack: " << e.what() );
214 throw dripline_error() <<
"Process_message function has not been implemented";
225 if ( ! a_receive_reply->f_channel )
230 LDEBUG(
dlog,
"Waiting for a reply" );
237 while( ! is_canceled() )
240 a_chan_valid =
core::listen_for_message( t_envelope, a_receive_reply->f_channel, a_receive_reply->f_consumer_tag, a_timeout_ms,
false );
245 LDEBUG(
dlog,
"Receiver was canceled before receiving reply" );
252 LDEBUG(
dlog,
"There was some error while listening on the channel; no message received" );
259 LDEBUG(
dlog,
"An empty envelope was returned from listening to a channel; listening may have timed out" );
266 LDEBUG(
dlog,
"Received a message chunk <" << t_message->MessageId() );
269 if( f_incoming_messages.count( std::get<0>(t_parsed_message_id) ) == 0 )
272 LDEBUG(
dlog,
"This is the first chunk for this message; creating new message pack" );
276 t_pack.
f_messages.resize( std::get<2>(t_parsed_message_id) );
278 t_pack.
f_messages[std::get<1>(t_parsed_message_id)] = t_message;
290 LDEBUG(
dlog,
"This is not the first chunk for this message; adding to message pack" );
294 LWARN(
dlog,
"Message <" << std::get<0>(t_parsed_message_id) <<
"> is already being processed\n" <<
295 "Just received chunk " << std::get<1>(t_parsed_message_id) <<
" of " << std::get<2>(t_parsed_message_id) );
299 if( t_pack.
f_messages[std::get<1>(t_parsed_message_id)] )
301 LWARN(
dlog,
"Received duplicate message chunk for message <" << std::get<0>(t_parsed_message_id) <<
">; chunk " << std::get<1>(t_parsed_message_id) );
306 t_pack.
f_messages[std::get<1>(t_parsed_message_id)] = t_message;
319 LERROR(
dlog,
"There was a problem processing the message: " << e.what() );
324 LDEBUG(
dlog,
"Receiver was canceled" );
335 f_incoming_messages.erase( a_message_id );
337 if( t_message->is_reply() )
339 return std::static_pointer_cast<
msg_reply >( t_message );
348 LERROR(
dlog,
"Dripline exception caught while handling message: " << e.what() );
352 LERROR(
dlog,
"AMQP exception caught while sending reply: (" << e.reply_code() <<
") " << e.reply_text() );
356 LERROR(
dlog,
"AMQP Library Exception caught while sending reply: (" << e.ErrorCode() <<
") " << e.what() );
358 catch( std::exception& e )
360 LERROR(
dlog,
"Standard exception caught while sending reply: " << e.what() );
389 f_message_queue.push( a_message );
397 while( ! is_canceled() )
400 if( f_message_queue.timed_wait_and_pop( t_message ) )
406 catch(
const std::exception& e )
409 LERROR(
dlog,
"Exception caught; shutting down.\n" <<
"\t" << e.what() );
410 scarab::signal_handler::cancel_all( 1 );
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
virtual void process_message(message_ptr_t a_message)
Deposits the message in the concurrent queue (called by the listener)
virtual void submit_message(message_ptr_t a_message)=0
Receives and processes messages concurrently.
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
static bool listen_for_message(amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
return: if false, channel is no longer useable; if true, may be reused
std::condition_variable f_conv
concurrent_receiver & operator=(const concurrent_receiver &)=delete
Dripline-specific errors.
void handle_message_chunk(amqp_envelope_ptr a_envelope)
reply_ptr_t process_received_reply(incoming_message_pack &a_pack, const std::string &a_message_id)
receiver & operator=(const receiver &a_orig)=delete
Stores the basic information about a set of message chunks that will eventually make a Dripline message.
unsigned f_chunks_received
A receiver is able to collect Dripline message chunks and reassemble them into a complete Dripline message.
std::atomic< bool > f_processing
void process_message_pack(incoming_message_pack &a_pack, const std::string &a_message_id)
Converts a message pack into a Dripline message, and then submits the message for processing...
reply_ptr_t wait_for_reply(const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
static message_ptr_t process_message(amqp_split_message_ptrs a_message_ptrs, const std::string &a_routing_key)
Converts a set of AMQP messages to a Dripline message object.
static scarab::logger dlog("agent")
void wait_for_message(incoming_message_pack &a_pack, const std::string &a_message_id)
virtual ~concurrent_receiver()
AmqpClient::AmqpException amqp_exception
std::shared_ptr< msg_reply > reply_ptr_t
static std::tuple< std::string, unsigned, unsigned > parse_message_id(const std::string &a_message_id)
Parses the message ID, which should be of the form [UUID]/[chunk]/[total chunks]. ...
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
void execute()
Handles messages that appear in the concurrent queue by calling submit_message(). ...
std::string f_routing_key
AmqpClient::AmqpLibraryException amqp_lib_exception
static scarab::logger dlog("receiver")
virtual void process_message(message_ptr_t a_message)
std::shared_ptr< message > message_ptr_t
amqp_split_message_ptrs f_messages