COSMOS core  1.0.2 (beta)
Comprehensive Open-architecture Solution for Mission Operations Systems
agent_exec.cpp File Reference

Executive Agent source file. More...

#include "support/configCosmos.h"
#include "agent/agentclass.h"
#include "support/jsonlib.h"
#include "support/convertlib.h"
#include "support/datalib.h"
#include "agent/command_queue.h"
#include <iostream>
#include <iomanip>
#include <list>
#include <fstream>
#include <sstream>
#include <mutex>
Include dependency graph for agent_exec.cpp:

Classes

struct  boot_flags
 

Functions

void move_and_compress_exec ()
 
int32_t get_last_offset ()
 
int32_t get_flags ()
 
int32_t request_get_queue_size (string &request, string &response, Agent *agent)
 
int32_t request_get_event (string &request, string &response, Agent *agent)
 
int32_t request_get_command (string &request, string &response, Agent *agent)
 
int32_t request_del_command (string &request, string &response, Agent *agent)
 
int32_t request_del_command_id (string &request, string &response, Agent *agent)
 
int32_t request_add_command (string &request, string &response, Agent *agent)
 
int32_t request_remote_command (string &request, string &response, Agent *agent)
 
int32_t request_reopen_exec (string &request, string &response, Agent *agent)
 
int32_t request_set_logstride_exec (string &request, string &response, Agent *agent)
 
int32_t request_get_logstride_exec (string &request, string &response, Agent *agent)
 
int32_t request_reopen_soh (string &request, string &response, Agent *agent)
 
int32_t request_set_logperiod (string &request, string &response, Agent *agent)
 
int32_t request_get_logperiod (string &request, string &response, Agent *agent)
 
int32_t request_set_logstride_soh (string &request, string &response, Agent *agent)
 
int32_t request_get_logstride_soh (string &request, string &response, Agent *agent)
 
int32_t request_set_logstring (string &request, string &response, Agent *agent)
 
int32_t request_get_logstring (string &request, string &response, Agent *agent)
 
int32_t request_save_command (string &request, string &response, Agent *)
 
void collect_data_loop () noexcept
 
void move_and_compress_soh ()
 
void move_and_compress_beacon ()
 
int32_t get_power_mode ()
 
int main (int argc, char *argv[])
 

Variables

static Agentagent
 
static CommandQueue cmd_queue
 
static double logdate_exec =0.
 
static double newlogstride_exec = 300. / 86400.
 
static double logstride_exec = 0.
 
static std::mutex exec_mutex
 
static string jjstring
 
static string myjstring
 
static thread cdthread
 
static string logstring
 
static vector< jsonentry * > logtable
 
static double logdate_soh =0.
 
static double newlogperiod = 30. / 86400.
 
static double logperiod = 0
 
static double newlogstride_soh = 600. / 86400.
 
static double logstride_soh = 0.
 
static std::mutex soh_mutex
 
static std::mutex beacon_mutex
 
static vector< eventstruceventdict
 
static vector< eventstrucevents
 
static beatstruc iscbeat
 
static struct boot_flags boot_flags
 
static double correcttime
 
static double epsilon
 
static double delta
 
static string incoming_dir
 
static string outgoing_dir
 
static string immediate_dir
 
static string temp_dir
 

Detailed Description

Executive Agent source file.

Monitor Agent source file.

Function Documentation

void move_and_compress_exec ( )
838  {
839  exec_mutex.lock();
841  log_move(agent->getNode(), "exec");
842  exec_mutex.unlock();
843 }
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
size_t join_event_threads()
Join all threads spawn and empty our vector.
Definition: command_queue.cpp:115
void log_move(string oldpath, string newpath, bool compress)
Move log file - path version.
Definition: datalib.cpp:200
static CommandQueue cmd_queue
Definition: agent_exec.cpp:74
static std::mutex exec_mutex
Definition: agent_exec.cpp:80
static Agent * agent
Definition: agent_exec.cpp:72
int32_t get_last_offset ( )
473 {
474  int32_t offset = 0;
475  FILE *fp = fopen(("/cosmos/nodes/" + agent->nodeName + "/last_offset").c_str(), "r");
476  if (fp != nullptr)
477  {
478  fscanf(fp, "%d", &offset);
479  fclose(fp);
480  }
481  return offset;
482 }
string nodeName
Definition: agentclass.h:367
static Agent * agent
Definition: agent_exec.cpp:72
FILE * fp
Definition: rw_test.cpp:38
int32_t get_flags ( )
485 {
486  int8_t launched = 0;
487  int8_t bootcheck = 0;
488  int8_t deployed = 0;
489  FILE *fp = fopen(("/cosmos/nodes/" + agent->nodeName + "/boot_flags").c_str(), "r");
490  if (fp != nullptr)
491  {
492  fscanf(fp, "%hhd %hhd %hhd", &launched, &deployed, &bootcheck);
493  if (launched)
494  {
495  boot_flags.launched = true;
497  }
498  else
499  {
500  boot_flags.launched = false;
502  }
503  if (bootcheck)
504  {
505  boot_flags.bootcheck = true;
507  }
508  else
509  {
510  boot_flags.bootcheck = false;
512  }
513  if (deployed)
514  {
515  boot_flags.deployed = true;
517  }
518  else
519  {
520  boot_flags.deployed = false;
522  }
523  fclose(fp);
524  return 1;
525  }
526  else
527  {
528  return 0;
529  }
530 }
bool bootcheck
Definition: agent_exec-2-0.cpp:135
string nodeName
Definition: agentclass.h:367
uint16_t flags
Definition: jsondef.h:3579
Definition: agent_exec-2-0.cpp:132
nodestruc node
Structure for summary information in node.
Definition: jsondef.h:4220
Definition: cosmos-defs.h:117
bool deployed
Definition: agent_exec-2-0.cpp:136
static Agent * agent
Definition: agent_exec.cpp:72
cosmosstruc * cinfo
Definition: agentclass.h:346
FILE * fp
Definition: rw_test.cpp:38
Definition: cosmos-defs.h:115
bool launched
Definition: agent_exec-2-0.cpp:134
Definition: cosmos-defs.h:116
int32_t request_get_queue_size ( string &  request,
string &  response,
Agent agent 
)
554 {
556  return 0;
557 }
string to_string(char *value)
Definition: stringlib.cpp:220
static CommandQueue cmd_queue
Definition: agent_exec.cpp:74
size_t get_command_size()
Retrieve the size of the queue.
Definition: command_queue.h:100
int32_t request_get_event ( string &  request,
string &  response,
Agent agent 
)
560 {
561  std::ostringstream ss;
562 
563  if(cmd_queue.get_event_size()==0) {
564  ss << "[Empty]";
565  }
566  else {
567  int j;
568  int32_t iretn = sscanf(request.c_str(),"%*s %d",&j);
569 
570  // if valid index then return event
571  if (iretn == 1) {
572  if(j >= 0 && j < static_cast<int>(cmd_queue.get_event_size()))
573  ss << cmd_queue.get_event(j);
574  else
575  ss << "<" << j << "> is not a valid event queue index (current range between 0 and "
576  << cmd_queue.get_event_size()-1 << ")";
577  }
578 
579  // if no index given, return the entire queue
580  else if (iretn == -1) {
581  for(int i = 0; i < static_cast<int>(cmd_queue.get_event_size()); ++i) {
582  Event cmd = cmd_queue.get_event(i);
583  ss << "[" << i << "]" << "[" << mjd2iso8601(cmd.getUtc()) << "]" << cmd << endl;
584  }
585  }
586  // if the user supplied something that couldn't be turned into an integer
587  else if (iretn == 0) { ss << "Usage:\tget_event [ index ]\t"; }
588  }
589 
590  response = ss.str();
591  return 0;
592 }
Event & get_event(int i)
Retrieve an Event by its position in the queue.
Definition: command_queue.h:89
Class to manage Event information.
Definition: event.h:57
int i
Definition: rw_test.cpp:37
int iretn
Definition: rw_test.cpp:37
static CommandQueue cmd_queue
Definition: agent_exec.cpp:74
double getUtc()
Retrieves Event::mjd.
Definition: event.h:125
size_t get_event_size()
Retrieve the size of the queue.
Definition: command_queue.h:82
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
int32_t request_get_command ( string &  request,
string &  response,
Agent agent 
)
609 {
610  std::ostringstream ss;
611 
612  if(cmd_queue.get_command_size()==0) {
613  ss << "[Empty]";
614  }
615  else {
616  int j;
617  int32_t iretn = sscanf(request.c_str(),"%*s %d",&j);
618 
619  // if valid index then return command
620  if (iretn == 1) {
621  if(j >= 0 && j < static_cast<int>(cmd_queue.get_command_size()))
622  ss << cmd_queue.get_command(j);
623  else
624  ss << "<" << j << "> is not a valid command queue index (current range between 0 and "
625  << cmd_queue.get_command_size()-1 << ")";
626  }
627 
628  // if no index given, return the entire queue
629  else if (iretn == -1) {
630  for(int i = 0; i < static_cast<int>(cmd_queue.get_command_size()); ++i) {
631  Event cmd = cmd_queue.get_command(i);
632  ss << "[" << i << "]" << "[" << mjd2iso8601(cmd.getUtc()) << "]" << cmd << endl;
633  }
634  }
635  // if the user supplied something that couldn't be turned into an integer
636  else if (iretn == 0) { ss << "Usage:\tget_command [ index ]\t"; }
637  }
638 
639  response = ss.str();
640  return 0;
641 }
Class to manage Event information.
Definition: event.h:57
Event & get_command(int i)
Retrieve an Event by its position in the queue.
Definition: command_queue.h:107
int i
Definition: rw_test.cpp:37
int iretn
Definition: rw_test.cpp:37
static CommandQueue cmd_queue
Definition: agent_exec.cpp:74
size_t get_command_size()
Retrieve the size of the queue.
Definition: command_queue.h:100
double getUtc()
Retrieves Event::mjd.
Definition: event.h:125
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
int32_t request_del_command ( string &  request,
string &  response,
Agent agent 
)
676 {
677  Event cmd;
678  string line(request);
679 
680  // Check for valid request
681  if (line.find(" ") == string::npos)
682  {
684  }
685 
686  line.erase(0, line.find(" ")+1);
687  cmd.set_command(line);
688 
689  //delete command
690  int n = cmd_queue.del_command(cmd);
691 
692  if(!cmd.is_command()) {
693  response = "Not a valid command: " + line;
694  }
695  else {
696  response = std::to_string(n) + " commands deleted from the queue";
697  }
698 
699  return 0;
700 }
Class to manage Event information.
Definition: event.h:57
string to_string(char *value)
Definition: stringlib.cpp:220
static CommandQueue cmd_queue
Definition: agent_exec.cpp:74
void set_command(string jstring)
Sets Event information from a JSON formatted string.
Definition: event.cpp:118
bool is_command()
Determines if the Event is a command.
Definition: event.h:161
#define GENERAL_ERROR_UNDEFINED
Definition: cosmos-errno.h:295
int32_t del_command(Event &c)
Remove all matching Event from the queue.
Definition: command_queue.cpp:488
int32_t request_del_command_id ( string &  request,
string &  response,
Agent agent 
)
645 {
646  Event cmd;
647  std::ostringstream ss;
648 
649  if(cmd_queue.get_command_size()==0) {
650  ss << "the command queue is empty";
651  }
652  else {
653  int j;
654  int32_t iretn = sscanf(request.c_str(),"%*s %d",&j);
655 
656  // if valid index then return command
657  if (iretn == 1) {
658  if(j >= 0 && j < static_cast<int>(cmd_queue.get_command_size())) {
659  response = std::to_string(cmd_queue.del_command(j)) + " commands deleted from the queue";
660  } else {
661  ss << "<" << j << "> is not a valid command queue index (current range between 0 and " << cmd_queue.get_command_size()-1 << ")";
662  }
663  }
664  // if the user supplied something that couldn't be turned into an integer
665  else if (iretn == 0) {
666  ss << "Usage:\tdel_command_id [ index ]\t";
667  }
668  }
669 
670  response = ss.str();
671  return 0;
672 }
Class to manage Event information.
Definition: event.h:57
string to_string(char *value)
Definition: stringlib.cpp:220
int iretn
Definition: rw_test.cpp:37
static CommandQueue cmd_queue
Definition: agent_exec.cpp:74
size_t get_command_size()
Retrieve the size of the queue.
Definition: command_queue.h:100
int32_t del_command(Event &c)
Remove all matching Event from the queue.
Definition: command_queue.cpp:488
int32_t request_add_command ( string &  request,
string &  response,
Agent agent 
)
704 {
705  Event cmd;
706  string line(request);
707 
708  // Check for valid request
709  if (line.find(" ") == string::npos)
710  {
712  }
713 
714  line.erase(0, line.find(" ")+1);
715  cmd.set_command(line);
716 
717  // add command
718  if(cmd.is_command()) {
719  cmd_queue.add_command(cmd);
720  response = "Command added to queue: " + line;
721  }
722  else {
723  response = "Not a valid command: " + line;
724  }
725 
726  // sort the queue
727  cmd_queue.sort();
728  return 0;
729 }
Class to manage Event information.
Definition: event.h:57
void sort()
Sort the Events in the queue by Event exectution time.
Definition: command_queue.h:210
int32_t add_command(Event &c)
Remove all matching Event from the queue.
Definition: command_queue.cpp:522
static CommandQueue cmd_queue
Definition: agent_exec.cpp:74
void set_command(string jstring)
Sets Event information from a JSON formatted string.
Definition: event.cpp:118
bool is_command()
Determines if the Event is a command.
Definition: event.h:161
#define GENERAL_ERROR_UNDEFINED
Definition: cosmos-errno.h:295
int32_t request_remote_command ( string &  request,
string &  response,
Agent agent 
)
733 {
734 // char request_re[AGENTMAXBUFFER + 5];
735  int32_t iretn = 0;
736 // FILE *pd;
737  int i;
738 
739  // Locate where our command starts.
740  bool flag = false;
741  for (i=0; i<AGENTMAXBUFFER-1; i++) {
742  if (flag) {
743  if (request[i] != ' ')
744  break;
745  }
746  else if (request[i] == ' ') {
747  flag = true;
748  }
749  }
750 
751  if (i == AGENTMAXBUFFER-1) {
752  response = "Unable to find an appropriate command.";
753  }
754  else {
755  // Run the process and create a pipe.
756  iretn = data_execute(request.substr(request.find(" ") + 1), response);
757  }
758 
759  return iretn;
760 }
int i
Definition: rw_test.cpp:37
int iretn
Definition: rw_test.cpp:37
int32_t data_execute(string cmd, string &result, string shell)
Definition: datalib.cpp:1947
#define AGENTMAXBUFFER
Maximum AGENT transfer buffer size.
Definition: jsondef.h:438
int32_t request_reopen_exec ( string &  request,
string &  response,
Agent agent 
)
546 {
547  // Do not wait for log stride, move data from temp to incoming immediately.
548  logdate_exec = agent->cinfo->node.loc.utc;
550  return 0;
551 }
void move_and_compress_exec()
Definition: agent_exec.cpp:838
double utc
Master time for location, in Modified Julian Day.
Definition: convertdef.h:879
nodestruc node
Structure for summary information in node.
Definition: jsondef.h:4220
locstruc loc
Location structure.
Definition: jsondef.h:3596
cosmosstruc * cinfo
Definition: agentclass.h:346
static double logdate_exec
Definition: agent_exec.cpp:77
int32_t request_set_logstride_exec ( string &  request,
string &  response,
Agent agent 
)
534 {
535  sscanf(request.c_str(),"%*s %lf",&newlogstride_exec);
536  return 0;
537 }
static double newlogstride_exec
Definition: agent_exec.cpp:78
int32_t request_get_logstride_exec ( string &  request,
string &  response,
Agent agent 
)
539 {
540  response = std::to_string(logstride_exec);
541  return 0;
542 }
string to_string(char *value)
Definition: stringlib.cpp:220
static double logstride_exec
Definition: agent_exec.cpp:79
int32_t request_reopen_soh ( string &  request,
string &  response,
Agent agent 
)
764 {
765  // Do not wait for logstride, push soh log files out of temp and into incoming.
766  logdate_soh = agent->cinfo->node.loc.utc;
768  return 0;
769 }
static double logdate_soh
Definition: agent_exec.cpp:115
double utc
Master time for location, in Modified Julian Day.
Definition: convertdef.h:879
nodestruc node
Structure for summary information in node.
Definition: jsondef.h:4220
void move_and_compress_soh()
Definition: agent_exec.cpp:844
locstruc loc
Location structure.
Definition: jsondef.h:3596
cosmosstruc * cinfo
Definition: agentclass.h:346
int32_t request_set_logperiod ( string &  request,
string &  response,
Agent agent 
)
772 {
773  sscanf(request.c_str(),"%*s %lf",&newlogperiod);
774  newlogperiod /= 86400.;
775  return 0;
776 }
static double newlogperiod
Definition: agent_exec.cpp:116
int32_t request_get_logperiod ( string &  request,
string &  response,
Agent agent 
)
778 {
779  response = std::to_string(86400. * logperiod);
780  return 0;
781 }
string to_string(char *value)
Definition: stringlib.cpp:220
static double logperiod
Definition: agent_exec.cpp:117
int32_t request_set_logstride_soh ( string &  request,
string &  response,
Agent agent 
)
798 {
799  sscanf(request.c_str(),"%*s %lf",&newlogstride_soh);
800  newlogstride_soh /= 86400.;
801  return 0;
802 }
static double newlogstride_soh
Definition: agent_exec.cpp:118
int32_t request_get_logstride_soh ( string &  request,
string &  response,
Agent agent 
)
804 {
805  response = std::to_string(86400. * logstride_soh);
806  return 0;
807 }
string to_string(char *value)
Definition: stringlib.cpp:220
static double logstride_soh
Definition: agent_exec.cpp:119
int32_t request_set_logstring ( string &  request,
string &  response,
Agent agent 
)
784 {
785  request.erase(0, request.find(" ")+1);
786  logtable.clear();
787  json_table_of_list(logtable, request.c_str(), agent->cinfo);
788  return 0;
789 }
int32_t json_table_of_list(vector< jsonentry * > &table, string tokens, cosmosstruc *cinfo)
Output a vector of JSON entries.
Definition: jsonlib.cpp:3086
static vector< jsonentry * > logtable
Definition: agent_exec.cpp:114
cosmosstruc * cinfo
Definition: agentclass.h:346
int32_t request_get_logstring ( string &  request,
string &  response,
Agent agent 
)
792 {
793  response = logstring;
794  return 0;
795 }
static string logstring
Definition: agent_exec.cpp:113
int32_t request_save_command ( string &  request,
string &  response,
Agent  
)
595 {
596  vector<string> args = string_split(request, " ");
597  if (args.size() == 2)
598  {
599  cmd_queue.save_commands(temp_dir, args[1]);
600  }
601  else
602  {
604  }
605  return 0;
606 }
int32_t save_commands(string temp_dir, string name=".queue")
Save the queue of Events to a file.
Definition: command_queue.cpp:353
static CommandQueue cmd_queue
Definition: agent_exec.cpp:74
static string temp_dir
Definition: agent_exec.cpp:146
vector< string > string_split(string in, string delimeters)
Parse a string.
Definition: stringlib.cpp:47
void collect_data_loop ( )
noexcept
810 {
811  int32_t iretn;
812  while (agent->running())
813  {
814  // Collect new data
815  Agent::messstruc mess;
816  iretn = agent->readring(mess, Agent::AgentMessage::ALL, 5., Agent::Where::TAIL, "", agent->cinfo->node.name);
817  if (iretn >= 0)
818  {
819  if (mess.meta.type < Agent::AgentMessage::BINARY)
820  {
821  json_parse(mess.adata, agent->cinfo);
822  agent->cinfo->node.utc = currentmjd(0.);
823 
825  {
826  if (device.utc > agent->cinfo->node.utc)
827  {
828  agent->cinfo->node.utc = device.utc;
829  }
830  }
831  }
832  }
833  }
834  return;
835 }
Device structure.
Definition: jsondef.h:3692
int32_t json_parse(string jstring, cosmosstruc *cinfo)
Parse JSON using Name Space.
Definition: jsonlib.cpp:4799
int iretn
Definition: rw_test.cpp:37
double utc
Overall Node time.
Definition: jsondef.h:3592
string adata
Definition: agentclass.h:276
vector< devicestruc > device
Vector of all general (common) information for devices (components) in node.
Definition: jsondef.h:4238
nodestruc node
Structure for summary information in node.
Definition: jsondef.h:4220
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
double utc
Device information time stamp.
Definition: jsondef.h:1621
char name[40+1]
Node Name.
Definition: jsondef.h:3556
Storage for messages.
Definition: agentclass.h:272
static Agent * agent
Definition: agent_exec.cpp:72
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
pollstruc meta
Definition: agentclass.h:274
AgentMessage type
Definition: agentclass.h:266
cosmosstruc * cinfo
Definition: agentclass.h:346
int32_t readring(messstruc &message, AgentMessage type=Agent::AgentMessage::ALL, float waitsec=1., Where where=Where::TAIL, string proc="", string node="")
Check Ring for message.
Definition: agentclass.cpp:2395
static string device
Definition: ax25_recv.cpp:39
void move_and_compress_soh ( )
844  {
845  soh_mutex.lock();
846  log_move(agent->getNode(), "soh");
847  soh_mutex.unlock();
848 }
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
void log_move(string oldpath, string newpath, bool compress)
Move log file - path version.
Definition: datalib.cpp:200
static std::mutex soh_mutex
Definition: agent_exec.cpp:120
static Agent * agent
Definition: agent_exec.cpp:72
void move_and_compress_beacon ( )
850  {
851  string beacon_string;
852  beacon_mutex.lock();
854  log_move(agent->getNode(), "beacon");
855  beacon_mutex.unlock();
856 }
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
#define DATA_LOG_TYPE_BEACON
Definition: datalib.h:110
static double logdate_soh
Definition: agent_exec.cpp:115
void log_move(string oldpath, string newpath, bool compress)
Move log file - path version.
Definition: datalib.cpp:200
nodestruc node
Structure for summary information in node.
Definition: jsondef.h:4220
char name[40+1]
Node Name.
Definition: jsondef.h:3556
const char * json_of_beacon(string &jstring, cosmosstruc *cinfo)
Create JSON Beacon string.
Definition: jsonlib.cpp:9059
static Agent * agent
Definition: agent_exec.cpp:72
void log_write(string node, string agent, double utc, string extra, string type, string record, string location)
Write log entry - full.
Definition: datalib.cpp:75
cosmosstruc * cinfo
Definition: agentclass.h:346
static std::mutex beacon_mutex
Definition: agent_exec.cpp:122
int32_t get_power_mode ( )
860 {
861  int32_t powermode = -1;
862  uint16_t tindex;
863  string tdata;
864  int32_t iretn = data_execute("power_mode_get", tdata);
865  if(iretn<0) {
866  return powermode;
867  } else {
868  if (sscanf(tdata.c_str(), "%u\n", &tindex) == 1)
869  {
870  if (tindex > 0)
871  {
872  powermode = tindex;
873  }
874  }
875  // FILE *fp = popen("/cosmos/scripts/power_mode_get", "r");
876 // if (fp == nullptr)
877 // {
878 // return -errno;
879 // }
880 // char tdata[100];
881 // uint16_t tindex;
882 // if ((fgets(tdata, 100, fp)) == tdata)
883 // {
884 // if (sscanf(tdata, "%u\n", &tindex) == 1)
885 // {
886 // if (tindex > 0)
887 // {
888 // powermode = tindex;
889 // }
890 // }
891 // }
892 // pclose(fp);
893  return powermode;
894  }
895 }
int iretn
Definition: rw_test.cpp:37
int32_t data_execute(string cmd, string &result, string shell)
Definition: datalib.cpp:1947
int main ( int  argc,
char *  argv[] 
)
152 {
153  vector<eventstruc> events, eventdict;
154  string jjstring, myjstring;
155  double llogmjd, dlogmjd, clogmjd;
156  int32_t iretn;
157 
158  // Set node name to first argument
159  if (argc == 2)
160  {
161  agent = new Agent(argv[1], "exec", 0.);
162  }
163  else
164  {
165  agent = new Agent("", "exec", 0.);
166  }
167 
168  if ((iretn = agent->wait()) < 0)
169  {
170  fprintf(agent->get_debug_fd(), "%16.10f %s Failed to start Agent %s on Node %s Dated %s : %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str(), cosmos_error_string(iretn).c_str());
171  exit(iretn);
172  }
173  else
174  {
175  fprintf(agent->get_debug_fd(), "%16.10f %s Started Agent %s on Node %s Dated %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str());
176  }
177 
178  // Fix time
179  iretn = agent->get_agent_time(correcttime, epsilon, delta, "gps");
180  if (iretn >= 0)
181  {
182  printf("Initialized time from time GPS: Delta %f %f Epsilon %f\n", 86400. * (correcttime - currentmjd()), 86400.*delta, 86400.*epsilon);
184  }
185  else
186  {
187  FILE *fp = fopen(("/cosmos/nodes/" + agent->nodeName + "/last_date").c_str(), "r");
188  if (fp != nullptr)
189  {
190  calstruc date;
191  fscanf(fp, "%02d%02d%02d%02d%04d%*c%02d\n", &date.month, &date.dom, &date.hour, &date.minute, &date.year, &date.second);
192  fclose(fp);
193  int32_t offset = get_last_offset();
194  date.second += offset;
195  double delta = cal2mjd(date) - currentmjd();
196  if (delta > 3.5e-4)
197  {
198  delta = set_local_clock(cal2mjd(date));
199  printf("Initialized time from file: Delta %f\n", delta);
200  }
201  }
202  }
203 
204  // Check for launched
205  get_flags();
206 
207  agent->cinfo->node.utc = 0.;
208  agent->cinfo->agent[0].aprd = 1.;
209  cout<<" started."<<endl;
210 
211  // Establish Executive functions
212 
213  // Set the immediate, incoming, outgoing, and temp directories
214  immediate_dir = data_base_path(agent->getNode(), "immediate", "exec") + "/";
215  if (immediate_dir.empty())
216  {
217  cout<<"unable to create directory: <"<<(agent->getNode()+"/immediate")+"/exec"<<"> ... exiting."<<endl;
218  exit(1);
219  }
220 
221  incoming_dir = data_base_path(agent->getNode(), "incoming", "exec") + "/";
222  if (incoming_dir.empty())
223  {
224  cout<<"unable to create directory: <"<<(agent->getNode()+"/incoming")+"/exec"<<"> ... exiting."<<endl;
225  exit(1);
226  }
227 
228  outgoing_dir = data_base_path(agent->getNode(), "outgoing", "exec") + "/";
229  if (outgoing_dir.empty())
230  {
231  cout<<"unable to create directory: <"<<(agent->getNode()+"/outgoing")+"/exec"<<"> ... exiting."<<endl;
232  exit(1);
233  }
234 
235  temp_dir = data_base_path(agent->getNode(), "temp", "exec") + "/";
236  if (temp_dir.empty())
237  {
238  cout<<"unable to create directory: <"<<(agent->getNode()+"/temp")+"/exec"<<"> ... exiting."<<endl;
239  exit(1);
240  }
241 
242  // Event related request functions
243  if ((iretn=agent->add_request("getqueuesize", request_get_queue_size, "", "returns the current size of the command queue")))
244  exit (iretn);
245  if ((iretn=agent->add_request("delcommand", request_del_command, "entry-string", "deletes the specified command event from the queue according to its JSON string")))
246  exit (iretn);
247  if ((iretn=agent->add_request("delcommandid", request_del_command_id, "entry #", "deletes the specified command event from the queue according to its position")))
248  exit (iretn);
249  if ((iretn=agent->add_request("getcommand", request_get_command, "[ entry # ]", "returns the requested command queue entry (or all if none specified)")))
250  exit (iretn);
251  if ((iretn=agent->add_request("getqueue", request_get_command, "[ entry # ]", "returns the requested command queue entry (or all if none specified)")))
252  exit (iretn);
253  if ((iretn=agent->add_request("addcommand", request_add_command, "{\"event_name\":\"\"}{\"event_utc\":0}{\"event_utcexec\":0}{\"event_flag\":0}{\"event_type\":0}{\"event_data\":\"\"}{\"event_condition\":\"\"}", "adds the specified command event to the queue")))
254  exit (iretn);
255  if ((iretn=agent->add_request("runremotecommand", request_remote_command, "command-to-run", "run a command local to the agent and return its output")))
256  exit (iretn);
257  if ((iretn=agent->add_request("getevent", request_get_event, "[ entry # ]", "returns the requested event queue entry (or all if none specified)")))
258  exit (iretn);
259  if ((iretn=agent->add_request("reopenexec", request_reopen_exec, "", "flushes exec log files out of temp directory")))
260  exit (iretn);
261  if ((iretn=agent->add_request("setlogstrideexec", request_set_logstride_exec, "logstride", "modify how frequently we flush exec log files out of temp directory")))
262  exit (iretn);
263  if ((iretn=agent->add_request("getlogstrideexec", request_get_logstride_exec, "", "return how frequently we flush exec log files out of temp directory")))
264  exit (iretn);
265  if ((iretn=agent->add_request("savecommand", request_save_command, "{file}", "Save current command list to optional file name (default: .queue)")))
266  exit (iretn);
267  if ((iretn=agent->add_request("savequeue", request_save_command, "{file}", "Save current command list to optional file name (default: .queue)")))
268  exit (iretn);
269 
270  // SOH related request functions
271  if ((iretn=agent->add_request("reopensoh", request_reopen_soh, "", "flushes soh log files out of temp directory")))
272  exit (iretn);
273  if ((iretn=agent->add_request("setlogperiod" ,request_set_logperiod, "logperiod", "set how often we log our SOH data")))
274  exit (iretn);
275  if ((iretn=agent->add_request("getlogperiod" ,request_get_logperiod, "", "return how often we log our SOH data")))
276  exit (iretn);
277  if ((iretn=agent->add_request("setlogstring", request_set_logstring, "logstring", "set what parts of our SOH that we log.")))
278  exit (iretn);
279  if ((iretn=agent->add_request("getlogstring", request_get_logstring, "", "return what portions of our SOH that we log")))
280  exit (iretn);
281  if ((iretn=agent->add_request("setlogstride_soh", request_set_logstride_soh, "logstride", "modify how frequently we flush our soh log files out of temp directory")))
282  exit (iretn);
283  if ((iretn=agent->add_request("getlogstride_soh", request_get_logstride_soh, "", "return how frequently we flush our soh log files out of temp directory")))
284  exit (iretn);
285 
286  load_dictionary(eventdict, agent->cinfo, "events.dict");
287 
288  // Reload existing queue
290 
291  // Start thread to collect SOH data
292  bool log_data_flag = true;
293  vector <beatstruc> servers = agent->find_agents(1.);
294  for (beatstruc &i : servers)
295  {
296  if (strcmp(i.node, agent->nodeName.c_str()) && !strcmp(i.proc, "exec"))
297  {
298  log_data_flag = false;
299  break;
300  }
301  }
302 
303  if (log_data_flag)
304  {
305  printf("Primary: SOH logger\n");
306  }
307  else
308  {
309  printf("Secondary: Not logging SOH\n");
310  }
311 
312  cdthread = thread([=] { collect_data_loop(); });
313 
314  // Create default logstring
316  printf("===\nlogstring: %s\n===\n", logstring.c_str()); fflush(stdout);
317  json_table_of_list(logtable, logstring.c_str(), agent->cinfo);
318 
319  agent->set_sohstring(logstring);
320 
321  // Start performing the body of the agent
322  agent->post(Agent::AgentMessage::REQUEST, "postsoh");
323  COSMOS_SLEEP(10.);
324  llogmjd = currentmjd();
325  clogmjd = currentmjd();
326  logdate_exec = 0.;
327  logstride_exec = 0.;
329  agent->debug_level = 0;
330  ElapsedTime postet;
331  ElapsedTime savet;
332  while(agent->running())
333  {
334  int32_t next_power_mode = get_power_mode();
335  agent->cinfo->node.powmode = next_power_mode;
336 
337  // Fix time
338 // iretn = agent->get_agent_time(correcttime, epsilon, delta, "gps");
339 // if (iretn >= 0 && fabs(delta*86400.) > 30.)
340 // {
341 // printf("Initialized time from time GPS: Delta %f %f Epsilon %f\n", 86400. * (correcttime - currentmjd()), 86400.*delta, 86400.*epsilon);
342 // set_local_clock(correcttime);
343 // llogmjd = currentmjd();
344 // clogmjd = currentmjd();
345 // logdate_exec = 0.;
346 // logstride_exec = 0.;
347 // }
348 // else
349 // {
350 // iretn = agent->get_agent_time(correcttime, epsilon, delta, "duplex");
351 // if (iretn >= 0 && fabs(delta*86400.) > 30.)
352 // {
353 // printf("Initialized time from time DUPLEX: Delta %f %f Epsilon %f\n", 86400. * (correcttime - currentmjd()), 86400.*delta, 86400.*epsilon);
354 // set_local_clock(correcttime);
355 // llogmjd = currentmjd();
356 // clogmjd = currentmjd();
357 // logdate_exec = 0.;
358 // logstride_exec = 0.;
359 // }
360 // }
361 
362  dlogmjd = (clogmjd-llogmjd)*86400.;
363  agent->cinfo->node.utc = clogmjd = currentmjd();
365 
366  // Check if exec logstride has changed
368  {
370  logdate_exec = currentmjd(0.);
372  }
373 
374  // Check if exec logstride has expired
375  if (logstride_exec != 0. && floor(clogmjd/logstride_exec)*logstride_exec > logdate_exec)
376  {
377  logdate_exec = floor(clogmjd/logstride_exec)*logstride_exec;
379  }
380 
381  // Check if the SOH logperiod has changed
382  if (newlogperiod != logperiod )
383  {
386 
387  if (log_data_flag)
388  {
390  }
391  }
392 
394  {
396  logdate_soh = currentmjd(0.);
397 
398 
399  if (log_data_flag)
400  {
403  }
404  }
405 
406  if (logstride_soh != 0. && floor(clogmjd/logstride_soh)*logstride_soh > logdate_soh)
407  {
408  logdate_soh = floor(clogmjd/logstride_soh)*logstride_soh;
409  if (log_data_flag)
410  {
413  }
414  }
415 
416  // Check if SOH logperiod has expired
417  if (dlogmjd-logperiod > -logperiod/20.)
418  {
419  llogmjd = clogmjd;
420  if (log_data_flag && agent->cinfo->node.utc != 0. && logstring.size())
421  {
423 // log_write(agent->cinfo->node.name, DATA_LOG_TYPE_SOH, logdate_soh, json_of_table(jjstring, logtable, agent->cinfo), static_cast <string>("immediate"));
424  }
425  }
426 
427  // Perform SOH specific functions
428  if (log_data_flag && logstride_soh > 0. && agent->cinfo->node.utc != 0. && postet.split() >= 43200. * logperiod)
429  {
430  postet.reset();
431  agent->post(Agent::AgentMessage::REQUEST, "postsoh");
432 
435  // agent->post(Agent::AgentMessage::SOH, json_of_table(myjstring, logtable, agent->cinfo));
436  calc_events(eventdict, agent->cinfo, events);
437  for (uint32_t k=0; k<events.size(); ++k)
438  {
439  memcpy(&agent->cinfo->event[0],&events[k],sizeof(eventstruc));
440  strcpy(agent->cinfo->event[0].condition,agent->cinfo->emap[events[k].handle.hash][events[k].handle.index].text);
442  }
443  }
444 
445  // Perform Executive specific functions
450 
451  if (savet.split() > 60.)
452  {
453  FILE *fp = fopen(("/cosmos/nodes/" + agent->nodeName + "/last_date").c_str(), "w");
454  if (fp)
455  {
456  savet.reset();
457  calstruc date = mjd2cal(currentmjd());
458  fprintf(fp, "%02d%02d%02d%02d%04d.59\n", date.month, date.dom, date.hour, date.minute, date.year);
459  fclose(fp);
460  }
461  }
463  }
464 
465  agent->shutdown();
466  if (log_data_flag)
467  {
468  cdthread.join();
469  }
470 }
int32_t dom
Definition: timelib.h:110
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
int32_t finish_active_loop()
Definition: agentclass.cpp:355
string json_list_of_soh(cosmosstruc *cinfo)
Definition: jsonlib.cpp:9252
#define DATA_LOG_TYPE_EVENT
Definition: datalib.h:109
calstruc mjd2cal(double mjd)
MJD to Calendar.
Definition: timelib.cpp:180
int16_t powmode
Definition: jsondef.h:3580
void move_and_compress_exec()
Definition: agent_exec.cpp:838
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
size_t calc_events(vector< eventstruc > &dictionary, cosmosstruc *cinfo, vector< eventstruc > &events)
Calculate current Events.
Definition: jsonlib.cpp:11356
Definition: eci2kep_test.cpp:33
static string logstring
Definition: agent_exec.cpp:113
Definition: jsondef.h:923
static string myjstring
Definition: agent_exec.cpp:107
int32_t get_flags()
Definition: agent_exec.cpp:484
string nodeName
Definition: agentclass.h:367
int i
Definition: rw_test.cpp:37
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
const char * json_of_table(string &jstring, vector< jsonentry * > table, cosmosstruc *cinfo)
Create JSON stream from entries.
Definition: jsonlib.cpp:8733
int32_t get_last_offset()
Definition: agent_exec.cpp:472
int32_t update_target(cosmosstruc *cinfo)
Update Track list.
Definition: jsonlib.cpp:11262
int32_t load_commands(string incoming_dir)
Load queue of Events from a file.
Definition: command_queue.cpp:420
int32_t request_remote_command(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:732
static double logdate_soh
Definition: agent_exec.cpp:115
size_t join_event_threads()
Join all threads spawn and empty our vector.
Definition: command_queue.cpp:115
int iretn
Definition: rw_test.cpp:37
static double newlogstride_exec
Definition: agent_exec.cpp:78
int32_t wait(State state=State::RUN, double waitsec=10.)
Definition: agentclass.cpp:398
double utc
Overall Node time.
Definition: jsondef.h:3592
vector< vector< jsonequation > > emap
JSON Equation Map matrix.
Definition: jsondef.h:4211
static double logstride_exec
Definition: agent_exec.cpp:79
int32_t request_del_command(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:675
int32_t hour
Definition: timelib.h:112
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
int32_t request_get_event(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:559
int32_t set_sohstring(string list)
Set Limited SOH string.
Definition: agentclass.cpp:641
static double delta
Definition: agent_exec.cpp:141
int32_t request_add_command(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:703
static CommandQueue cmd_queue
Definition: agent_exec.cpp:74
int32_t request_reopen_soh(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:763
int32_t run_commands(Agent *agent, string nodename, double logdate_exec)
Traverse the entire queue of Events, clearing those that have finished.
Definition: command_queue.cpp:261
int32_t json_table_of_list(vector< jsonentry * > &table, string tokens, cosmosstruc *cinfo)
Output a vector of JSON entries.
Definition: jsonlib.cpp:3086
int32_t request_del_command_id(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:644
nodestruc node
Structure for summary information in node.
Definition: jsondef.h:4220
static double logperiod
Definition: agent_exec.cpp:117
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
Definition: timelib.h:106
int32_t request_get_logstring(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:791
int32_t get_agent_time(double &agent_time, double &epsilon, double &delta, string agent, string node="any", double wait_sec=2.)
Definition: agentclass.cpp:2711
int32_t post(messstruc mess)
Post a Cosmos::Agent::messstruc.
Definition: agentclass.cpp:2074
static vector< eventstruc > eventdict
Definition: agent_exec.cpp:127
void move_and_compress_soh()
Definition: agent_exec.cpp:844
double set_local_clock(double utc_to)
Definition: timelib.cpp:1626
static string outgoing_dir
Definition: agent_exec.cpp:144
static double newlogperiod
Definition: agent_exec.cpp:116
int32_t month
Definition: timelib.h:109
uint32_t downtime
Seconds Node will be down.
Definition: jsondef.h:3582
string getAgent()
Definition: agentclass.cpp:2609
static string incoming_dir
Definition: agent_exec.cpp:143
static vector< eventstruc > events
Definition: agent_exec.cpp:128
int32_t add_request(string token, external_request_function function, string synopsis="", string description="")
Add internal request to Agent request list with description and synopsis.
Definition: agentclass.cpp:312
static string immediate_dir
Definition: agent_exec.cpp:145
int32_t restore_commands(string temp_dir, string name=".queue")
Restore the queue of Events from a file.
Definition: command_queue.cpp:385
char name[40+1]
Node Name.
Definition: jsondef.h:3556
Definition: agentclass.h:139
double cal2mjd(int32_t year, int32_t month, int32_t day, int32_t hour, int32_t minute, int32_t second, int32_t nsecond)
Calendar representation to Modified Julian Day - full.
Definition: timelib.cpp:294
locstruc loc
Location structure.
Definition: jsondef.h:3596
size_t load_dictionary(vector< eventstruc > &dict, cosmosstruc *cinfo, const char *file)
Load Event Dictionary.
Definition: jsonlib.cpp:11313
int32_t request_set_logstride_exec(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:533
int32_t shutdown()
Shutdown agent gracefully.
Definition: agentclass.cpp:366
Full COSMOS Event structure.
Definition: jsondef.h:1093
int32_t request_save_command(string &request, string &response, Agent *)
Definition: agent_exec.cpp:594
int32_t get_power_mode()
Definition: agent_exec.cpp:859
static thread cdthread
Definition: agent_exec.cpp:110
const char * json_of_event(string &jstring, cosmosstruc *cinfo)
Create JSON for an event.
Definition: jsonlib.cpp:9854
int32_t request_set_logstring(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:783
double data_ctime(string path)
Definition: datalib.cpp:1910
int32_t request_reopen_exec(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:545
static Agent * agent
Definition: agent_exec.cpp:72
int32_t minute
Definition: timelib.h:113
Definition: elapsedtime.h:62
int32_t loc_update(locstruc *loc)
Synchronize all frames in location structure.
Definition: convertlib.cpp:2294
void log_write(string node, string agent, double utc, string extra, string type, string record, string location)
Write log entry - full.
Definition: datalib.cpp:75
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static double epsilon
Definition: agent_exec.cpp:140
static string jjstring
Definition: agent_exec.cpp:106
static double logstride_soh
Definition: agent_exec.cpp:119
#define DATA_LOG_TYPE_SOH
Definition: datalib.h:108
static double correcttime
Definition: agent_exec.cpp:139
string utc2iso8601(double utc)
ISO 8601 version of time.
Definition: timelib.cpp:1286
vector< agentstruc > agent
Single entry vector for agent information.
Definition: jsondef.h:4247
static vector< jsonentry * > logtable
Definition: agent_exec.cpp:114
int32_t request_set_logperiod(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:771
int32_t request_set_logstride_soh(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:797
int32_t year
Definition: timelib.h:108
int32_t start_active_loop()
Definition: agentclass.cpp:347
cosmosstruc * cinfo
Definition: agentclass.h:346
FILE * fp
Definition: rw_test.cpp:38
vector< eventstruc > event
Single entry vector for event information.
Definition: jsondef.h:4250
int32_t request_get_logstride_exec(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:538
int32_t second
Definition: timelib.h:114
void collect_data_loop() noexcept
Definition: agent_exec.cpp:809
int32_t request_get_command(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:608
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
static double newlogstride_soh
Definition: agent_exec.cpp:118
int32_t request_get_logperiod(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:777
static string temp_dir
Definition: agent_exec.cpp:146
vector< beatstruc > find_agents(double waitsec=0.)
Find single server.
Definition: agentclass.cpp:605
static double logdate_exec
Definition: agent_exec.cpp:77
void reset()
ElapsedTime::reset.
Definition: elapsedtime.cpp:278
void move_and_compress_beacon()
Definition: agent_exec.cpp:850
int32_t request_get_logstride_soh(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:803
int32_t request_get_queue_size(string &request, string &response, Agent *agent)
Definition: agent_exec.cpp:553

Variable Documentation

Agent* agent
static
CommandQueue cmd_queue
static
double logdate_exec =0.
static
double newlogstride_exec = 300. / 86400.
static
double logstride_exec = 0.
static
std::mutex exec_mutex
static
string jjstring
static
string myjstring
static
thread cdthread
static
string logstring
static
vector<jsonentry*> logtable
static
double logdate_soh =0.
static
double newlogperiod = 30. / 86400.
static
double logperiod = 0
static
double newlogstride_soh = 600. / 86400.
static
double logstride_soh = 0.
static
std::mutex soh_mutex
static
std::mutex beacon_mutex
static
vector<eventstruc> eventdict
static
vector<eventstruc> events
static
beatstruc iscbeat
static
struct boot_flags boot_flags
static
double correcttime
static
double epsilon
static
double delta
static
string incoming_dir
static
string outgoing_dir
static
string immediate_dir
static
string temp_dir
static