COSMOS core  1.0.2 (beta)
Comprehensive Open-architecture Solution for Mission Operations Systems
agent_tunnel.cpp File Reference
#include "support/configCosmos.h"
#include <stdio.h>
#include "agent/agentclass.h"
#include "device/serial/serialclass.h"
Include dependency graph for agent_tunnel.cpp:

Macros

#define MAXBUFFERSIZE   2560
 
#define TUN_BUF_SIZE   2000
 

Functions

void tcv_read_loop ()
 
void tcv_write_loop ()
 
void tun_read_loop ()
 
void tun_write_loop ()
 
int main (int argc, char *argv[])
 

Variables

static Agent * agent
 
static std::queue< vector< uint8_t > > tun_fifo
 
static std::queue< vector< uint8_t > > tcv_fifo
 
static std::condition_variable tcv_fifo_check
 
static std::condition_variable tun_fifo_check
 
static int tun_fd
 
static string rxr_devname
 
static string txr_devname
 
static Serial * rxr_serial = nullptr
 
static Serial * txr_serial = nullptr
 
static uint32_t rxr_baud = 9600
 
static uint32_t txr_baud = 9600
 

Macro Definition Documentation

#define MAXBUFFERSIZE   2560
#define TUN_BUF_SIZE   2000

Function Documentation

// Radio receive loop. While agent->running() is true it calls
// kpc9612p_recvframe() on rxr_handle; when that returns a positive value and
// `buffer` looks like an IPv4 packet whose size matches its length field, the
// buffer is pushed onto tun_fifo and one waiter on tun_fifo_check is notified.
// Anything else with a positive return is reported as a "Beacon".
// After the loop ends, every waiter on tun_fifo_check is notified.
// NOTE(review): this listing skips source lines 371, 376 and 413, so part of the
// function is not visible here; see the notes at those positions.
void tcv_read_loop ( )
359 {
360  std::vector<uint8_t> buffer;
361  int32_t iretn;
362  double lastin = currentmjd(0.);
363  double lastbeacon = currentmjd(0.);
364 
365  while (agent->running())
366  {
367  // Read data from receiver port
368  iretn = kpc9612p_recvframe(&rxr_handle);
369  if (iretn > 0)
370  { // Start of mutex for tun FIFO
// NOTE(review): listing line 371 is not shown. Nothing visible in this listing
// takes a lock or fills `buffer` before it is inspected below - confirm against
// the source file.
372  // uint16_t cs1 = kpc9612p_calc_fcs(&buffer[17], buffer.size()-17);
373  // uint16_t cs2 = *(uint16_t*)(astroin->payload+astroin->header.size-4);
// Accept only packets whose version nibble is 4 and whose total size equals the
// 16-bit length assembled from the LEN_HIGH / LEN_LOW bytes.
374  if (buffer[SOCKET_IP_BYTE_VERSION]>>4 == 4 && buffer.size() == buffer[SOCKET_IP_BYTE_LEN_HIGH]*256U+buffer[SOCKET_IP_BYTE_LEN_LOW])
375  {
// NOTE(review): listing line 376 is not shown. The matching else at 397 prints a
// UDP checksum error, so the missing line is presumably the checksum test - confirm.
377  {
378  tun_fifo.push(buffer);
379  tun_fifo_check.notify_one();
380  printf("Buffer: [%u,%" PRIu64 "] %f\n", rxr_handle.frame.size, buffer.size(), 86400.*(currentmjd(0.)-lastin));
381  lastin = currentmjd(0.);
382  // for (uint16_t i=0; i<(rxr_handle.frame.size<150?rxr_handle.frame.size:150); ++i)
383  // {
384  // if (i == 17)
385  // {
386  // printf("[ ");
387  // }
388  // printf("%x ", rxr_handle.frame.full[i]);
389  // if (i == buffer.size()+16 || i == 149)
390  // {
391  // printf("] ");
392  // }
393  // }
394  // printf("\n");
395  fflush(stdout);
396  }
397  else
398  {
399  printf("UDP checksum error:\n");
400  }
401  }
402  else
403  {
// Frame was received but did not pass the IPv4 version/length test above.
404  printf("Beacon: [%d,%u,%" PRIu64 "] %f\n", iretn, rxr_handle.frame.size, buffer.size(), 86400.*(currentmjd(0.)-lastbeacon));
405  lastbeacon = currentmjd(0.);
406  // std::string str(buffer.begin(), buffer.end());
407  // std::cout << "\t" << str << std::endl;
408  }
409  } // End of mutex for tun FIFO
410  }
// Wake every thread still waiting on the tun FIFO once the agent stops running.
411  tun_fifo_check.notify_all();
412 
414 }
kpc9612p_handle rxr_handle
Definition: agent_kpc9612p.cpp:72
#define SOCKET_IP_BYTE_LEN_LOW
Definition: socketlib.h:95
#define SOCKET_IP_BYTE_LEN_HIGH
Definition: socketlib.h:96
int iretn
Definition: rw_test.cpp:37
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
int32_t kpc9612p_recvframe(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:111
static char buffer[255]
Definition: propagator_simple.cpp:60
int32_t kpc9612p_unloadframe(kpc9612p_handle *handle, uint8_t *data, uint16_t size)
Definition: kpc9612p_lib.cpp:192
kpc9612p_frame frame
Definition: kpc9612p_lib.h:91
int32_t socket_check_udp_checksum(vector< uint8_t > packet)
Check UDP checksum.
Definition: socketlib.cpp:407
#define SOCKET_IP_BYTE_PROTOCOL
Definition: socketlib.h:97
Agent * agent
Definition: agent_kpc9612p.cpp:49
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
std::condition_variable tun_fifo_check
Definition: agent_kpc9612p.cpp:63
#define SOCKET_IP_PROTOCOL_UDP
Definition: socketlib.h:103
int32_t kpc9612p_disconnect(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:103
#define SOCKET_IP_BYTE_VERSION
IP Version Byte.
Definition: socketlib.h:94
uint16_t size
Definition: kpc9612p_lib.h:85
std::queue< std::vector< uint8_t > > tun_fifo
Definition: agent_kpc9612p.cpp:57
// Radio transmit loop. While agent->running() is true it blocks on
// tcv_fifo_check (holding tcv_fifo_lock) until tcv_fifo is non-empty, then
// drains the FIFO, handing each packet to kpc9612p_loadframe() on txr_handle.
// NOTE(review): this listing skips source lines 440 and 462, so any statement on
// those lines (for example the actual send) is not visible here.
void tcv_write_loop ( )
417 {
418  std::vector<uint8_t> buffer;
419  // int32_t iretn;
420  // double lastout = currentmjd(0.);
421 
422  while (agent->running())
423  {
424  { // Start of mutex for tcv FIFO
425  std::unique_lock<std::mutex> locker(tcv_fifo_lock);
426 
// NOTE(review): this wait loop only exits when the FIFO is non-empty; it does not
// re-check agent->running(), so a notify with an empty FIFO leaves the thread waiting.
427  while (tcv_fifo.empty())
428  {
429  tcv_fifo_check.wait(locker);
430  }
431  } // End of mutex for tcv FIFO
432 
// NOTE(review): `locker` has gone out of scope here, so front()/pop() below run
// without tcv_fifo_lock held - confirm this is intended.
433  while (!tcv_fifo.empty())
434  {
435  // Get next packet from transceiver FIFO
436  buffer = tcv_fifo.front();
437  tcv_fifo.pop();
438  // Write data to transmitter port
439  kpc9612p_loadframe(&txr_handle, buffer);
441  // printf("Out Radio: [%d,%u,%u] %f\n", iretn, txr_handle.frame.size, buffer.size(), 86400.*(currentmjd(0.)-lastout));
442  // lastout = currentmjd(0.);
443  // if (iretn >= 0)
444  // {
445  // for (uint16_t i=0; i<(txr_handle.frame.size<150?txr_handle.frame.size:150); ++i)
446  // {
447  // if (i == 17)
448  // {
449  // printf("[ ");
450  // }
451  // if (i == buffer.size()+17)
452  // {
453  // printf("] ");
454  // }
455  // printf("%x ", txr_handle.frame.full[i]);
456  // }
457  // printf("\n");
458  // }
459  }
460  }
461 
463 }
std::condition_variable tcv_fifo_check
Definition: agent_kpc9612p.cpp:61
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
static char buffer[255]
Definition: propagator_simple.cpp:60
Agent * agent
Definition: agent_kpc9612p.cpp:49
std::mutex tcv_fifo_lock
Definition: agent_kpc9612p.cpp:60
int32_t kpc9612p_disconnect(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:103
int32_t kpc9612p_sendframe(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:130
kpc9612p_handle txr_handle
Definition: agent_kpc9612p.cpp:73
std::queue< std::vector< uint8_t > > tcv_fifo
Definition: agent_kpc9612p.cpp:58
int32_t kpc9612p_loadframe(kpc9612p_handle *handle, uint8_t *data, uint16_t size)
Definition: kpc9612p_lib.cpp:140
// Tunnel read loop. While agent->running() is true it reads up to TUN_BUF_SIZE
// bytes from tun_fd into `buffer`; on a positive read it trims the buffer to the
// bytes read, calls socket_set_udp_checksum(), pushes the packet onto tcv_fifo
// and notifies one waiter on tcv_fifo_check. After the loop ends, every waiter
// on tcv_fifo_check is notified.
// NOTE(review): this listing skips source line 265 (see below).
void tun_read_loop ( )
251 {
252  std::vector<uint8_t> buffer;
253  int32_t nbytes;
254 
255  while (agent->running())
256  {
257 
// Grow back to full capacity each pass because a previous pass may have shrunk it.
258  buffer.resize(TUN_BUF_SIZE);
259  nbytes = read(tun_fd, &buffer[0], TUN_BUF_SIZE);
260  if (nbytes > 0)
261  {
262  buffer.resize(nbytes);
263 
264  // Add UDP checksum if necessary
// NOTE(review): listing line 265 is not shown; it is presumably the condition
// guarding the block below - confirm against the source file.
266  {
267  socket_set_udp_checksum(buffer);
268  }
269 
// NOTE(review): no lock on the tcv FIFO is visible in this listing around the push.
270  tcv_fifo.push(buffer);
271  tcv_fifo_check.notify_one();
272  }
273  }
274  tcv_fifo_check.notify_all();
275 }
int tun_fd
Definition: agent_kpc9612p.cpp:65
int32_t socket_set_udp_checksum(vector< uint8_t > &packet)
Set UDP checksum.
Definition: socketlib.cpp:428
std::condition_variable tcv_fifo_check
Definition: agent_kpc9612p.cpp:61
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
static char buffer[255]
Definition: propagator_simple.cpp:60
#define SOCKET_IP_BYTE_PROTOCOL
Definition: socketlib.h:97
Agent * agent
Definition: agent_kpc9612p.cpp:49
#define SOCKET_IP_PROTOCOL_UDP
Definition: socketlib.h:103
#define TUN_BUF_SIZE
Definition: agent_kpc9612p.cpp:78
std::queue< std::vector< uint8_t > > tcv_fifo
Definition: agent_kpc9612p.cpp:58
// Tunnel write loop. While agent->running() is true it blocks on
// tun_fifo_check (holding tun_fifo_lock) until tun_fifo is non-empty, then for
// each queued packet posts it as an Agent COMM message and, when open_tunnel is
// set, writes it to tun_fd. A packet is removed from the FIFO only when the
// last of those calls returned a positive byte count.
void tun_write_loop ( )
278 {
279  std::vector<uint8_t> buffer;
280  int32_t nbytes;
281 
282  while (agent->running())
283  {
284  { // Start of mutex for tun fifo
285  std::unique_lock<std::mutex> locker(tun_fifo_lock);
286 
// NOTE(review): this wait loop only exits when the FIFO is non-empty; it does not
// re-check agent->running().
287  while (tun_fifo.empty())
288  {
289  tun_fifo_check.wait(locker);
290  } // End of mutex for tun fifo
291  }
292 
// NOTE(review): the lock scope closed at line 291, so the FIFO is drained below
// without tun_fifo_lock held - confirm this is intended.
293  while (!tun_fifo.empty())
294  {
295  buffer = tun_fifo.front();
296  nbytes = agent->post(Agent::AgentMessage::COMM, buffer);
297  if (open_tunnel)
298  {
// Overwrites the result of post(); only the write() result is tested below
// when the tunnel is open.
299  nbytes = write(tun_fd, &buffer[0], buffer.size());
300  }
// NOTE(review): when nbytes <= 0 the packet is not popped, so the enclosing
// while loop retries the same packet immediately.
301  if (nbytes > 0)
302  {
303  // printf("Out TUN: [%u,%u] ", nbytes, buffer.size());
304  // for (uint16_t i=0; i<(buffer.size()<150?buffer.size():150); ++i)
305  // {
306  // if (i == 0)
307  // {
308  // printf("[ ");
309  // }
310  // printf("%x ", buffer[i]);
311  // if (i+1 == buffer.size() || i == 149)
312  // {
313  // printf("] ");
314  // }
315  // }
316  // printf("\n");
317  tun_fifo.pop();
318  }
319  }
320  }
321 }
int tun_fd
Definition: agent_kpc9612p.cpp:65
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
int32_t post(messstruc mess)
Post a Cosmos::Agent::messstruc.
Definition: agentclass.cpp:2074
bool open_tunnel
Definition: agent_kpc9612p.cpp:66
static char buffer[255]
Definition: propagator_simple.cpp:60
Agent * agent
Definition: agent_kpc9612p.cpp:49
std::condition_variable tun_fifo_check
Definition: agent_kpc9612p.cpp:63
std::mutex tun_fifo_lock
Definition: agent_kpc9612p.cpp:62
std::queue< std::vector< uint8_t > > tun_fifo
Definition: agent_kpc9612p.cpp:57
// Entry point of the tunnel agent. Usage (from the message printed below):
//   agent_tunnel ip_address transmit_device [receive_device]
// A device argument may carry a ":baud" suffix. On COSMOS_LINUX_OS builds it
// opens the serial port(s), starts the Agent, starts the radio read/write
// threads, creates and configures a TUN interface (address, netmask
// 255.255.255.0, UP|RUNNING flags, MTU 250), starts the tunnel read/write
// threads, and then sleeps one agent period per cycle while the agent runs.
// On other builds the body is compiled out and the program just calls exit(0).
// NOTE(review): this listing skips source line 114 (see below).
int main ( int  argc,
char *  argv[] 
)
69 {
70 #if defined(COSMOS_LINUX_OS)
71 
72  int32_t iretn;
73  char tunnel_ip[20];
74  vector<uint8_t> buffer;
75 
76  switch (argc)
77  {
78  case 4:
79  {
80  rxr_devname = argv[3];
// Split an optional ":baud" suffix off the receive device name.
81  size_t tloc = rxr_devname.find(":");
82  if (tloc != string::npos)
83  {
84  rxr_baud = atol(rxr_devname.substr(tloc+1, rxr_devname.size()-(tloc+1)).c_str());
85  rxr_devname = rxr_devname.substr(0, tloc);
86  }
87  // Open receiver port
88  rxr_serial = new Serial(rxr_devname, rxr_baud);
89  if (rxr_serial->get_error() < 0)
90  {
91  printf("Error opening %s as receiver - %s\n",rxr_devname.c_str(), cosmos_error_string(rxr_serial->get_error()).c_str());
92  exit (-1);
93  }
94  }
// No break above: with three arguments control continues into case 3, so the
// receive device is opened first and then the transmit device.
95  case 3:
96  {
97  txr_devname = argv[2];
98  size_t tloc = txr_devname.find(":");
99  if (tloc != string::npos)
100  {
// NOTE(review): the length argument here is size()-tloc+1, whereas line 84 uses
// size()-(tloc+1) - confirm which form is intended.
101  txr_baud = atol(txr_devname.substr(tloc+1, txr_devname.size()-tloc+1).c_str());
102  txr_devname = txr_devname.substr(0, tloc);
103  }
// NOTE(review): the comment and error message below say "receiver", but this
// block opens txr_serial from txr_devname.
104  // Open receiver port
105  txr_serial = new Serial(txr_devname, txr_baud);
106  txr_serial->set_timeout(1.);
107  if (txr_serial->get_error() < 0)
108  {
109  printf("Error opening %s as receiver - %s\n",txr_devname.c_str(), cosmos_error_string(txr_serial->get_error()).c_str());
110  exit (-1);
111  }
112  // Copy transmitter to receiver port if not already open (Duplex)
113  if (rxr_serial == nullptr)
// NOTE(review): listing line 114 is not shown; the statement controlled by the
// `if` above is missing from this view.
115  // Get address for tunnel
// NOTE(review): strcpy into the 20-byte tunnel_ip with no length check on argv[1].
116  strcpy(tunnel_ip,argv[1]);
117  }
118  break;
119  default:
120  printf("Usage: agent_tunnel ip_address transmit_device [receive_device]\n");
121  exit (-1);
122  }
123 
124  // Initialize the Agent
125  agent = new Agent("", "tunnel", 1., MAXBUFFERSIZE, true);
126  if ((iretn = agent->wait()) < 0)
127  {
128  fprintf(agent->get_debug_fd(), "%16.10f %s Failed to start Agent %s on Node %s Dated %s : %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str(), cosmos_error_string(iretn).c_str());
129  exit(iretn);
130  }
131  else
132  {
133  fprintf(agent->get_debug_fd(), "%16.10f %s Started Agent %s on Node %s Dated %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str());
134  }
135 
136 
137  // Start serial threads
138  thread tcv_read_thread(tcv_read_loop);
139  thread tcv_write_thread(tcv_write_loop);
140 
141  // Open tunnel device
142  int tunnel_sock;
143  struct ifreq ifr1, ifr2;
// `addr` aliases the address field of ifr2, so the inet_pton() calls below fill
// the request that is then passed to ioctl().
144  struct sockaddr_in *addr = (struct sockaddr_in *)&ifr2.ifr_addr;
145 
146  if ((tun_fd=open("/dev/net/tun", O_RDWR)) < 0)
147  {
148  perror("Error opening tunnel device");
149  exit (-1);
150  }
151 
// ifr1 is used only to create the TUN interface, named after the agent's proc name.
152  memset(&ifr1, 0, sizeof(ifr1));
153  ifr1.ifr_flags = IFF_TUN | IFF_NO_PI;
154  strncpy(ifr1.ifr_name, agent->cinfo->agent[0].beat.proc, IFNAMSIZ);
155  if (ioctl(tun_fd, TUNSETIFF, static_cast<void *>(&ifr1)) < 0)
156  {
157  perror("Error setting tunnel interface");
158  exit (-1);
159  }
160 
161  if((tunnel_sock=socket(AF_INET, SOCK_DGRAM, IPPROTO_IP)) < 0)
162  {
163  perror("Error opening tunnel socket");
164  exit (-1);
165  }
166 
167  // Get ready to set things
// NOTE(review): unlike ifr1, ifr2 is not zeroed before use in this listing.
168  strncpy(ifr2.ifr_name, agent->cinfo->agent[0].beat.proc, IFNAMSIZ);
169  ifr2.ifr_addr.sa_family = AF_INET;
170 
171  // Set interface address
172 
173  inet_pton(AF_INET, tunnel_ip, &addr->sin_addr);
174  if (ioctl(tunnel_sock, SIOCSIFADDR, &ifr2) < 0 )
175  {
176  perror("Error setting tunnel address");
177  exit (-1);
178  }
179 
180  // Set interface netmask
181  inet_pton(AF_INET, static_cast<const char *>("255.255.255.0"), &addr->sin_addr);
182  if (ioctl(tunnel_sock, SIOCSIFNETMASK, &ifr2) < 0 )
183  {
184  perror("Error setting tunnel netmask");
185  exit (-1);
186  }
187 
// Read the current flags first so that only UP and RUNNING are added below.
188  if (ioctl(tunnel_sock, SIOCGIFFLAGS, &ifr2) < 0 )
189  {
190  perror("Error getting tunnel interface flags");
191  exit (-1);
192  }
193 
194  // Bring interface up
195  ifr2.ifr_flags |= (IFF_UP | IFF_RUNNING);
196  if (ioctl(tunnel_sock, SIOCSIFFLAGS, &ifr2) < 0 )
197  {
198  perror("Error setting tunnel interface flags");
199  exit (-1);
200  }
201 
202  // Set interface MTU
203  ifr2.ifr_mtu = 250;
204  if (ioctl(tunnel_sock, SIOCSIFMTU, &ifr2) < 0 )
205  {
206  perror("Error setting tunnel interface MTU");
207  exit (-1);
208  }
209 
210 
// The configuration socket is no longer needed; tun_fd stays open for the threads.
211  close(tunnel_sock);
212 
213  // Start tunnel threads
214  thread tun_read_thread(tun_read_loop);
215  thread tun_write_thread(tun_write_loop);
216 
217  double nmjd = currentmjd(0.);
218  int32_t sleept;
219 
220  // Start performing the body of the agent
221  while(agent->running())
222  {
223  // Set beginning of next cycle;
224  nmjd += agent->cinfo->agent[0].aprd/86400.;
225 
// Remaining time until the next cycle, converted from days to microseconds.
226  sleept = (int32_t)((nmjd - currentmjd(0.))*86400000000.);
227  if (sleept < 0)
228  {
229  sleept = 0;
230  }
231  COSMOS_USLEEP(sleept);
232  }
233 #endif
// NOTE(review): none of the four threads is joined in this listing before exit(0).
234  exit (0);
235 }
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
static uint32_t txr_baud
Definition: agent_tunnel.cpp:63
Definition: serialclass.h:43
int iretn
Definition: rw_test.cpp:37
#define MAXBUFFERSIZE
Definition: agent_tunnel.cpp:65
int32_t wait(State state=State::RUN, double waitsec=10.)
Definition: agentclass.cpp:398
static string txr_devname
Definition: agent_tunnel.cpp:59
static int tun_fd
Definition: agent_tunnel.cpp:56
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
void tun_write_loop()
Definition: agent_kpc9612p.cpp:277
static string rxr_devname
Definition: agent_tunnel.cpp:58
static char buffer[255]
Definition: propagator_simple.cpp:60
string getAgent()
Definition: agentclass.cpp:2609
Definition: agentclass.h:139
static Serial * rxr_serial
Definition: agent_tunnel.cpp:60
static Agent * agent
Definition: agent_tunnel.cpp:43
double data_ctime(string path)
Definition: datalib.cpp:1910
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
string utc2iso8601(double utc)
ISO 8601 version of time.
Definition: timelib.cpp:1286
int32_t set_timeout(int, double timeout)
Definition: serialclass.cpp:515
vector< agentstruc > agent
Single entry vector for agent information.
Definition: jsondef.h:4247
static Serial * txr_serial
Definition: agent_tunnel.cpp:61
void tun_read_loop()
Definition: agent_kpc9612p.cpp:250
cosmosstruc * cinfo
Definition: agentclass.h:346
void tcv_write_loop()
Definition: agent_kpc9612p.cpp:416
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
int32_t get_error()
Definition: serialclass.cpp:170
static uint32_t rxr_baud
Definition: agent_tunnel.cpp:62
void tcv_read_loop()
Definition: agent_kpc9612p.cpp:358

Variable Documentation

Agent* agent
static
std::queue<vector<uint8_t> > tun_fifo
static
std::queue<vector<uint8_t> > tcv_fifo
static
std::condition_variable tcv_fifo_check
static
std::condition_variable tun_fifo_check
static
int tun_fd
static
string rxr_devname
static
string txr_devname
static
Serial* rxr_serial =nullptr
static
Serial* txr_serial =nullptr
static
uint32_t rxr_baud = 9600
static
uint32_t txr_baud = 9600
static