Dripline-Cpp  v2.4.2
Dripline Implementation in C++
receiver.hh
Go to the documentation of this file.
1 /*
2  * receiver.hh
3  *
4  * Created on: Feb 18, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #ifndef DRIPLINE_RECEIVER_HH_
9 #define DRIPLINE_RECEIVER_HH_
10 
11 #include "dripline_api.hh"
12 #include "dripline_fwd.hh"
13 
14 #include "cancelable.hh"
15 #include "concurrent_queue.hh"
16 #include "member_variables.hh"
17 
18 #include <atomic>
19 #include <condition_variable>
20 #include <map>
21 #include <thread>
22 
23 namespace dripline
24 {
25 
32  {
// Per this page's generated index, this type (declared at receiver.hh:31) stores the
// basic information about a set of message chunks that will eventually make up a
// Dripline message.
// NOTE(review): original lines 31, 33-34 and 40-42 are not shown in this listing; the
// index lists an additional member `amqp_split_message_ptrs f_messages` at line 33.
// NOTE(review): std::string and std::mutex are used below, but <string> and <mutex> are
// not among the includes visible at the top of this header -- presumably pulled in
// transitively; confirm, or include them directly.
35  std::string f_routing_key;
// Thread object associated with this pack.
// NOTE(review): presumably the thread that waits for / processes the message -- confirm.
36  std::thread f_thread;
// Mutex and condition variable stored per pack.
// NOTE(review): presumably used together to signal chunk arrival -- confirm in the implementation.
37  std::mutex f_mutex;
38  std::condition_variable f_conv;
// Atomic flag; NOTE(review): presumably marks that processing of this pack has started -- confirm.
39  std::atomic< bool > f_processing;
43  };
// Ordered map from a string key to an incoming_message_pack; used in class receiver as
// the type of the incoming_messages member.
// NOTE(review): the key is presumably the message ID (cf. the a_message_id parameters
// on receiver's methods) -- confirm.
44  typedef std::map< std::string, incoming_message_pack > incoming_message_map;
45 
46 
47  // contains mechanisms for receiving messages synchronously
// Per this page's generated index, a receiver is able to collect Dripline message chunks
// and reassemble them into a complete Dripline message (the index summary is truncated).
// Inherits virtually from scarab::cancelable. Copy construction and copy assignment are
// deleted; move construction and move assignment are declared here without inline bodies.
76  class DRIPLINE_API receiver : public virtual scarab::cancelable
77  {
78  public:
79  receiver();
80  receiver( const receiver& a_orig ) = delete;
81  receiver( receiver&& a_orig );
82  virtual ~receiver();
83 
84  receiver& operator=( const receiver& a_orig ) = delete;
85  receiver& operator=( receiver&& a_orig );
86 
87  public:
// Accepts one AMQP envelope (amqp_envelope_ptr is AmqpClient::Envelope::ptr_t).
// NOTE(review): presumably files the chunk into incoming_messages and triggers
// processing once all chunks have arrived -- confirm in the implementation.
91  void handle_message_chunk( amqp_envelope_ptr a_envelope );
92 
// Both functions operate on a message pack (by non-const reference) and the ID of the
// message it belongs to.
// NOTE(review): wait_for_message presumably waits for remaining chunks, bounded by
// single_message_wait_ms, before handing off to process_message_pack -- confirm.
95  void wait_for_message( incoming_message_pack& a_pack, const std::string& a_message_id );
97  void process_message_pack( incoming_message_pack& a_pack, const std::string& a_message_id );
98 
// Virtual hook that receives a message (message_ptr_t is std::shared_ptr< message >).
// concurrent_receiver, later in this header, declares a function with the same signature.
101  virtual void process_message( message_ptr_t a_message );
102 
// Macro-declared members: incoming_messages (incoming_message_map) and
// single_message_wait_ms (unsigned).
// NOTE(review): mv_referrable / mv_accessible presumably come from member_variables.hh
// and generate the member plus its accessors; the "_ms" suffix suggests milliseconds --
// confirm both.
104  mv_referrable( incoming_message_map, incoming_messages );
106  mv_accessible( unsigned, single_message_wait_ms );
107 
108  public:
// Two overloads that return a reply (reply_ptr_t is std::shared_ptr< msg_reply >) for a
// previously sent message package. a_timeout_ms defaults to 0 in both.
// NOTE(review): the meaning of a zero timeout is not visible in this header -- confirm.
116  reply_ptr_t wait_for_reply( const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms = 0 );
// Same as above, with an additional non-const bool reference.
// NOTE(review): a_chan_valid is presumably an output flag reporting whether the channel
// is still valid -- confirm.
125  reply_ptr_t wait_for_reply( const sent_msg_pkg_ptr a_receive_reply, bool& a_chan_valid, int a_timeout_ms = 0 );
126 
127  protected:
// Protected helper that produces a reply from a message pack and its message ID.
128  reply_ptr_t process_received_reply( incoming_message_pack& a_pack, const std::string& a_message_id );
129 
130  };
131 
154  {
// Per this page's generated index, this class (declared at receiver.hh:153) receives and
// processes messages concurrently.
// NOTE(review): original lines 153, 156 and 158 (the class declaration line and,
// presumably, the default and move constructors) are not shown in this listing.
155  public:
// Copy construction is deleted.
157  concurrent_receiver( const concurrent_receiver& ) = delete;
159  virtual ~concurrent_receiver();
160 
// Copy assignment is deleted; move assignment is declared without an inline body.
161  concurrent_receiver& operator=( const concurrent_receiver& ) = delete;
162  concurrent_receiver& operator=( concurrent_receiver&& a_orig );
163 
164  public:
// Same signature as receiver::process_message.
// NOTE(review): if this class derives from receiver, this is an override and could be
// marked `override`; it presumably pushes the message onto message_queue -- confirm both.
166  virtual void process_message( message_ptr_t a_message );
167 
// NOTE(review): presumably the loop that takes messages off message_queue and passes
// them to submit_message, run on receiver_thread -- confirm in the implementation.
169  void execute();
170 
171  protected:
// Pure virtual: concrete subclasses must implement how a message is submitted.
174  virtual void submit_message( message_ptr_t a_message ) = 0;
175 
// Macro-declared members: a scarab::concurrent_queue of message pointers named
// message_queue, and a std::thread named receiver_thread.
176  mv_referrable( scarab::concurrent_queue< message_ptr_t >, message_queue );
177  mv_referrable( std::thread, receiver_thread );
178  };
179 
180 } /* namespace dripline */
181 
182 #endif /* DRIPLINE_RECEIVER_HH_ */
Receives and processes messages concurrently.
Definition: receiver.hh:153
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
std::condition_variable f_conv
Definition: receiver.hh:38
std::map< std::string, incoming_message_pack > incoming_message_map
Definition: receiver.hh:44
#define DRIPLINE_API
Definition: dripline_api.hh:34
std::vector< amqp_message_ptr > amqp_split_message_ptrs
Definition: amqp.hh:31
Stores the basic information about a set of message chunks that will eventually make a Dripline message...
Definition: receiver.hh:31
A receiver is able to collect Dripline message chunks and reassemble them into a complete Dripline message...
Definition: receiver.hh:76
std::atomic< bool > f_processing
Definition: receiver.hh:39
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Definition: amqp.hh:25
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
amqp_split_message_ptrs f_messages
Definition: receiver.hh:33