Dripline-Cpp  v2.4.2
Dripline Implementation in C++
relayer.cc
Go to the documentation of this file.
1 #define DRIPLINE_API_EXPORTS
2 
3 #include "relayer.hh"
4 
5 #include "dripline_exceptions.hh"
6 
7 #include "logger.hh"
8 #include "param.hh"
9 
10 #include <chrono>
11 
12 namespace dripline
13 {
// Logger used by the LDEBUG/LWARN/LERROR calls in this file; it is named `dlog` and tagged "relayer".
14  LOGGER( dlog, "relayer" );
15 
16  relayer::relayer( const scarab::param_node& a_config, const std::string& a_broker_address, unsigned a_port, const std::string& a_auth_file ) :
17  core( a_config, a_broker_address, a_port, a_auth_file ),
18  scarab::cancelable(),
19  f_queue(),
20  f_msg_receiver()
21  {
22  }
23 
25  {
26  }
27 
29  {
30  LDEBUG( dlog, "Dripline relayer starting" );
31  while( ! is_canceled() )
32  {
33  mar_ptr t_mar;
34  bool t_have_message = f_queue.timed_wait_and_pop( t_mar ); // blocking call for next message to send; timed so that cancellation can be rechecked
35  if( ! t_have_message ) continue;
36 
37  try
38  {
39  std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
40  switch( t_mar->f_message->message_type() )
41  {
42  case msg_t::request:
43  t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr = core::send( std::static_pointer_cast< dripline::msg_request >( t_mar->f_message ) );
44  break;
45  case msg_t::alert:
46  t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr = core::send( std::static_pointer_cast< dripline::msg_alert >( t_mar->f_message ) );
47  break;
48  default:
49  throw dripline_error() << "Unsupported message type: " << t_mar->f_message->message_type();
50  break;
51  }
52  if( ! t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr->f_successful_send )
53  {
54  LERROR( dlog, "Message sending failed: " << t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr->f_send_error_message << '\n' << *t_mar->f_message );
55  }
56  t_mar->f_wait_for_send_pkg->f_condition_var.notify_one();
57  continue;
58  }
59  catch( message_ptr_t )
60  {
61  LWARN( dlog, "Operating in offline mode; message not sent" );
62  }
63  catch( connection_error& e )
64  {
65  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() << '\n' << *t_mar->f_message );
66  }
67  catch( dripline_error& e )
68  {
69  LERROR( dlog, "Dripline error while sending reply:\n" << e.what() << '\n' << *t_mar->f_message );
70  }
71 
72  }
73 
74  LDEBUG( dlog, "Exiting the Dripline relayer" );
75 
76  return;
77  }
78 
79 
81  {
82  LDEBUG( dlog, "Canceling relayer" );
83  f_queue.interrupt();
84  return;
85  }
86 
88  {
89  if( is_canceled() )
90  {
91  LWARN( dlog, "Relayer has been canceled; request not sent" );
92  wait_for_send_pkg_ptr t_return;
93  t_return->f_sent_msg_pkg_ptr = std::make_shared< sent_msg_pkg >();
94  t_return->f_sent_msg_pkg_ptr->f_successful_send = false;
95  return t_return;
96  }
97  LDEBUG( dlog, "Sending request to <" << a_request->routing_key() << ">" );
98  mar_ptr t_mar = std::make_shared< message_and_reply >();
99  std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
100  t_mar->f_message = std::static_pointer_cast< dripline::message >( a_request );
101  t_mar->f_wait_for_send_pkg = std::make_shared< wait_for_send_pkg >();
102  f_queue.push( t_mar );
103  return t_mar->f_wait_for_send_pkg;
104  }
105 
107  {
108  if( is_canceled() )
109  {
110  LWARN( dlog, "Relayer has been canceled; request not sent" );
111  wait_for_send_pkg_ptr t_return;
112  t_return->f_sent_msg_pkg_ptr = std::make_shared< sent_msg_pkg >();
113  t_return->f_sent_msg_pkg_ptr->f_successful_send = false;
114  return t_return;
115  }
116  LDEBUG( dlog, "Sending alert to <" << a_alert->routing_key() << ">" );
117  mar_ptr t_mar = std::make_shared< message_and_reply >();
118  std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
119  t_mar->f_message = std::static_pointer_cast< dripline::message >( a_alert );
120  t_mar->f_wait_for_send_pkg = std::make_shared< wait_for_send_pkg >();
121  f_queue.push( t_mar );
122  return t_mar->f_wait_for_send_pkg;
123  }
124 
125  reply_ptr_t relayer::wait_for_reply( const wait_for_send_pkg_ptr a_receive_reply, int a_timeout_ms )
126  {
127  bool t_temp;
128  return wait_for_reply( a_receive_reply, t_temp, a_timeout_ms );
129  }
130 
131  reply_ptr_t relayer::wait_for_reply( const wait_for_send_pkg_ptr a_receive_reply, bool& a_chan_valid, int a_timeout_ms )
132  {
133  std::unique_lock< std::mutex > t_lock( a_receive_reply->f_mutex );
134  auto t_deadline = std::chrono::system_clock::now() + std::chrono::milliseconds( a_timeout_ms );
135  while( ! a_receive_reply->f_sent_msg_pkg_ptr )
136  {
137  std::cv_status t_status = a_receive_reply->f_condition_var.wait_until( t_lock, t_deadline );
138  if( t_status == std::cv_status::timeout )
139  {
140  // timeout
141  return reply_ptr_t();
142  }
143  }
144  return f_msg_receiver.wait_for_reply( a_receive_reply->f_sent_msg_pkg_ptr, a_chan_valid, a_timeout_ms );
145  }
146 
147 }
virtual sent_msg_pkg_ptr send(request_ptr_t a_request) const
Definition: core.cc:180
scarab::concurrent_queue< mar_ptr > f_queue
Definition: relayer.hh:104
void do_cancellation(int a_code)
Definition: relayer.cc:80
std::shared_ptr< message_and_reply > mar_ptr
Definition: relayer.hh:102
reply_ptr_t wait_for_reply(const wait_for_send_pkg_ptr a_receive_reply, int a_timeout_ms=0)
Definition: relayer.cc:125
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
Dripline-specific errors.
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
Contains all of the information common to all types of Dripline messages.
Definition: message.hh:53
void execute_relayer()
Thread execution function: sends any messages that are submitted via the send functions.
Definition: relayer.cc:28
Definition: core.hh:17
relayer(const scarab::param_node &a_config=scarab::param_node(), const std::string &a_broker_address="", unsigned a_port=0, const std::string &a_auth_file="")
Definition: relayer.cc:16
static scarab::logger dlog("agent")
Error indicating a problem with the connection to the broker.
virtual ~relayer()
Definition: relayer.cc:24
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
std::shared_ptr< wait_for_send_pkg > wait_for_send_pkg_ptr
Definition: relayer.hh:67
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
Definition: core.hh:72
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
wait_for_send_pkg_ptr send_async(request_ptr_t a_request) const
Definition: relayer.cc:87