Dripline-Cpp  v2.4.2
Dripline Implementation in C++
scheduler.hh
Go to the documentation of this file.
1 /*
2  * scheduler.hh
3  *
4  * Created on: Aug 13, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #ifndef DRIPLINE_SCHEDULER_HH_
9 #define DRIPLINE_SCHEDULER_HH_
10 
11 #include "cancelable.hh"
12 
13 #include "dripline_exceptions.hh"
14 
15 #include "logger.hh"
16 #include "member_variables.hh"
17 
18 #include <chrono>
19 #include <condition_variable>
20 #include <functional>
21 #include <thread>
22 #include <map>
23 #include <mutex>
24 #include <utility>
25 
26 LOGGER( dlog_sh, "scheduler" )
27 
28 namespace dripline
29 {
40  {
41  virtual ~base_executor() {}
42  virtual void operator()( std::function< void() > ) = 0;
43  };
44 
52  {
53  virtual ~simple_executor() {}
54  virtual void operator()( std::function< void() > an_executable )
55  {
56  LDEBUG( dlog_sh, "executing" );
57  an_executable();
58  return;
59  }
60  };
61 
92  template< typename executor = simple_executor, typename clock = std::chrono::system_clock >
93  class DRIPLINE_API scheduler : virtual public scarab::cancelable
94  {
95  public:
96  using clock_t = clock;
97  using time_point_t = typename clock::time_point;
98  using duration_t = typename clock::duration;
99  using executable_t = std::function< void() >;
100 
105  struct event
106  {
108  int f_id;
109  };
110  typedef std::multimap< time_point_t, event > events_map_t;
111 
112  scheduler();
113  scheduler( const scheduler& ) = delete;
114  scheduler( scheduler&& );
115  virtual ~scheduler();
116 
117  scheduler& operator=( const scheduler& ) = delete;
118  scheduler& operator=( scheduler&& );
119 
126  int schedule( executable_t an_executable, time_point_t an_exe_time );
127 
135  int schedule( executable_t an_executable, duration_t an_interval, time_point_t an_exe_time = clock::now() );
136 
138  void unschedule( int an_id );
139 
141  void execute();
142 
144  mv_accessible( duration_t, exe_buffer );
145 
147  mv_accessible( duration_t, cycle_time );
148 
150  mv_referrable_const( executor, the_executor );
151 
153  mv_referrable_const( events_map_t, events );
154 
156  mv_accessible_static( int, curr_id )
157 
158  protected:
159  void schedule_repeating( executable_t an_executable, duration_t an_interval, int an_id, time_point_t a_rep_start = clock::now() );
160 
161  std::recursive_mutex f_scheduler_mutex; // recursive_mutex is used so that the mutex can be locked twice by the same thread when using a repeating schedule
162 
163  std::mutex f_executor_mutex;
164 
165  std::condition_variable_any f_cv;
166  std::thread f_scheduler_thread;
167  };
168 
169  template< typename executor, typename clock >
171 
172  template< typename executor, typename clock >
174  f_exe_buffer( std::chrono::milliseconds(50) ),
175  f_cycle_time( std::chrono::milliseconds(500) ),
176  f_the_executor(),
177  f_events(),
178  f_scheduler_mutex(),
179  f_executor_mutex(),
180  f_cv(),
181  f_scheduler_thread()
182  {}
183 
184  template< typename executor, typename clock >
186  f_exe_buffer( std::move(a_orig.f_exe_buffer) ),
187  f_cycle_time( std::move(a_orig.f_cycle_time) ),
188  f_the_executor(),
189  f_events( std::move(a_orig.f_events) ),
190  f_scheduler_mutex(),
191  f_executor_mutex(),
192  f_cv(),
193  f_scheduler_thread( std::move(a_orig.f_scheduler_thread) )
194  {}
195 
196  template< typename executor, typename clock >
198  {}
199 
200  template< typename executor, typename clock >
202  {
203  std::unique_lock< std::recursive_mutex >t_this_lock( f_scheduler_mutex );
204  std::unique_lock< std::recursive_mutex >t_orig_lock( a_orig.f_scheduler_mutex );
205 
206  f_exe_buffer = std::move(a_orig.f_exe_buffer);
207  f_cycle_time = std::move(a_orig.f_cycle_time);
208  f_events = std::move(a_orig.f_events);
209  f_scheduler_thread = std::move(a_orig.f_scheduler_thread);
210 
211  return *this;
212  }
213 
214  template< typename executor, typename clock >
216  {
217  bool t_new_first = false;
218  std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
219  if( f_events.empty() || an_exe_time < f_events.begin()->first )
220  {
221  LDEBUG( dlog_sh, "New first event" );
222  t_new_first = true;
223  }
224  LDEBUG( dlog_sh, "Inserting new event" );
225  event t_event;
226  t_event.f_executable = an_executable;
227  t_event.f_id = s_curr_id++;
228  f_events.insert( std::make_pair( an_exe_time, t_event ) );
229  if( t_new_first )
230  {
231  // wake the waiting thread
232  LDEBUG( dlog_sh, "That event was first; waking execution thread" );
233  f_cv.notify_one();
234  }
235 
236  return t_event.f_id;
237  }
238 
239  template< typename executor, typename clock >
240  int scheduler< executor, clock >::schedule( executable_t an_executable, duration_t an_interval, time_point_t an_exe_time )
241  {
242  // if the interval is too short, it's more likely that the execution time will be longer than the interval
243  if( an_interval < 2*f_exe_buffer )
244  {
245  throw dripline_error() << "Cannot schedule executions with an interval of less than " << std::chrono::duration_cast<std::chrono::seconds>(2*f_exe_buffer).count() << " seconds";
246  }
247 
248  std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
249  int t_id = scheduler< executor, clock >::s_curr_id++;
250  schedule_repeating( an_executable, an_interval, t_id, an_exe_time );
251 
252  // return the id
253  return t_id;
254  }
255 
256  template< typename executor, typename clock >
257  void scheduler< executor, clock >::schedule_repeating( executable_t an_executable, duration_t an_interval, int an_id, time_point_t a_rep_start )
258  {
259  LDEBUG( dlog_sh, "Scheduling a repeating event" );
260 
261  // create the wrapper executable around the event
262  executable_t t_wrapped_executable = [this, an_executable, an_interval, an_id, a_rep_start](){
263  LDEBUG( dlog_sh, "wrapped execution" );
264  // reschedule itself an_interval in the future
265  this->schedule_repeating( an_executable, an_interval, an_id, a_rep_start + an_interval );
266  // execute the event
267  LDEBUG( dlog_sh, "executing the wrapped executable" );
268  an_executable();
269  };
270 
271  // create the event
272  event t_event;
273  t_event.f_executable = t_wrapped_executable;
274  t_event.f_id = an_id;
275 
276  // check if this'll be a new first event
277  bool t_new_first = false;
278  std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
279  if( f_events.empty() || a_rep_start < f_events.begin()->first )
280  {
281  LDEBUG( dlog_sh, "New first event" );
282  t_new_first = true;
283  }
284 
285  // add the event to the map
286  f_events.insert( std::make_pair( a_rep_start, t_event ) );
287  if( t_new_first )
288  {
289  // wake the waiting thread
290  LDEBUG( dlog_sh, "That event was first; waking execution thread" );
291  f_cv.notify_one();
292  }
293 
294  return;
295  }
296 
297  template< typename executor, typename clock >
299  {
300  std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
301  auto i_event = f_events.begin();
302  for( ; i_event->second.f_id != an_id && i_event != f_events.end(); ++i_event ) {
303  LDEBUG( dlog_sh, "Looking for event with id <" << an_id << ">; found one with id <" << i_event->second.f_id << ">" );
304  }
305 
306  if( i_event->second.f_id == an_id )
307  {
308  LDEBUG( dlog_sh, "Found event with id <" << i_event->second.f_id << ">; erasing it now" );
309  f_events.erase( i_event );
310  LDEBUG( dlog_sh, "Removed event <" << an_id << "> from the schedule" );
311  }
312  else
313  {
314  LDEBUG( dlog_sh, "No event with id <" << an_id << "> found" );
315  }
316 
317  return;
318  }
319 
320  template< typename executor, typename clock >
322  {
323  LDEBUG( dlog_sh, "Starting scheduler" );
324  while( ! is_canceled() )
325  {
326  std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
327  if( f_events.empty() )
328  {
329  // wait for f_cycle_time
330  f_cv.wait_for( t_lock, f_cycle_time );
331  continue;
332  }
333  else
334  {
335  auto t_first_event = f_events.begin();
336  //time_point_t t_earliest = t_first_exe.first;
337  duration_t t_to_earliest = t_first_event->first - clock::now();
338  if( t_to_earliest < f_exe_buffer )
339  {
340  // do event now
341  LDEBUG( dlog_sh, "Executing first event from the map" );
342  std::unique_lock< std::mutex > t_exe_lock( f_executor_mutex );
343  f_the_executor( t_first_event->second.f_executable );
344  f_events.erase( t_first_event );
345  continue;
346  }
347  if( t_to_earliest < f_cycle_time )
348  {
349  // wait until t_first_event->first
350  f_cv.wait_until( t_lock, t_first_event->first );
351  continue;
352  }
353  // wait for f_cycle_time
354  f_cv.wait_for( t_lock, f_cycle_time );
355  continue;
356  }
357  }
358  LDEBUG( dlog_sh, "Scheduler exiting" );
359  return;
360  }
361 
362 } /* namespace dripline */
363 
364 #endif /* DRIPLINE_SCHEDULER_HH_ */
std::condition_variable_any f_cv
Definition: scheduler.hh:165
snake_case_mv_accessible_static(int, curr_id) protected std::recursive_mutex f_scheduler_mutex
The ID to be used for the next scheduled event.
Definition: scheduler.hh:156
Definition of an event, including the executable object and the scheduler ID.
Definition: scheduler.hh:105
STL namespace.
Dripline-specific errors.
typename std::chrono::system_clock ::time_point time_point_t
Definition: scheduler.hh:97
Executes scheduled events.
Definition: scheduler.hh:93
std::multimap< time_point_t, event > events_map_t
Definition: scheduler.hh:110
virtual ~base_executor()
Definition: scheduler.hh:41
std::thread f_scheduler_thread
Definition: scheduler.hh:166
#define DRIPLINE_API
Definition: dripline_api.hh:34
std::chrono::system_clock clock_t
Definition: scheduler.hh:96
std::function< void() > executable_t
Definition: scheduler.hh:99
virtual void operator()(std::function< void() > an_executable)
Definition: scheduler.hh:54
std::mutex f_executor_mutex
Definition: scheduler.hh:163
Base class for executors.
Definition: scheduler.hh:39
Given an executable function object, uses operator() to execute it.
Definition: scheduler.hh:51
static scarab::logger dlog_sh("scheduler")
typename std::chrono::system_clock ::duration duration_t
Definition: scheduler.hh:98