COSMOS core  1.0.2 (beta)
Comprehensive Open-architecture Solution for Mission Operations Systems
Cosmos::Support::CommandQueue Class Reference

Class to manage information about a queue of Events. More...

#include <command_queue.h>

Collaboration diagram for Cosmos::Support::CommandQueue:

Public Member Functions

 ~CommandQueue ()
 Ensure all threads are joined before destruction. More...
 
size_t join_event_threads ()
 Join all threads spawn and empty our vector. More...
 
size_t get_event_size ()
 Retrieve the size of the queue. More...
 
Eventget_event (int i)
 Retrieve an Event by its position in the queue. More...
 
size_t get_command_size ()
 Retrieve the size of the queue. More...
 
Eventget_command (int i)
 Retrieve an Event by its position in the queue. More...
 
int32_t load_commands (string incoming_dir)
 Load queue of Events from a file. More...
 
int32_t save_commands (string temp_dir, string name=".queue")
 Save the queue of Events to a file. More...
 
int32_t restore_commands (string temp_dir, string name=".queue")
 Restore the queue of Events from a file. More...
 
int32_t run_command (Event &cmd, string nodename, double logdate_exec)
 Run the given Event. More...
 
int32_t run_commands (Agent *agent, string nodename, double logdate_exec)
 Traverse the entire queue of Events, clearing those that have finished. More...
 
int32_t add_command (Event &c)
 Remove all matching Event from the queue. More...
 
int32_t del_command (Event &c)
 Remove all matching Event from the queue. More...
 
int32_t del_command (int pos)
 Remove Event from the queue based on position. More...
 
void sort ()
 Sort the Events in the queue by Event exectution time. More...
 

Private Attributes

std::list< Eventcommands
 
std::deque< Eventevents
 
vector< std::thread > event_threads
 
bool queue_changed = false
 
bool queue_blocked = false
 

Friends

std::ostream & operator<< (std::ostream &out, CommandQueue &cmdq)
 Extraction operator. More...
 

Detailed Description

Class to manage information about a queue of Events.

Constructor & Destructor Documentation

Cosmos::Support::CommandQueue::~CommandQueue ( )

Ensure all threads are joined before destruction.

111 { join_event_threads(); }
size_t join_event_threads()
Join all threads spawn and empty our vector.
Definition: command_queue.cpp:115

Member Function Documentation

size_t Cosmos::Support::CommandQueue::join_event_threads ( )

Join all threads spawn and empty our vector.

116  {
117 // static auto join_event = [] (std::thread &t) {
118 // t.join();
119 // };
120 
121 // std::for_each(event_threads.begin(), event_threads.end(), join_event);
122 // event_threads.clear();
123  size_t count = 0;
124  for(uint16_t i=0; i<event_threads.size(); ++i)
125  {
126  if (event_threads[i].joinable())
127  {
128  event_threads[i].join();
129  ++count;
130  }
131  }
132 
133  if (count == event_threads.size())
134  {
135  event_threads.clear();
136  return count;
137  }
138  else
139  {
140  return event_threads.size();
141  }
142  }
int i
Definition: rw_test.cpp:37
int count
Definition: rw_test.cpp:36
vector< std::thread > event_threads
Definition: command_queue.h:65
size_t Cosmos::Support::CommandQueue::get_event_size ( )
inline

Retrieve the size of the queue.

Returns
The size of the queue
82 { return events.size(); }
std::deque< Event > events
Definition: command_queue.h:63
Event& Cosmos::Support::CommandQueue::get_event ( int  i)
inline

Retrieve an Event by its position in the queue.

Parameters
iInteger representing the position in the queue
Returns
Reference to the ith Event
90  {
91  std::deque<Event>::iterator ii = events.begin();
92  std::advance(ii,i);
93  return *ii;
94  }
int i
Definition: rw_test.cpp:37
std::deque< Event > events
Definition: command_queue.h:63
size_t Cosmos::Support::CommandQueue::get_command_size ( )
inline

Retrieve the size of the queue.

Returns
The size of the queue
100 { return commands.size(); }
std::list< Event > commands
Definition: command_queue.h:61
Event& Cosmos::Support::CommandQueue::get_command ( int  i)
inline

Retrieve an Event by its position in the queue.

Parameters
iInteger representing the position in the queue
Returns
Reference to the ith Event
108  {
109  std::list<Event>::iterator ii = commands.begin();
110  std::advance(ii,i);
111  return *ii;
112  }
int i
Definition: rw_test.cpp:37
std::list< Event > commands
Definition: command_queue.h:61
int32_t Cosmos::Support::CommandQueue::load_commands ( string  incoming_dir)

Load queue of Events from a file.

Loads new commands from *.command files located in the incoming directory.

Reads new Events from *.command files in the incoming directory, adds them to the queue of Events, and deletes the *.command files. Events in the queue are then sorted by their execution time.

Parameters
incoming_dirDirectory where the .queue file will be read from

Commands are loaded into the global CommandQueue object (cmd_queue), .command files are removed, and the command list is sorted by utc.

Parameters
incoming_dirDirectory where the .command files will be read from
421  {
422  DIR *dir = nullptr;
423  struct dirent *dir_entry = nullptr;
424 
425  // open the incoming directory
426  if ((dir = opendir((char *)incoming_dir.c_str())) == nullptr)
427  {
428  return GENERAL_ERROR_OPEN;
429  }
430 
431  // cycle through all the file names in the incoming directory
432  while((dir_entry = readdir(dir)) != nullptr)
433  {
434  string filename = dir_entry->d_name;
435 
436  if (filename.find(".command") != string::npos)
437  {
438 
439  string infilepath = incoming_dir + filename;
440  std::ifstream infile(infilepath.c_str());
441  if(!infile.is_open())
442  {
443 // std::cout<<"unable to read file <"<<infilepath<<">"<<std::endl;
444  continue;
445  }
446 
447  //file is open for reading commands
448  string line;
449  Event cmd;
450 
451  while(getline(infile,line))
452  {
453  cmd.set_command(line);
454 
455  if(cmd.is_command())
456  {
457 // std::cout << "Command added: " << cmd;
458  add_command(cmd);
459  }
460 // else
461 // std::cout<<"Not a command!"<<std::endl;
462  }
463  infile.close();
464 
465  //remove the .command file from incoming directory
466  if(remove(infilepath.c_str())) {
467 // std::cout<<"unable to delete file <"<<filename<<">"<<std::endl;
468  continue;
469  }
470 
471 // std::cout<<"\nThe size of the command queue is: "<< get_command_size()<<std::endl;
472  }
473  }
474 
475  sort();
476 
477  closedir(dir);
478 
479  return 0;
480  }
static string incoming_dir
Definition: agent_exec-2-0.cpp:143
Class to manage Event information.
Definition: event.h:57
void sort()
Sort the Events in the queue by Event exectution time.
Definition: command_queue.h:210
int closedir(DIR *dir)
Definition: dirent.c:74
int32_t add_command(Event &c)
Remove all matching Event from the queue.
Definition: command_queue.cpp:522
Definition: dirent.c:24
Definition: dirent.h:21
#define GENERAL_ERROR_OPEN
Definition: cosmos-errno.h:283
DIR * opendir(const char *name)
Definition: dirent.c:32
struct dirent * readdir(DIR *dir)
Definition: dirent.c:97
void set_command(string jstring)
Sets Event information from a JSON formatted string.
Definition: event.cpp:118
char * d_name
Definition: dirent.h:23
bool is_command()
Determines if the Event is a command.
Definition: event.h:161
int32_t Cosmos::Support::CommandQueue::save_commands ( string  temp_dir,
string  name = ".queue" 
)

Save the queue of Events to a file.

Save the queue of Events to the file temp_dir/.queue

\param  temp_dir    Directory where the .queue file will be written

Commands are taken from the global command queue Command queue is sorted by utc after loading

Parameters
temp_dirDirectory where the .queue file will be written
nameFile where the .queue will be written
354  {
355  if (!queue_changed)
356  {
357  return 0;
358  }
359  queue_changed = false;
360 
361  // Save previous queue
362  rename((temp_dir+name).c_str(), (temp_dir+name+'.'+(utc2unixdate(currentmjd()))).c_str());
363 
364  // Open the outgoing file
365  FILE *fd = fopen((temp_dir+name).c_str(), "w");
366  if (fd != nullptr)
367  {
368  for (Event cmd: commands)
369  {
370  fprintf(fd, "%s\n", cmd.get_event_string().c_str());
371  }
372  fclose(fd);
373  }
374  return static_cast<int32_t>(commands.size());
375  }
Class to manage Event information.
Definition: event.h:57
int fd
Definition: arduino_lib.cpp:61
static string temp_dir
Definition: agent_exec-2-0.cpp:146
string utc2unixdate(double utc)
Time for setting unix date.
Definition: timelib.cpp:1260
string name
Definition: cubesat2obj.cpp:6
bool queue_changed
Definition: command_queue.h:67
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
std::list< Event > commands
Definition: command_queue.h:61
int32_t Cosmos::Support::CommandQueue::restore_commands ( string  temp_dir,
string  name = ".queue" 
)

Restore the queue of Events from a file.

Save the queue of Events to the file temp_dir/.queue

\param  temp_dir    Directory where the .queue file will be written

Commands are taken from the global command queue Command queue is sorted by utc after loading

Parameters
temp_dirDirectory where the .queue file will be read from
nameFile where the .queue will be read from
386  {
387  queue_changed = false;
388 
389  // Reload existing queue
390  string infilepath = temp_dir + name;
391  std::ifstream infile(infilepath.c_str());
392  if(infile.is_open())
393  {
394  //file is open for reading commands
395  string line;
396  Event cmd;
397 
398  while(std::getline(infile,line))
399  {
400  //cmd.set_command(line, agent);
401  cmd.set_command(line);
402 
403  if(cmd.is_command())
404  {
405  add_command(cmd);
406  }
407  }
408  infile.close();
409  }
410  return static_cast<int32_t>(commands.size());
411  }
Class to manage Event information.
Definition: event.h:57
int32_t add_command(Event &c)
Remove all matching Event from the queue.
Definition: command_queue.cpp:522
static string temp_dir
Definition: agent_exec-2-0.cpp:146
void set_command(string jstring)
Sets Event information from a JSON formatted string.
Definition: event.cpp:118
bool is_command()
Determines if the Event is a command.
Definition: event.h:161
string name
Definition: cubesat2obj.cpp:6
bool queue_changed
Definition: command_queue.h:67
std::list< Event > commands
Definition: command_queue.h:61
int32_t Cosmos::Support::CommandQueue::run_command ( Event cmd,
string  node_name,
double  logdate_exec 
)

Run the given Event.

Execute an event using ford(). For each event run, the time of execution (utcexec) is set, the flag EVENT_FLAG_ACTUAL is set to true, and this updated command information is logged to the OUTPUT directory.

\param  cmd Reference to event to run
\param  nodename    Name of node
\param  logdate_exec    Time of execution (for logging purposes)

Executes a command in a separate shell (system) using threads. For each command run, the time of execution (utcexec) is set, the flag EVENT_FLAG_ACTUAL is set to true, and this updated command information is logged to the OUTPUT directory.

Parameters
cmdReference to event to run
nodenameName of node
logdate_execTime of execution (for logging purposes)
156  {
157  queue_changed = true;
158 
159  // set time executed & actual flag
160  cmd.set_utcexec();
161  cmd.set_actual();
162 
163  string outpath = data_type_path(node_name, "temp", "exec", logdate_exec, "out");
164  char command_line[100];
165  strcpy(command_line, cmd.get_data().c_str());
166 
167  // We keep track of all threads spawned to join before moving log files.
168  event_threads.push_back(std::thread([=] () {
169  int devn, prev_stdin, prev_stdout, prev_stderr;
170  if (outpath.empty()) {
171  devn = open("/dev/null", O_RDWR);
172  }
173  else {
174  devn = open(outpath.c_str(), O_CREAT|O_WRONLY|O_APPEND, 00666);
175  }
176 
177  prev_stdin = dup(STDIN_FILENO);
178  prev_stdout = dup(STDOUT_FILENO);
179  prev_stderr = dup(STDERR_FILENO);
180 
181  // Redirect all output our executed command.
182  dup2(devn, STDIN_FILENO);
183  dup2(devn, STDOUT_FILENO);
184  dup2(devn, STDERR_FILENO);
185  close(devn);
186 
187  // Execute the command.
188  system(command_line);
189 
190  dup2(prev_stdin, STDIN_FILENO);
191  dup2(prev_stdout, STDOUT_FILENO);
192  dup2(prev_stderr, STDERR_FILENO);
193  close(prev_stdin);
194  close(prev_stdout);
195  close(prev_stderr);
196  }));
197 
198  //#if defined(COSMOS_WIN_OS)
199  // char command_line[100];
200  // strcpy(command_line, cmd.get_data().c_str());
201  //
202  // STARTUPINFOA si;
203  // PROCESS_INFORMATION pi;
204  //
205  // ZeroMemory( &si, sizeof(si) );
206  // si.cb = sizeof(si);
207  // ZeroMemory( &pi, sizeof(pi) );
208  //
209  // if (CreateProcessA(NULL, (LPSTR) command_line, NULL, NULL, FALSE, 0, NULL, NULL, &si, &pi))
210  // {
211  // // int32_t pid = pi.dwProcessId;
212  // CloseHandle( pi.hProcess );
213  // CloseHandle( pi.hThread );
214  // }
215  //#else
216  // int32_t pid = fork(); // Fork paradigm copies ENTIRE process space, leads to errors when exiting child. Now using threads.
217  //
218  // char *words[MAXCOMMANDWORD];
219  // string_parse((char *)cmd.get_data().c_str(), words, MAXCOMMANDWORD);
220  // string outpath = data_type_path(node_name, "temp", "exec", logdate_exec, "out");
221  // if (pid != 0) {
222  // signal(SIGCHLD, SIG_IGN); // Ensure no zombies.
223  // }
224  // else {
225  // int devn;
226  // if (outpath.empty()) {
227  // devn = open("/dev/null",O_RDWR);
228  // }
229  // else {
230  // devn = open(outpath.c_str(), O_CREAT|O_WRONLY|O_APPEND, 00666);
231  // }
232  // dup2(devn, STDIN_FILENO);
233  // dup2(devn, STDOUT_FILENO);
234  // dup2(devn, STDERR_FILENO);
235  // close(devn);
236  //
237  // // Execute the command.
238  // execvp(words[0], &(words[1]));
239  // fflush(stdout);
240  // exit (0);
241  // }
242  //
243  //#endif
244 
245  // log to event file
246  log_write(node_name, "exec", logdate_exec, "event", cmd.get_event_string().c_str());
247  return 0;
248  }
static double logdate_exec
Definition: agent_exec-2-0.cpp:77
string node_name
Definition: agent_001.cpp:46
vector< std::thread > event_threads
Definition: command_queue.h:65
string data_type_path(string node, string location, string agent, double mjd, string type)
Create data file path.
Definition: datalib.cpp:910
string get_event_string()
Retrieves Event information.
Definition: event.cpp:138
bool queue_changed
Definition: command_queue.h:67
void set_utcexec()
Sets Event::utcexec to current time.
Definition: event.h:101
void log_write(string node, string agent, double utc, string extra, string type, string record, string location)
Write log entry - full.
Definition: datalib.cpp:75
string get_data()
Retrieves Event::data.
Definition: event.h:143
void set_actual()
Sets Event::flag to indicate the event has actually executed (i.e. EVENT_FLAG_ACTUAL) ...
Definition: event.h:107
int32_t Cosmos::Support::CommandQueue::run_commands ( Agent agent,
string  node_name,
double  logdate_exec 
)

Traverse the entire queue of Events, clearing those that have finished.

Traverse the entire queue of Events, and run those which qualify.

An Event qualifies to be cleared if its thread could be joined, or if it has been running for more than 1 minute.

\param  agent   Pointer to Agent object (for call to condition_true(..))
\param  nodename    Name of the node
\param  logdate_exec    Time of execution (for logging purposes)

Traverse the entire queue of Events, and run those which qualify.

An Event only qualifies to run if the current time is greater than or equal to the execution time of the Event. Further, if the Event is conditional, then the Event condition must be true.

\param  agent   Pointer to Agent object (for call to condition_true(..))
\param  nodename    Name of the node
\param  logdate_exec    Time of execution (for logging purposes)

An Event only qualifies to run if the current time is greater than or equal to the execution time of the Event. Further, if the Event is conditional, then the Event condition must be true.

\param  agent   Pointer to Agent object (for call to condition_true(..))
\param  nodename    Name of the node
\param  logdate_exec    Time of execution (for logging purposes)
262  {
263  for(std::list<Event>::iterator ii = commands.begin(); ii != commands.end(); ++ii)
264  {
265  // if command is not solo, or event_threads queue is empty
266  if (event_threads.empty() || (!ii->is_solo() && !queue_blocked))
267  {
268  if (ii->is_solo())
269  {
270  queue_blocked = true;
271  }
272  else
273  {
274  queue_blocked = false;
275  }
276  // if command is ready
277  if (ii->is_ready())
278  {
279  // if command is conditional
280  if (ii->is_conditional())
281  {
282  // if command condition is true
283  if(ii->condition_true(agent->cinfo))
284  {
285  // if command is repeatable
286  if(ii->is_repeat())
287  {
288  // if command has not already run
289 // if(!ii->already_ran)
290  if (!ii->is_alreadyrun())
291  {
292  strncpy(agent->cinfo->node.lastevent, ii->name.c_str(), COSMOS_MAX_NAME);
293  agent->cinfo->node.lasteventutc = currentmjd();
295  ii->set_alreadyrun(true);
296  events.push_back(*ii);
297  if (events.size() > 10)
298  {
299  events.pop_front();
300  }
301 // ii->already_ran = true;
302  break;
303  }
304  }
305  // else command is non-repeatable
306  else
307  {
309  events.push_back(*ii);
310  if (events.size() > 10)
311  {
312  events.pop_front();
313  }
314  commands.erase(ii--);
315  break;
316  }
317  }
318  // else command condition is false
319  else
320  {
321  ii->set_alreadyrun(false);
322  }
323  }
324  // else command is non-conditional
325  else
326  {
328  events.push_back(*ii);
329  if (events.size() > 10)
330  {
331  events.pop_front();
332  }
333  commands.erase(ii--);
334  }
335  }
336  }
337  else
338  {
339  ;//cout<<"This command is *NOT* ready to run! ";
340  }
341  }
342  return static_cast<int32_t>(commands.size());
343  }
static double logdate_exec
Definition: agent_exec-2-0.cpp:77
int32_t run_command(Event &cmd, string nodename, double logdate_exec)
Run the given Event.
Definition: command_queue.cpp:155
#define COSMOS_MAX_NAME
Largest JSON name.
Definition: cosmos-defs.h:55
char lastevent[40+1]
Last event.
Definition: jsondef.h:3558
string node_name
Definition: agent_001.cpp:46
nodestruc node
Structure for summary information in node.
Definition: jsondef.h:4220
vector< std::thread > event_threads
Definition: command_queue.h:65
double lasteventutc
Last event UTC.
Definition: jsondef.h:3560
std::deque< Event > events
Definition: command_queue.h:63
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
bool queue_blocked
Definition: command_queue.h:69
cosmosstruc * cinfo
Definition: agentclass.h:346
std::list< Event > commands
Definition: command_queue.h:61
int32_t Cosmos::Support::CommandQueue::add_command ( Event c)

Remove all matching Event from the queue.

Parameters
cEvent to remove
Returns
The number of Events removed

JIMNOTE: this only adds given Event to the queue if the Event has flag for EVENT_TYPE_COMMAND set to true

523  {
524  // Replace if it matches an existing command, otherwise add to queue
525  for (std::list<Event>::iterator ii = commands.begin(); ii != commands.end(); ++ii) {
526  if (c.get_name() == ii->get_name()) {
527  *ii = c;
528  queue_changed = true;
529  return 0;
530  }
531  }
532 
533  commands.push_back(c);
534  queue_changed = true;
535  return 1;
536  }
bool queue_changed
Definition: command_queue.h:67
std::list< Event > commands
Definition: command_queue.h:61
string get_name()
Retrieves Event::name.
Definition: event.h:113
int32_t Cosmos::Support::CommandQueue::del_command ( Event c)

Remove all matching Event from the queue.

Parameters
cEvent to remove
Returns
The number of Events removed

This function only removes events from the queue if the are exactly equal to the given Event.

489  {
490  size_t prev_sz = commands.size();
491  for (std::list<Event>::iterator ii = commands.begin();
492  ii != commands.end();
493  ++ii) {
494  if (c == *ii) {
495  commands.erase(ii--);
496  }
497  }
498 
499  queue_changed = true;
500  return static_cast<int32_t>(prev_sz - commands.size());
501 
502  }
bool queue_changed
Definition: command_queue.h:67
std::list< Event > commands
Definition: command_queue.h:61
int32_t Cosmos::Support::CommandQueue::del_command ( int  pos)

Remove Event from the queue based on position.

Parameters
posPosition of event to remove
Returns
The number of Events removed

This function removes events based on their queue position (0-indexed).

512  {
513  size_t prev_sz = commands.size();
514  std::list<Event>::iterator b = commands.begin();
515 
516  std::advance(b, pos);
517  commands.erase(b);
518  queue_changed = true;
519  return static_cast<int32_t>(prev_sz - commands.size());
520  }
long b
Definition: jpegint.h:371
bool queue_changed
Definition: command_queue.h:67
std::list< Event > commands
Definition: command_queue.h:61
void Cosmos::Support::CommandQueue::sort ( )
inline

Sort the Events in the queue by Event exectution time.

This function is called after new Events are loaded.

210 { commands.sort([](Event & c1, Event & c2) { return c1.getTime() < c2.getTime(); }); }
Class to manage Event information.
Definition: event.h:57
string getTime()
Retrieves Event::mjd.
Definition: event.h:131
std::list< Event > commands
Definition: command_queue.h:61

Friends And Related Function Documentation

std::ostream& operator<< ( std::ostream &  out,
CommandQueue cmdq 
)
friend

Extraction operator.

Parameters
outReference to ostream
cmdqReference to CommandQueue (JIMNOTE: should be const, ya?)
Returns
Reference to modified ostream

Writes the given CommandQueue to the given output stream (in JSON format) and returns a reference to the modified ostream.

548  {
549  for(std::list<Event>::iterator ii = cmdq.commands.begin(); ii != cmdq.commands.end(); ++ii)
550  out << *ii << std::endl;
551  return out;
552  }
std::list< Event > commands
Definition: command_queue.h:61

Member Data Documentation

std::list<Event> Cosmos::Support::CommandQueue::commands
private

An std::list of members of the Event class to be run

std::deque<Event> Cosmos::Support::CommandQueue::events
private

An std::queue of members of the Event class that have run

vector<std::thread> Cosmos::Support::CommandQueue::event_threads
private

A vector of all threads spawned to run events

bool Cosmos::Support::CommandQueue::queue_changed = false
private

A boolean indicator that the queue has changed

bool Cosmos::Support::CommandQueue::queue_blocked = false
private

The documentation for this class was generated from the following files: