COSMOS core  1.0.2 (beta)
Comprehensive Open-architecture Solution for Mission Operations Systems
agent_forward.cpp File Reference
#include "support/configCosmos.h"
#include <algorithm>
#include "agent/agentclass.h"
Include dependency graph for agent_forward.cpp:

Macros

#define MAXBUFFERSIZE   2560
 

Functions

void opening_loop ()
 
void forwarding_loop ()
 
int32_t request_add_forward (string &req, string &, Agent *)
 
int32_t request_del_forward (string &req, string &, Agent *)
 
int main (int argc, char *argv[])
 

Variables

static Agentagent
 
static socket_channel rcvchan
 
static vector< socket_channelsendchan
 

Macro Definition Documentation

#define MAXBUFFERSIZE   2560

Function Documentation

void opening_loop ( )
170 {
171  while(agent->running())
172  {
173  for (socket_channel &tchan : sendchan)
174  {
175  if (tchan.cudp < 0)
176  {
177  socket_channel tempchan;
178  if ((socket_open(&tempchan, NetworkType::UDP, tchan.address, AGENTRECVPORT, SOCKET_TALK, SOCKET_BLOCKING, AGENTRCVTIMEO)) == 0)
179  {
180  tchan = tempchan;
181  }
182  }
183  }
184  COSMOS_SLEEP(.1);
185  }
186 }
Agent socket using Unicast UDP.
#define SOCKET_TALK
Talk followed by optional listen (sendto address)
Definition: socketlib.h:82
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
#define AGENTRECVPORT
Default RECV port.
Definition: agentclass.h:200
Definition: socketlib.h:115
#define SOCKET_BLOCKING
Blocking Agent.
Definition: socketlib.h:78
#define AGENTRCVTIMEO
Default AGENT socket RCVTIMEO (100 msec)
Definition: agentclass.h:208
static Agent * agent
Definition: agent_forward.cpp:37
int32_t socket_open(socket_channel *channel, NetworkType ntype, const char *address, uint16_t port, uint16_t role, bool blocking, uint32_t usectimeo, uint32_t rcvbuf, uint32_t sndbuf)
Open UDP socket.
Definition: socketlib.cpp:51
static vector< socket_channel > sendchan
Definition: agent_forward.cpp:39
void forwarding_loop ( )
189 {
190  int32_t nbytes;
191  vector <uint8_t> input;
192 
193  while(agent->running())
194  {
195  if ((nbytes = socket_recvfrom(rcvchan, input, AGENTMAXBUFFER, 0)) > 0)
196  {
197  // New forwarder? Add to forwarding list
198  bool found = false;
199  for (size_t i=0; i<sendchan.size(); ++i)
200  {
201  if (!strcmp(sendchan[i].address, rcvchan.address))
202  {
203  found = true;
204  }
205  }
206 
207  if (!found)
208  {
209  socket_channel tempchan;
210  strncpy(tempchan.address, rcvchan.address, 17);
211  sendchan.push_back(tempchan);
212  if (agent->debug_level)
213  {
214  fprintf(agent->get_debug_fd(), "%s: Added %s\n", mjd2iso8601(currentmjd()).c_str(), tempchan.address);
215  }
216  }
217 
218  for (size_t i=0; i<agent->cinfo->agent[0].ifcnt; ++i)
219  {
220  sendto(agent->cinfo->agent[0].pub[i].cudp, (const char *)input.data(), input.size(), 0, (struct sockaddr *)&agent->cinfo->agent[0].pub[i].baddr, sizeof(struct sockaddr_in));
221 // socket_sendto(agent->cinfo->agent[0].pub[i], input);
222  }
223  }
224  }
225 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static socket_channel rcvchan
Definition: agent_forward.cpp:38
int i
Definition: rw_test.cpp:37
int32_t socket_recvfrom(socket_channel &channel, string &buffer, size_t maxlen, int flags)
Definition: socketlib.cpp:710
char address[17]
Definition: socketlib.h:134
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
char address[]
Definition: netperf_listen.cpp:69
Definition: socketlib.h:115
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
vector< agentstruc > agent
Single entry vector for agent information.
Definition: jsondef.h:4247
cosmosstruc * cinfo
Definition: agentclass.h:346
#define AGENTMAXBUFFER
Maximum AGENT transfer buffer size.
Definition: jsondef.h:438
static Agent * agent
Definition: agent_forward.cpp:37
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
static vector< socket_channel > sendchan
Definition: agent_forward.cpp:39
int32_t request_add_forward ( string &  req,
string &  ,
Agent  
)
228 {
229  char address[50];
230  sscanf(req.c_str(), "%*s %s", address);
231 
232  bool found = false;
233  for (size_t i=0; i<sendchan.size(); ++i)
234  {
235  if (!strcmp(sendchan[i].address, address))
236  {
237  found = true;
238  sendchan[i].cudp = -1;
239  break;
240  }
241  }
242 
243  if (!found)
244  {
245  socket_channel tempchan;
246  strncpy(tempchan.address, address, 17);
247  sendchan.push_back(tempchan);
248  }
249 
250  return 0;
251 }
int i
Definition: rw_test.cpp:37
char address[17]
Definition: socketlib.h:134
char address[]
Definition: netperf_listen.cpp:69
Definition: socketlib.h:115
static vector< socket_channel > sendchan
Definition: agent_forward.cpp:39
int32_t request_del_forward ( string &  req,
string &  ,
Agent  
)
254 {
255  char address[50];
256  sscanf(req.c_str(), "%*s %s", address);
257 
258  for (size_t i=0; i<sendchan.size(); ++i)
259  {
260  if (!strcmp(sendchan[i].address, address))
261  {
263  sendchan[i].cudp = -2;
264  }
265  }
266  return 0;
267 }
int i
Definition: rw_test.cpp:37
int32_t socket_close(socket_channel *channel)
Close socket.
Definition: socketlib.cpp:509
char address[]
Definition: netperf_listen.cpp:69
static vector< socket_channel > sendchan
Definition: agent_forward.cpp:39
int main ( int  argc,
char *  argv[] 
)
47 {
48  int32_t iretn;
49 
50  if (argc < 1)
51  {
52  printf("Usage: agent_forward [{ipaddresses1} {ipaddress2} {ipaddress3} ...]\n");
53  exit (1);
54  }
55 
56  // Initialize the Agent
57  agent = new Agent("", "forward", 5.);
58  if ((iretn = agent->wait()) < 0)
59  {
60  fprintf(agent->get_debug_fd(), "%16.10f %s Failed to start Agent %s on Node %s Dated %s : %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str(), cosmos_error_string(iretn).c_str());
61  exit(iretn);
62  }
63  else
64  {
65  fprintf(agent->get_debug_fd(), "%16.10f %s Started Agent %s on Node %s Dated %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str());
66  }
67 
68 
69  // Add requests
70  if ((iretn=agent->add_request("add_forward",request_add_forward,"add_forward xxx.xxx.xxx.xxx", "Add address to forwarding list.")))
71  exit (iretn);
72 
73  // Open sockets to each address to be used for outgoing forwarding.
74  for (uint16_t i=1; i<argc; ++i)
75  {
76  socket_channel tempchan;
78  {
79  sendchan.push_back(tempchan);
80  }
81  }
82 
83  // Open the socket for incoming forwarding.
85  {
86  for (uint16_t i=0; i<sendchan.size(); ++i)
87  {
88  close(sendchan[i].cudp);
89  }
90  agent->shutdown();
91  printf("Could not open incoming socket for forwarding: %d\n", iretn);
92  exit (1);
93  }
94 
95  // Start thread for incoming forwarding.
96  thread thread_forwarding;
97  thread_forwarding = thread(forwarding_loop);
98 
99  thread thread_opening;
100  thread_opening = thread(opening_loop);
101 
102  // Start performing the body of the agent
103  // uint8_t post[AGENTMAXBUFFER];
104  // size_t nbytes;
105 
106  while(agent->running())
107  {
108  Agent::messstruc mess;
109  iretn = agent->readring(mess, Agent::AgentMessage::ALL, 1., Agent::Where::TAIL);
110 
111  if (iretn > 0)
112  {
113  vector <uint8_t> post;
114  post.resize(mess.jdata.length() + 3 + (mess.meta.type < Agent::AgentMessage::BINARY ? mess.adata.size() : mess.bdata.size()));
115  post[0] = static_cast <uint8_t>(mess.meta.type);
116  post[1] = static_cast <uint8_t>(mess.jdata.length() % 256);
117  post[2] = static_cast <uint8_t>(mess.jdata.length() / 256);
118  std::copy(mess.jdata.begin(), mess.jdata.end(), post.begin() + 3);
119 
120  if (post.size() <= AGENTMAXBUFFER)
121  {
122  if (mess.meta.type < Agent::AgentMessage::BINARY)
123  {
124  if (mess.adata.size())
125  {
126  std::copy(mess.adata.begin(), mess.adata.end(), post.begin() + mess.jdata.length() + 3);
127  }
128  }
129  else
130  {
131  if (mess.bdata.size())
132  {
133  std::copy(mess.bdata.begin(), mess.bdata.end(), post.begin() + mess.jdata.length() + 3);
134  }
135  }
136  }
137 
138  // Forward to all connected forwarders
139  for (uint16_t i=0; i<sendchan.size(); ++i)
140  {
141  if (sendchan[i].cudp >= 0)
142  {
143  iretn = socket_sendto(sendchan[i], post);
144  if (agent->debug_level > 1)
145  {
146  if (iretn < 0)
147  {
148  fprintf(agent->get_debug_fd(), "%s: Failed To %s\n", mjd2iso8601(currentmjd()).c_str(), sendchan[i].address);
149  }
150  else
151  {
152  fprintf(agent->get_debug_fd(), "%s: Sent %zu Bytes To %s\n", mjd2iso8601(currentmjd()).c_str(), post.size(), sendchan[i].address);
153  }
154  }
155  }
156  }
157  }
158  }
159 
160  thread_forwarding.join();
161  thread_opening.join();
162  for (uint16_t i=0; i<sendchan.size(); ++i)
163  {
164  close(sendchan[i].cudp);
165  }
166  agent->shutdown();
167 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static socket_channel rcvchan
Definition: agent_forward.cpp:38
Agent socket using Unicast UDP.
vector< uint8_t > bdata
Definition: agentclass.h:275
int i
Definition: rw_test.cpp:37
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
int32_t request_add_forward(string &req, string &, Agent *)
Definition: agent_forward.cpp:227
int iretn
Definition: rw_test.cpp:37
int32_t wait(State state=State::RUN, double waitsec=10.)
Definition: agentclass.cpp:398
int32_t socket_sendto(socket_channel &channel, const string buffer, int flags)
Definition: socketlib.cpp:737
string adata
Definition: agentclass.h:276
#define SOCKET_TALK
Talk followed by optional listen (sendto address)
Definition: socketlib.h:82
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
#define AGENTRECVPORT
Default RECV port.
Definition: agentclass.h:200
string getAgent()
Definition: agentclass.cpp:2609
int32_t add_request(string token, external_request_function function, string synopsis="", string description="")
Add internal request to Agent request list with description and synopsis.
Definition: agentclass.cpp:312
Definition: agentclass.h:139
Definition: socketlib.h:115
#define SOCKET_BLOCKING
Blocking Agent.
Definition: socketlib.h:78
int32_t shutdown()
Shutdown agent gracefully.
Definition: agentclass.cpp:366
#define AGENTRCVTIMEO
Default AGENT socket RCVTIMEO (100 msec)
Definition: agentclass.h:208
Storage for messages.
Definition: agentclass.h:272
double data_ctime(string path)
Definition: datalib.cpp:1910
void opening_loop()
Definition: agent_forward.cpp:169
void forwarding_loop()
Definition: agent_forward.cpp:188
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
pollstruc meta
Definition: agentclass.h:274
string jdata
Definition: agentclass.h:277
string utc2iso8601(double utc)
ISO 8601 version of time.
Definition: timelib.cpp:1286
AgentMessage type
Definition: agentclass.h:266
#define SOCKET_LISTEN
Listen followed by optional talk (recvfrom INADDRANY)
Definition: socketlib.h:84
#define AGENTMAXBUFFER
Maximum AGENT transfer buffer size.
Definition: jsondef.h:438
static Agent * agent
Definition: agent_forward.cpp:37
int32_t socket_open(socket_channel *channel, NetworkType ntype, const char *address, uint16_t port, uint16_t role, bool blocking, uint32_t usectimeo, uint32_t rcvbuf, uint32_t sndbuf)
Open UDP socket.
Definition: socketlib.cpp:51
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
int32_t readring(messstruc &message, AgentMessage type=Agent::AgentMessage::ALL, float waitsec=1., Where where=Where::TAIL, string proc="", string node="")
Check Ring for message.
Definition: agentclass.cpp:2395
static vector< socket_channel > sendchan
Definition: agent_forward.cpp:39

Variable Documentation

Agent* agent
static
socket_channel rcvchan
static
vector<socket_channel> sendchan
static