Dripline-Cpp  v2.4.2
Dripline Implementation in C++
receiver.cc
Go to the documentation of this file.
1 /*
2  * receiver.cc
3  *
4  * Created on: Feb 18, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "receiver.hh"
11 
12 #include "core.hh"
13 #include "dripline_exceptions.hh"
14 #include "message.hh"
15 
16 #include "logger.hh"
17 #include "signal_handler.hh"
18 
19 LOGGER( dlog, "receiver" );
20 
21 namespace dripline
22 {
// Member-initializer list and empty body of a constructor; the signature line is not visible in this listing.
// The member names match those used on incoming_message_pack objects elsewhere in this file, so this is
// presumably the incoming_message_pack default constructor -- TODO confirm against receiver.hh.
24  f_messages(),
25  f_chunks_received(),
26  f_routing_key(),
27  f_thread(),
28  f_mutex(),
29  f_conv(),
30  f_processing( false ) // a newly created pack is not being processed yet
31  {}
32 
// Presumably the incoming_message_pack move constructor taking a_orig; the signature line and the
// listing line numbered 35 are not visible here, so the initializer list below is incomplete.
// The chunk list, routing key and thread handle are moved out of a_orig; the mutex and condition
// variable are default-constructed instead of being taken from a_orig.
34  f_messages( std::move(a_orig.f_messages) ),
36  f_routing_key( std::move(a_orig.f_routing_key) ),
37  f_thread( std::move(a_orig.f_thread) ),
38  f_mutex(),
39  f_conv(),
40  f_processing( a_orig.f_processing.load() )
41  {
// reset the moved-from pack: no chunks counted, not processing
42  a_orig.f_chunks_received = 0;
43  a_orig.f_processing.store( false );
44  }
45 
46 
// Initializer list and empty body of what is presumably the receiver default constructor
// (signature line not visible in this listing).
48  scarab::cancelable(),
49  f_incoming_messages(),
50  f_single_message_wait_ms( 1000 ) // default wait for the chunks of one message; milliseconds, going by the member name
51  {}
52 
// Initializer list and body of what is presumably the receiver move constructor taking a_orig
// (signature line not visible in this listing).
// The cancelable base and the map of incoming message packs are moved; the wait time is copied.
54  scarab::cancelable( std::move(a_orig) ),
55  f_incoming_messages( std::move(a_orig.f_incoming_messages) ),
56  f_single_message_wait_ms( a_orig.f_single_message_wait_ms )
57  {
// put the moved-from receiver back to the same wait time the default constructor uses
58  a_orig.f_single_message_wait_ms = 1000;
59  }
60 
// Empty body; the cross-reference listing places ~receiver() at receiver.cc:61, so this is its body.
62  {}
63 
// Body of what is presumably the receiver move-assignment operator taking a_orig
// (signature line not visible in this listing). Returns *this.
65  {
66  scarab::cancelable::operator=( std::move(a_orig) );
67  f_incoming_messages = std::move(a_orig.f_incoming_messages);
68  f_single_message_wait_ms = a_orig.f_single_message_wait_ms;
// put the moved-from receiver back to the same wait time the default constructor uses
69  a_orig.f_single_message_wait_ms = 1000;
70  return *this;
71  }
72 
74  {
75  try
76  {
77  amqp_message_ptr t_message = a_envelope->Message();
78  LDEBUG( dlog, "Received a message chunk <" << t_message->MessageId() );
79 
80  auto t_parsed_message_id = message::parse_message_id( t_message->MessageId() );
81  if( incoming_messages().count( std::get<0>(t_parsed_message_id) ) == 0 )
82  {
83  // this path: first chunk for this message
84  LDEBUG( dlog, "This is the first chunk for this message; creating new message pack" );
85  // create the new message_pack object
86  incoming_message_pack& t_pack = incoming_messages()[std::get<0>(t_parsed_message_id)];
87  // set the f_messages vector to the expected size
88  t_pack.f_messages.resize( std::get<2>(t_parsed_message_id) );
89  // put in place the first message chunk received
90  t_pack.f_messages[std::get<1>(t_parsed_message_id)] = t_message;
91  t_pack.f_routing_key = a_envelope->RoutingKey();
92  t_pack.f_chunks_received = 1;
93 
94  if( t_pack.f_messages.size() == 1 )
95  {
96  // if we only expect one chunk, we can bypass creating a separate thread, etc
97  LDEBUG( dlog, "Single-chunk message being sent directly to processing" );
98  process_message_pack( t_pack, t_message->MessageId() );
99  }
100  else
101  {
102  // start the thread to wait for message chunks
103  t_pack.f_thread = std::thread([this, &t_pack, &t_parsed_message_id](){ wait_for_message(t_pack, std::get<0>(t_parsed_message_id)); });
104  t_pack.f_thread.detach();
105  }
106  }
107  else
108  {
109  // this path: have already received chunks from this message
110  LDEBUG( dlog, "This is not the first chunk for this message; adding to message pack" );
111  incoming_message_pack& t_pack = incoming_messages()[std::get<0>(t_parsed_message_id)];
112  if( t_pack.f_processing.load() )
113  {
114  LWARN( dlog, "Message <" << std::get<0>(t_parsed_message_id) << "> is already being processed\n" <<
115  "Just received chunk " << std::get<1>(t_parsed_message_id) << " of " << std::get<2>(t_parsed_message_id) );
116  }
117  else
118  {
119  // lock mutex to access f_messages
120  std::unique_lock< std::mutex > t_lock( t_pack.f_mutex );
121  if( t_pack.f_messages[std::get<1>(t_parsed_message_id)] )
122  {
123  LWARN( dlog, "Received duplicate message chunk for message <" << std::get<0>(t_parsed_message_id) << ">; chunk " << std::get<1>(t_parsed_message_id) );
124  }
125  else
126  {
127  // add chunk to set of chunks
128  t_pack.f_messages[std::get<1>(t_parsed_message_id)] = t_message;
129  ++t_pack.f_chunks_received;
130  t_lock.unlock();
131  // inform the message-processing thread it should check whether it has the complete message
132  t_pack.f_conv.notify_one();
133  }
134  }
135  } // new/current message if/else block
136  }
137  catch( dripline_error& e )
138  {
139  LERROR( dlog, "Dripline exception caught while handling message chunk: " << e.what() );
140  }
141  catch( std::exception& e )
142  {
143  LERROR( dlog, "Standard exception caught while handling message chunk: " << e.what() );
144  }
145 
146  return;
147  }
148 
149  void receiver::wait_for_message( incoming_message_pack& a_pack, const std::string& a_message_id )
150  {
151  std::unique_lock< std::mutex > t_lock( a_pack.f_mutex );
152 
153  LDEBUG( dlog, "Waiting for message; chunks received: " << a_pack.f_chunks_received << " chunks expected: " << a_pack.f_messages.size() );
154 
155  // if the message is already complete, submit it for processing
156  if( a_pack.f_chunks_received == a_pack.f_messages.size() )
157  {
158  t_lock.release(); // process_message() will unlock the mutex before erasing the message pack
159  process_message_pack( a_pack, a_message_id );
160  return;
161  }
162 
163  auto t_now = std::chrono::system_clock::now();
164 
165  while( a_pack.f_conv.wait_until( t_lock, t_now + std::chrono::milliseconds(f_single_message_wait_ms) ) == std::cv_status::no_timeout )
166  {
167  // if the message is complete during the waiting period, submit it for processing
168  if( a_pack.f_chunks_received == a_pack.f_messages.size() )
169  {
170  t_lock.release(); // process_message() will unlock the mutex before erasing the message pack
171  process_message_pack( a_pack, a_message_id );
172  return;
173  }
174  }
175 
176  // once the waiting period is over, submit it whether it's complete or not
177  t_lock.release(); // process_message() will unlock the mutex before erasing the message pack
178  LWARN( dlog, "Timed out; message may be incomplete" );
179  process_message_pack( a_pack, a_message_id );
180 
181  return;
182  }
183 
184  void receiver::process_message_pack( incoming_message_pack& a_pack, const std::string& a_message_id )
185  {
186  a_pack.f_processing.store( true );
187  try
188  {
189  message_ptr_t t_message = message::process_message( a_pack.f_messages, a_pack.f_routing_key );
190 
191  a_pack.f_mutex.unlock();
192  incoming_messages().erase( a_message_id );
193 
194  // if the message is not valid at this point, continue processing it, and we'll deal with it in the endpoint class
195 
196  this->process_message( t_message );
197 
198  return;
199  }
200  catch( dripline_error& e )
201  {
202  LERROR( dlog, "Dripline exception caught while processing message pack: " << e.what() );
203  }
204  catch( std::exception& e )
205  {
206  LERROR( dlog, "Standard exception caught while processing message pack: " << e.what() );
207  }
208 
209  return;
210  }
211 
// Body of process_message( message_ptr_t a_message ), which the cross-reference listing places at
// receiver.cc:212 (signature line not visible here). This base-class version always throws a
// dripline_error; the listing declares the function virtual, so derived classes are expected to override it.
213  {
214  throw dripline_error() << "Process_message function has not been implemented";
215  }
216 
217  reply_ptr_t receiver::wait_for_reply( const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms )
218  {
219  bool t_temp;
220  return wait_for_reply( a_receive_reply, t_temp, a_timeout_ms );
221  }
222 
223  reply_ptr_t receiver::wait_for_reply( const sent_msg_pkg_ptr a_receive_reply, bool& a_chan_valid, int a_timeout_ms )
224  {
225  if ( ! a_receive_reply->f_channel )
226  {
227  return reply_ptr_t();
228  }
229 
230  LDEBUG( dlog, "Waiting for a reply" );
231 
232  // wait for messages until either:
233  // 1. the channel is no longer valid (return empty reply pointer; a_chan_valid will be false)
234  // 2. listening times out (return empty reply pointer; a_chan_valid will be true)
235  // 3. a full dripline message is received (return message)
236  // 4. error processing a recieved amqp message (return empty reply pointer)
237  while( ! is_canceled() )
238  {
239  amqp_envelope_ptr t_envelope;
240  a_chan_valid = core::listen_for_message( t_envelope, a_receive_reply->f_channel, a_receive_reply->f_consumer_tag, a_timeout_ms, false );
241 
242  // check whether we canceled while listening
243  if( is_canceled() )
244  {
245  LDEBUG( dlog, "Receiver was canceled before receiving reply" );
246  return reply_ptr_t();
247  }
248 
249  // there was an error listening on the channel; no message received
250  if( ! a_chan_valid )
251  {
252  LDEBUG( dlog, "There was some error while listening on the channel; no message received" );
253  return reply_ptr_t();
254  }
255 
256  // listening timed out
257  if( ! t_envelope )
258  {
259  LDEBUG( dlog, "An empty envelope was returned from listening to a channel; listening may have timed out" );
260  return reply_ptr_t();
261  }
262 
263  try
264  {
265  amqp_message_ptr t_message = t_envelope->Message();
266  LDEBUG( dlog, "Received a message chunk <" << t_message->MessageId() );
267 
268  auto t_parsed_message_id = message::parse_message_id( t_message->MessageId() );
269  if( f_incoming_messages.count( std::get<0>(t_parsed_message_id) ) == 0 )
270  {
271  // this path: first chunk for this message
272  LDEBUG( dlog, "This is the first chunk for this message; creating new message pack" );
273  // create the new message_pack object
274  incoming_message_pack& t_pack = f_incoming_messages[std::get<0>(t_parsed_message_id)];
275  // set the f_messages vector to the expected size
276  t_pack.f_messages.resize( std::get<2>(t_parsed_message_id) );
277  // put in place the first message chunk received
278  t_pack.f_messages[std::get<1>(t_parsed_message_id)] = t_message;
279  t_pack.f_routing_key = t_envelope->RoutingKey();
280  t_pack.f_chunks_received = 1;
281 
282  if( t_pack.f_chunks_received == t_pack.f_messages.size() )
283  {
284  return process_received_reply( t_pack, std::get<0>(t_parsed_message_id) );
285  }
286  }
287  else
288  {
289  // this path: have already received chunks from this message
290  LDEBUG( dlog, "This is not the first chunk for this message; adding to message pack" );
291  incoming_message_pack& t_pack = f_incoming_messages[std::get<0>(t_parsed_message_id)];
292  if( t_pack.f_processing.load() )
293  {
294  LWARN( dlog, "Message <" << std::get<0>(t_parsed_message_id) << "> is already being processed\n" <<
295  "Just received chunk " << std::get<1>(t_parsed_message_id) << " of " << std::get<2>(t_parsed_message_id) );
296  }
297  else
298  {
299  if( t_pack.f_messages[std::get<1>(t_parsed_message_id)] )
300  {
301  LWARN( dlog, "Received duplicate message chunk for message <" << std::get<0>(t_parsed_message_id) << ">; chunk " << std::get<1>(t_parsed_message_id) );
302  }
303  else
304  {
305  // add chunk to set of chunks
306  t_pack.f_messages[std::get<1>(t_parsed_message_id)] = t_message;
307  ++t_pack.f_chunks_received;
308  if( t_pack.f_chunks_received == t_pack.f_messages.size() )
309  {
310  return process_received_reply( t_pack, std::get<0>(t_parsed_message_id) );
311  }
312  }
313  }
314  }
315 
316  }
317  catch( dripline_error& e )
318  {
319  LERROR( dlog, "There was a problem processing the message: " << e.what() );
320  return reply_ptr_t();
321  }
322  } // end while( ! is_canceled() )
323 
324  LDEBUG( dlog, "Receiver was canceled" );
325  return reply_ptr_t();
326  }
327 
328  reply_ptr_t receiver::process_received_reply( incoming_message_pack& a_pack, const std::string& a_message_id )
329  {
330  a_pack.f_processing.store( true );
331  try
332  {
333  message_ptr_t t_message = message::process_message( a_pack.f_messages, a_pack.f_routing_key );
334 
335  f_incoming_messages.erase( a_message_id );
336 
337  if( t_message->is_reply() )
338  {
339  return std::static_pointer_cast< msg_reply >( t_message );
340  }
341  else
342  {
343  throw dripline_error() << "Non-reply message received";
344  }
345  }
346  catch( dripline_error& e )
347  {
348  LERROR( dlog, "Dripline exception caught while handling message: " << e.what() );
349  }
350  catch( amqp_exception& e )
351  {
352  LERROR( dlog, "AMQP exception caught while sending reply: (" << e.reply_code() << ") " << e.reply_text() );
353  }
354  catch( amqp_lib_exception& e )
355  {
356  LERROR( dlog, "AMQP Library Exception caught while sending reply: (" << e.ErrorCode() << ") " << e.what() );
357  }
358  catch( std::exception& e )
359  {
360  LERROR( dlog, "Standard exception caught while sending reply: " << e.what() );
361  }
362 
363  return reply_ptr_t();
364 
365  }
366 
// Initializer list and empty body of what is presumably the concurrent_receiver default constructor
// (signature line not visible in this listing): default-constructs the receiver base and the message queue.
368  receiver(),
369  f_message_queue()
370  {}
371 
// Initializer list and empty body of what is presumably the concurrent_receiver move constructor taking
// a_orig (signature line not visible in this listing). Only the receiver base is moved; the message
// queue is default-constructed, so anything queued in a_orig is not carried over.
373  receiver( std::move(a_orig) ),
374  f_message_queue()
375  {}
376 
// Empty body; presumably the concurrent_receiver destructor (signature line not visible in this listing) -- TODO confirm.
378  {}
379 
// Body of what is presumably the concurrent_receiver move-assignment operator taking a_orig
// (signature line not visible in this listing). Only the receiver base is move-assigned; this
// object's message queue is left as it is. Returns *this.
381  {
382  receiver::operator=( std::move(a_orig) );
383  // nothing to do with message queue
384  return *this;
385  }
386 
// Body of concurrent_receiver's process_message( message_ptr_t a_message ), which the cross-reference
// listing places at receiver.cc:387 (signature line not visible here): pushes the message onto
// f_message_queue, from which execute() later pops it.
388  {
389  f_message_queue.push( a_message );
390  return;
391  }
392 
// Body of execute(), which the cross-reference listing places at receiver.cc:393 (signature line not
// visible here): until the receiver is canceled, pops messages from f_message_queue and hands each
// one to submit_message(). Any std::exception ends the loop and cancels everything via the signal handler.
394  {
395  try
396  {
397  while( ! is_canceled() )
398  {
399  message_ptr_t t_message;
// only submit when a message was actually popped; otherwise loop around and re-check for cancelation
// (presumably timed_wait_and_pop() returns false on a timeout -- TODO confirm)
400  if( f_message_queue.timed_wait_and_pop( t_message ) )
401  {
402  this->submit_message( t_message );
403  }
404  }
405  }
406  catch( const std::exception& e )
407  {
408  // shutdown gracefully on an exception
409  LERROR( dlog, "Exception caught; shutting down.\n" << "\t" << e.what() );
410  scarab::signal_handler::cancel_all( 1 );
411  }
412  }
413 
414 
415 
416 } /* namespace dripline */
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
Definition: amqp.hh:26
virtual void process_message(message_ptr_t a_message)
Deposits the message in the concurrent queue (called by the listener)
Definition: receiver.cc:387
virtual void submit_message(message_ptr_t a_message)=0
Receives and processes messages concurrently.
Definition: receiver.hh:153
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
STL namespace.
static bool listen_for_message(amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
return: if false, channel is no longer useable; if true, may be reused
Definition: core.cc:498
std::condition_variable f_conv
Definition: receiver.hh:38
concurrent_receiver & operator=(const concurrent_receiver &)=delete
Dripline-specific errors.
void handle_message_chunk(amqp_envelope_ptr a_envelope)
Definition: receiver.cc:73
Definition: core.hh:17
reply_ptr_t process_received_reply(incoming_message_pack &a_pack, const std::string &a_message_id)
Definition: receiver.cc:328
receiver & operator=(const receiver &a_orig)=delete
Stores the basic information about a set of message chunks that will eventually make a Dripline message.
Definition: receiver.hh:31
A receiver is able to collect Dripline message chunks and reassemble them into a complete Dripline message.
Definition: receiver.hh:76
std::atomic< bool > f_processing
Definition: receiver.hh:39
void process_message_pack(incoming_message_pack &a_pack, const std::string &a_message_id)
Converts a message pack into a Dripline message, and then submits the message for processing...
Definition: receiver.cc:184
reply_ptr_t wait_for_reply(const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
Definition: receiver.cc:217
virtual ~receiver()
Definition: receiver.cc:61
static message_ptr_t process_message(amqp_split_message_ptrs a_message_ptrs, const std::string &a_routing_key)
Converts a set of AMQP messages to a Dripline message object.
Definition: message.cc:131
static scarab::logger dlog("agent")
void wait_for_message(incoming_message_pack &a_pack, const std::string &a_message_id)
Definition: receiver.cc:149
AmqpClient::AmqpException amqp_exception
Definition: amqp.hh:28
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
static std::tuple< std::string, unsigned, unsigned > parse_message_id(const std::string &a_message_id)
Parses the message ID, which should be of the form [UUID]/[chunk]/[total chunks]. ...
Definition: message.cc:117
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Definition: amqp.hh:25
void execute()
Handles messages that appear in the concurrent queue by calling submit_message(). ...
Definition: receiver.cc:393
AmqpClient::AmqpLibraryException amqp_lib_exception
Definition: amqp.hh:29
static scarab::logger dlog("receiver")
virtual void process_message(message_ptr_t a_message)
Definition: receiver.cc:212
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
Reply message class.
Definition: message.hh:229
amqp_split_message_ptrs f_messages
Definition: receiver.hh:33