Dripline-Cpp  v2.4.2
Dripline Implementation in C++
listener.cc
Go to the documentation of this file.
1 /*
2  * listener.cc
3  *
4  * Created on: Jun 23, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "listener.hh"
11 
12 #include "endpoint.hh"
13 
14 #include "core.hh"
15 #include "dripline_exceptions.hh"
16 
17 #include "logger.hh"
18 
19 namespace dripline
20 {
21  LOGGER( dlog, "listener" );
22 
24  cancelable(),
25  f_channel(),
26  f_consumer_tag(),
27  f_listen_timeout_ms( 1000 ),
28  f_listener_thread()
29  {}
30 
32  cancelable( std::move(a_orig) ),
33  f_channel( std::move(a_orig.f_channel) ),
34  f_consumer_tag( std::move(a_orig.f_consumer_tag) ),
35  f_listen_timeout_ms( std::move(a_orig.f_listen_timeout_ms) ),
36  f_listener_thread( std::move(a_orig.f_listener_thread) )
37  {}
38 
40  {}
41 
43  {
44  cancelable::operator=( std::move(a_orig) );
45  f_channel = std::move(a_orig.f_channel);
46  f_consumer_tag = std::move(a_orig.f_consumer_tag);
47  f_consumer_tag = std::move(a_orig.f_listen_timeout_ms);
48  f_listener_thread = std::move(a_orig.f_listener_thread);
49  return *this;
50  }
51 
53  scarab::cancelable(),
55  f_endpoint( a_endpoint_ptr )
56  {}
57 
59  scarab::cancelable( std::move(a_orig) ),
60  listener_receiver( std::move(a_orig) ),
61  f_endpoint( std::move(a_orig.f_endpoint) )
62  {}
63 
65  {}
66 
68  {
69  listener_receiver::operator=( std::move(a_orig) );
70  f_endpoint = std::move(a_orig.f_endpoint);
71  return *this;
72  }
73 
75  {
76  LINFO( dlog, "Listening for incoming messages on <" << f_endpoint->name() << ">" );
77 
78  while( ! f_canceled.load() )
79  {
80 
81  amqp_envelope_ptr t_envelope;
82  bool t_channel_valid = core::listen_for_message( t_envelope, f_channel, f_consumer_tag, f_listen_timeout_ms );
83 
84  if( f_canceled.load() )
85  {
86  LDEBUG( dlog, "Service canceled" );
87  return true;
88  }
89 
90  if( ! t_envelope && t_channel_valid )
91  {
92  // we end up here every time the listen times out with no message received
93  continue;
94  }
95 
96  handle_message_chunk( t_envelope );
97 
98  if( ! t_channel_valid )
99  {
100  LERROR( dlog, "Channel is no longer valid for <" << f_endpoint->name() << ">" );
101  return false;
102  }
103 
104  if( f_canceled.load() )
105  {
106  LDEBUG( dlog, "Listener <" << f_endpoint->name() << "> canceled" );
107  return true;
108  }
109  }
110  return true;
111  }
112 
114  {
115  try
116  {
117  f_endpoint->sort_message( a_message );
118  return;
119  }
120  catch( dripline_error& e )
121  {
122  LERROR( dlog, "<" << f_endpoint->name() << ">: Dripline exception caught while handling message: " << e.what() );
123  throw;
124  }
125  catch( amqp_exception& e )
126  {
127  LERROR( dlog, "<" << f_endpoint->name() << ">: AMQP exception caught while handling message: (" << e.reply_code() << ") " << e.reply_text() );
128  throw;
129  }
130  catch( amqp_lib_exception& e )
131  {
132  LERROR( dlog, "<" << f_endpoint->name() << ">: AMQP Library Exception caught while handling message: (" << e.ErrorCode() << ") " << e.what() );
133  throw;
134  }
135  catch( std::exception& e )
136  {
137  LERROR( dlog, "<" << f_endpoint->name() << ">: Standard exception caught while handling message: " << e.what() );
138  throw;
139  }
140 
141  return;
142  }
143 
144 } /* namespace dripline */
listener_receiver & operator=(const listener_receiver &)=delete
STL namespace.
static bool listen_for_message(amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
return: if false, channel is no longer useable; if true, may be reused
Definition: core.cc:498
Dripline-specific errors.
void handle_message_chunk(amqp_envelope_ptr a_envelope)
Definition: receiver.cc:73
Definition: core.hh:17
Convenience class to bring together listener and concurrent_receiver.
Definition: listener.hh:75
Decorator class for a plain endpoint: adds listener_receiver capabilities.
Definition: listener.hh:104
A listener is a class capable of listening for AMQP messages on an AMQP channel. This class provides ...
Definition: listener.hh:47
virtual ~listener()
Definition: listener.cc:39
std::shared_ptr< endpoint > endpoint_ptr_t
Definition: dripline_fwd.hh:39
virtual void submit_message(message_ptr_t a_message)
Direct submission of messages to the endpoint.
Definition: listener.cc:113
static scarab::logger dlog("agent")
AmqpClient::AmqpException amqp_exception
Definition: amqp.hh:28
virtual bool listen_on_queue()
Listens for AMQP messages and then passes them to be handled as Dripline message chunks.
Definition: listener.cc:74
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Definition: amqp.hh:25
endpoint_listener_receiver & operator=(const endpoint_listener_receiver &)=delete
listener & operator=(const listener &)=delete
AmqpClient::AmqpLibraryException amqp_lib_exception
Definition: amqp.hh:29
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
endpoint_listener_receiver(endpoint_ptr_t a_endpoint_ptr)
Definition: listener.cc:52