COSMOS core  1.0.2 (beta)
Comprehensive Open-architecture Solution for Mission Operations Systems
agent_tunnel2.cpp File Reference
#include "support/configCosmos.h"
#include <stdio.h>
#include "agent/agentclass.h"
#include "device/serial/serialclass.h"
Include dependency graph for agent_tunnel2.cpp:

Macros

#define MAXBUFFERSIZE   2560
 
#define TUN_BUF_SIZE   2000
 

Functions

void tcv_read_loop ()
 
void tcv_write_loop ()
 
void tun_read_loop ()
 
void tun_write_loop ()
 
int main (int argc, char *argv[])
 

Variables

static Agentagent
 
static std::queue< vector< uint8_t > > tun_fifo
 
static std::queue< vector< uint8_t > > tcv_fifo
 
static std::condition_variable tcv_fifo_check
 
static std::condition_variable tun_fifo_check
 
static int tun_fd
 
static uint16_t tun_mtu =250
 
static ElapsedTime tun_et
 
static ElapsedTime tcv_et
 
static string rxr_devname
 
static string txr_devname
 
static Serialrxr_serial =nullptr
 
static Serialtxr_serial =nullptr
 
static uint32_t rxr_baud = 9600
 
static uint32_t txr_baud = 9600
 

Macro Definition Documentation

#define MAXBUFFERSIZE   2560
#define TUN_BUF_SIZE   2000

Function Documentation

void tcv_read_loop ( )
359 {
360  std::vector<uint8_t> buffer;
361  int32_t iretn;
362  double lastin = currentmjd(0.);
363  double lastbeacon = currentmjd(0.);
364 
365  while (agent->running())
366  {
367  // Read data from receiver port
368  iretn = kpc9612p_recvframe(&rxr_handle);
369  if (iretn > 0)
370  { // Start of mutex for tun FIFO
372  // uint16_t cs1 = kpc9612p_calc_fcs(&buffer[17], buffer.size()-17);
373  // uint16_t cs2 = *(uint16_t*)(astroin->payload+astroin->header.size-4);
374  if (buffer[SOCKET_IP_BYTE_VERSION]>>4 == 4 && buffer.size() == buffer[SOCKET_IP_BYTE_LEN_HIGH]*256U+buffer[SOCKET_IP_BYTE_LEN_LOW])
375  {
377  {
378  tun_fifo.push(buffer);
379  tun_fifo_check.notify_one();
380  printf("Buffer: [%u,%" PRIu64 "] %f\n", rxr_handle.frame.size, buffer.size(), 86400.*(currentmjd(0.)-lastin));
381  lastin = currentmjd(0.);
382  // for (uint16_t i=0; i<(rxr_handle.frame.size<150?rxr_handle.frame.size:150); ++i)
383  // {
384  // if (i == 17)
385  // {
386  // printf("[ ");
387  // }
388  // printf("%x ", rxr_handle.frame.full[i]);
389  // if (i == buffer.size()+16 || i == 149)
390  // {
391  // printf("] ");
392  // }
393  // }
394  // printf("\n");
395  fflush(stdout);
396  }
397  else
398  {
399  printf("UDP checksum error:\n");
400  }
401  }
402  else
403  {
404  printf("Beacon: [%d,%u,%" PRIu64 "] %f\n", iretn, rxr_handle.frame.size, buffer.size(), 86400.*(currentmjd(0.)-lastbeacon));
405  lastbeacon = currentmjd(0.);
406  // std::string str(buffer.begin(), buffer.end());
407  // std::cout << "\t" << str << std::endl;
408  }
409  } // End of mutex for tun FIFO
410  }
411  tun_fifo_check.notify_all();
412 
414 }
kpc9612p_handle rxr_handle
Definition: agent_kpc9612p.cpp:72
#define SOCKET_IP_BYTE_LEN_LOW
Definition: socketlib.h:95
#define SOCKET_IP_BYTE_LEN_HIGH
Definition: socketlib.h:96
int iretn
Definition: rw_test.cpp:37
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
int32_t kpc9612p_recvframe(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:111
static char buffer[255]
Definition: propagator_simple.cpp:60
int32_t kpc9612p_unloadframe(kpc9612p_handle *handle, uint8_t *data, uint16_t size)
Definition: kpc9612p_lib.cpp:192
kpc9612p_frame frame
Definition: kpc9612p_lib.h:91
int32_t socket_check_udp_checksum(vector< uint8_t > packet)
Check UDP checksum.
Definition: socketlib.cpp:407
#define SOCKET_IP_BYTE_PROTOCOL
Definition: socketlib.h:97
Agent * agent
Definition: agent_kpc9612p.cpp:49
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
std::condition_variable tun_fifo_check
Definition: agent_kpc9612p.cpp:63
#define SOCKET_IP_PROTOCOL_UDP
Definition: socketlib.h:103
int32_t kpc9612p_disconnect(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:103
#define SOCKET_IP_BYTE_VERSION
IP Version Byte.
Definition: socketlib.h:94
uint16_t size
Definition: kpc9612p_lib.h:85
std::queue< std::vector< uint8_t > > tun_fifo
Definition: agent_kpc9612p.cpp:57
void tcv_write_loop ( )
417 {
418  std::vector<uint8_t> buffer;
419  // int32_t iretn;
420  // double lastout = currentmjd(0.);
421 
422  while (agent->running())
423  {
424  { // Start of mutex for tcv FIFO
425  std::unique_lock<std::mutex> locker(tcv_fifo_lock);
426 
427  while (tcv_fifo.empty())
428  {
429  tcv_fifo_check.wait(locker);
430  }
431  } // End of mutex for tcv FIFO
432 
433  while (!tcv_fifo.empty())
434  {
435  // Get next packet from transceiver FIFO
436  buffer = tcv_fifo.front();
437  tcv_fifo.pop();
438  // Write data to transmitter port
439  kpc9612p_loadframe(&txr_handle, buffer);
441  // printf("Out Radio: [%d,%u,%u] %f\n", iretn, txr_handle.frame.size, buffer.size(), 86400.*(currentmjd(0.)-lastout));
442  // lastout = currentmjd(0.);
443  // if (iretn >= 0)
444  // {
445  // for (uint16_t i=0; i<(txr_handle.frame.size<150?txr_handle.frame.size:150); ++i)
446  // {
447  // if (i == 17)
448  // {
449  // printf("[ ");
450  // }
451  // if (i == buffer.size()+17)
452  // {
453  // printf("] ");
454  // }
455  // printf("%x ", txr_handle.frame.full[i]);
456  // }
457  // printf("\n");
458  // }
459  }
460  }
461 
463 }
std::condition_variable tcv_fifo_check
Definition: agent_kpc9612p.cpp:61
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
static char buffer[255]
Definition: propagator_simple.cpp:60
Agent * agent
Definition: agent_kpc9612p.cpp:49
std::mutex tcv_fifo_lock
Definition: agent_kpc9612p.cpp:60
int32_t kpc9612p_disconnect(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:103
int32_t kpc9612p_sendframe(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:130
kpc9612p_handle txr_handle
Definition: agent_kpc9612p.cpp:73
std::queue< std::vector< uint8_t > > tcv_fifo
Definition: agent_kpc9612p.cpp:58
int32_t kpc9612p_loadframe(kpc9612p_handle *handle, uint8_t *data, uint16_t size)
Definition: kpc9612p_lib.cpp:140
void tun_read_loop ( )
251 {
252  std::vector<uint8_t> buffer;
253  int32_t nbytes;
254 
255  while (agent->running())
256  {
257 
258  buffer.resize(TUN_BUF_SIZE);
259  nbytes = read(tun_fd, &buffer[0], TUN_BUF_SIZE);
260  if (nbytes > 0)
261  {
262  buffer.resize(nbytes);
263 
264  // Add UDP checksum if necessary
266  {
267  socket_set_udp_checksum(buffer);
268  }
269 
270  tcv_fifo.push(buffer);
271  tcv_fifo_check.notify_one();
272  }
273  }
274  tcv_fifo_check.notify_all();
275 }
int tun_fd
Definition: agent_kpc9612p.cpp:65
int32_t socket_set_udp_checksum(vector< uint8_t > &packet)
Set UDP checksum.
Definition: socketlib.cpp:428
std::condition_variable tcv_fifo_check
Definition: agent_kpc9612p.cpp:61
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
static char buffer[255]
Definition: propagator_simple.cpp:60
#define SOCKET_IP_BYTE_PROTOCOL
Definition: socketlib.h:97
Agent * agent
Definition: agent_kpc9612p.cpp:49
#define SOCKET_IP_PROTOCOL_UDP
Definition: socketlib.h:103
#define TUN_BUF_SIZE
Definition: agent_kpc9612p.cpp:78
std::queue< std::vector< uint8_t > > tcv_fifo
Definition: agent_kpc9612p.cpp:58
void tun_write_loop ( )
278 {
279  std::vector<uint8_t> buffer;
280  int32_t nbytes;
281 
282  while (agent->running())
283  {
284  { // Start of mutex for tun fifo
285  std::unique_lock<std::mutex> locker(tun_fifo_lock);
286 
287  while (tun_fifo.empty())
288  {
289  tun_fifo_check.wait(locker);
290  } // End of mutex for tun fifo
291  }
292 
293  while (!tun_fifo.empty())
294  {
295  buffer = tun_fifo.front();
296  nbytes = agent->post(Agent::AgentMessage::COMM, buffer);
297  if (open_tunnel)
298  {
299  nbytes = write(tun_fd, &buffer[0], buffer.size());
300  }
301  if (nbytes > 0)
302  {
303  // printf("Out TUN: [%u,%u] ", nbytes, buffer.size());
304  // for (uint16_t i=0; i<(buffer.size()<150?buffer.size():150); ++i)
305  // {
306  // if (i == 0)
307  // {
308  // printf("[ ");
309  // }
310  // printf("%x ", buffer[i]);
311  // if (i+1 == buffer.size() || i == 149)
312  // {
313  // printf("] ");
314  // }
315  // }
316  // printf("\n");
317  tun_fifo.pop();
318  }
319  }
320  }
321 }
int tun_fd
Definition: agent_kpc9612p.cpp:65
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
int32_t post(messstruc mess)
Post a Cosmos::Agent::messstruc.
Definition: agentclass.cpp:2074
bool open_tunnel
Definition: agent_kpc9612p.cpp:66
static char buffer[255]
Definition: propagator_simple.cpp:60
Agent * agent
Definition: agent_kpc9612p.cpp:49
std::condition_variable tun_fifo_check
Definition: agent_kpc9612p.cpp:63
std::mutex tun_fifo_lock
Definition: agent_kpc9612p.cpp:62
std::queue< std::vector< uint8_t > > tun_fifo
Definition: agent_kpc9612p.cpp:57
int main ( int  argc,
char *  argv[] 
)
72 {
73 #if defined(COSMOS_LINUX_OS)
74 
75  int32_t iretn;
76  char tunnel_ip[20];
77  vector<uint8_t> buffer;
78 
79  switch (argc)
80  {
81  case 5:
82  {
83  tun_mtu = atoi(argv[4]);
84  }
85  case 4:
86  {
87  // Get unique receiver device name, if any
88  rxr_devname = argv[3];
89  size_t tloc = rxr_devname.find(":");
90  if (tloc != string::npos)
91  {
92  rxr_baud = atol(rxr_devname.substr(tloc+1, rxr_devname.size()-(tloc+1)).c_str());
93  rxr_devname = rxr_devname.substr(0, tloc);
94  }
95  }
96  case 3:
97  {
98  // Get unique transmitter device name, copy to receiver if not unique
99  txr_devname = argv[2];
100  size_t tloc = txr_devname.find(":");
101  if (tloc != string::npos)
102  {
103  txr_baud = atol(txr_devname.substr(tloc+1, txr_devname.size()-tloc+1).c_str());
104  txr_devname = txr_devname.substr(0, tloc);
105  }
106  // Create serial devices
107  txr_serial = new Serial(txr_devname, txr_baud);
108  if (rxr_devname.empty())
109  {
110  rxr_baud = txr_baud;
113  }
114  else
115  {
117  }
118 
119  // Get address for tunnel
120  strcpy(tunnel_ip,argv[1]);
121  }
122  break;
123  default:
124  printf("Usage: agent_tunnel ip_address transmit_device [receive_device]\n");
125  exit (-1);
126  }
127 
128  // Initialize the Agent
129  agent = new Agent("", "tunnel", 1., MAXBUFFERSIZE, true);
130  if ((iretn = agent->wait()) < 0)
131  {
132  fprintf(agent->get_debug_fd(), "%16.10f %s Failed to start Agent %s on Node %s Dated %s : %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str(), cosmos_error_string(iretn).c_str());
133  exit(iretn);
134  }
135  else
136  {
137  fprintf(agent->get_debug_fd(), "%16.10f %s Started Agent %s on Node %s Dated %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str());
138  }
139 
140 
141  // Start serial threads
142  thread tcv_read_thread(tcv_read_loop);
143  thread tcv_write_thread(tcv_write_loop);
144 
145  // Open tunnel device
146  int tunnel_sock;
147  struct ifreq ifr1, ifr2;
148  struct sockaddr_in *addr = (struct sockaddr_in *)&ifr2.ifr_addr;
149 
150  if ((tun_fd=open("/dev/net/tun", O_RDWR)) < 0)
151  {
152  perror("Error opening tunnel device");
153  exit (-1);
154  }
155 
156  memset(&ifr1, 0, sizeof(ifr1));
157  ifr1.ifr_flags = IFF_TUN | IFF_NO_PI;
158  strncpy(ifr1.ifr_name, agent->cinfo->agent[0].beat.proc, IFNAMSIZ);
159  if (ioctl(tun_fd, TUNSETIFF, static_cast<void *>(&ifr1)) < 0)
160  {
161  perror("Error setting tunnel interface");
162  exit (-1);
163  }
164 
165  if((tunnel_sock=socket(AF_INET, SOCK_DGRAM, IPPROTO_IP)) < 0)
166  {
167  perror("Error opening tunnel socket");
168  exit (-1);
169  }
170 
171  // Get ready to set things
172  strncpy(ifr2.ifr_name, agent->cinfo->agent[0].beat.proc, IFNAMSIZ);
173  ifr2.ifr_addr.sa_family = AF_INET;
174 
175  // Set interface address
176 
177  inet_pton(AF_INET, tunnel_ip, &addr->sin_addr);
178  if (ioctl(tunnel_sock, SIOCSIFADDR, &ifr2) < 0 )
179  {
180  perror("Error setting tunnel address");
181  exit (-1);
182  }
183 
184  // Set interface netmask
185  inet_pton(AF_INET, static_cast<const char *>("255.255.255.0"), &addr->sin_addr);
186  if (ioctl(tunnel_sock, SIOCSIFNETMASK, &ifr2) < 0 )
187  {
188  perror("Error setting tunnel netmask");
189  exit (-1);
190  }
191 
192  if (ioctl(tunnel_sock, SIOCGIFFLAGS, &ifr2) < 0 )
193  {
194  perror("Error getting tunnel interface flags");
195  exit (-1);
196  }
197 
198  // Bring interface up
199  ifr2.ifr_flags |= (IFF_UP | IFF_RUNNING);
200  if (ioctl(tunnel_sock, SIOCSIFFLAGS, &ifr2) < 0 )
201  {
202  perror("Error setting tunnel interface flags");
203  exit (-1);
204  }
205 
206  // Set interface MTU
207  ifr2.ifr_mtu = tun_mtu;
208  if (ioctl(tunnel_sock, SIOCSIFMTU, &ifr2) < 0 )
209  {
210  perror("Error setting tunnel interface MTU");
211  exit (-1);
212  }
213 
214 
215  close(tunnel_sock);
216 
217  // Start tunnel threads
218  thread tun_read_thread(tun_read_loop);
219  thread tun_write_thread(tun_write_loop);
220 
221  double nmjd = currentmjd(0.);
222  int32_t sleept;
223 
224  // Start performing the body of the agent
225  while(agent->running())
226  {
227  // Set beginning of next cycle;
228  nmjd += agent->cinfo->agent[0].aprd/86400.;
229 
230  sleept = (int32_t)((nmjd - currentmjd(0.))*86400000000.);
231  if (sleept < 0)
232  {
233  sleept = 0;
234  }
235  COSMOS_USLEEP(sleept);
236  }
237 #endif
238  exit (0);
239 }
static uint32_t txr_baud
Definition: agent_tunnel2.cpp:66
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
void tun_write_loop()
Definition: agent_kpc9612p.cpp:277
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
Definition: serialclass.h:43
static Serial * rxr_serial
Definition: agent_tunnel2.cpp:63
int iretn
Definition: rw_test.cpp:37
int32_t wait(State state=State::RUN, double waitsec=10.)
Definition: agentclass.cpp:398
void tun_read_loop()
Definition: agent_kpc9612p.cpp:250
static uint16_t tun_mtu
Definition: agent_tunnel2.cpp:57
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
static string rxr_devname
Definition: agent_tunnel2.cpp:61
static int tun_fd
Definition: agent_tunnel2.cpp:56
static char buffer[255]
Definition: propagator_simple.cpp:60
string getAgent()
Definition: agentclass.cpp:2609
Definition: agentclass.h:139
static uint32_t rxr_baud
Definition: agent_tunnel2.cpp:65
static Agent * agent
Definition: agent_tunnel2.cpp:43
double data_ctime(string path)
Definition: datalib.cpp:1910
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static Serial * txr_serial
Definition: agent_tunnel2.cpp:64
string utc2iso8601(double utc)
ISO 8601 version of time.
Definition: timelib.cpp:1286
void tcv_write_loop()
Definition: agent_kpc9612p.cpp:416
vector< agentstruc > agent
Single entry vector for agent information.
Definition: jsondef.h:4247
static string txr_devname
Definition: agent_tunnel2.cpp:62
cosmosstruc * cinfo
Definition: agentclass.h:346
#define MAXBUFFERSIZE
Definition: agent_tunnel2.cpp:68
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
void tcv_read_loop()
Definition: agent_kpc9612p.cpp:358

Variable Documentation

Agent* agent
static
std::queue<vector<uint8_t> > tun_fifo
static
std::queue<vector<uint8_t> > tcv_fifo
static
std::condition_variable tcv_fifo_check
static
std::condition_variable tun_fifo_check
static
int tun_fd
static
uint16_t tun_mtu =250
static
ElapsedTime tun_et
static
ElapsedTime tcv_et
static
string rxr_devname
static
string txr_devname
static
Serial* rxr_serial =nullptr
static
Serial* txr_serial =nullptr
static
uint32_t rxr_baud = 9600
static
uint32_t txr_baud = 9600
static