COSMOS core  1.0.2 (beta)
Comprehensive Open-architecture Solution for Mission Operations Systems
agent_kpc9612p.cpp File Reference
#include "support/configCosmos.h"
#include "support/socketlib.h"
#include <stdio.h>
#include "agent/agentclass.h"
#include "device/general/cssl_lib.h"
#include "device/general/kpc9612p_lib.h"
Include dependency graph for agent_kpc9612p.cpp:

Macros

#define MAXBUFFERSIZE   2560
 
#define TUN_BUF_SIZE   2000
 
#define BAUD   19200
 

Functions

void port_read_loop ()
 
void tcv_read_loop ()
 
void tcv_write_loop ()
 
void tun_read_loop ()
 
void tun_write_loop ()
 
int32_t request_transmit_port (string &req, string &response, Agent *)
 
int main (int argc, char *argv[])
 

Variables

char agentname [COSMOS_MAX_NAME+1] = "tunnel"
 
char node [50] = ""
 
int waitsec = 5
 
Agent * agent
 
std::queue< std::vector< uint8_t > > tun_fifo
 
std::queue< std::vector< uint8_t > > tcv_fifo
 
std::mutex tcv_fifo_lock
 
std::condition_variable tcv_fifo_check
 
std::mutex tun_fifo_lock
 
std::condition_variable tun_fifo_check
 
int tun_fd
 
bool open_tunnel = true
 
char rxr_devname [20] =""
 
char txr_devname [20] =""
 
kpc9612p_handle rxr_handle
 
kpc9612p_handle txr_handle
 
socket_channel transmit_socket
 

Macro Definition Documentation

#define MAXBUFFERSIZE   2560
#define TUN_BUF_SIZE   2000
#define BAUD   19200

Function Documentation

void port_read_loop ( )
324 {
325  std::vector<uint8_t> buffer;
326  int32_t iretn;
327  int32_t nbytes;
328  double lastin = currentmjd(0.);
329 
330  if ((iretn = socket_open(&transmit_socket, NetworkType::UDP, "", 0, SOCKET_LISTEN, SOCKET_BLOCKING,5000000)) < 0)
331  {
332  printf("- Could not successfully open read port socket... exiting \n");
333  exit (-errno);
334  }
335 
336  while (agent->running())
337  {
338  std::vector<uint8_t> buf;
339 
340  // Read data from receiver port
341  buf.resize(KPC9612P_PAYLOAD_SIZE);
342  if (( nbytes = recvfrom(transmit_socket.cudp, (char *)&buf[0], KPC9612P_PAYLOAD_SIZE, 0, static_cast<struct sockaddr *>(nullptr), static_cast<socklen_t *>(nullptr))) > 0)
343  {
344  buf.resize(nbytes);
345  }
346  if (nbytes > 0)
347  { // Start of mutex for tun FIFO
348  tun_fifo.push(buffer);
349  tun_fifo_check.notify_one();
350  printf("Buffer: [%u,%" PRIu64 "] %f\n", rxr_handle.frame.size, buffer.size(), 86400.*(currentmjd(0.)-lastin));
351  lastin = currentmjd(0.);
352  fflush(stdout);
353  } // End of mutex for tun FIFO
354  }
355  tun_fifo_check.notify_all();
356 }
kpc9612p_handle rxr_handle
Definition: agent_kpc9612p.cpp:72
Agent socket using Unicast UDP.
#define KPC9612P_PAYLOAD_SIZE
Definition: kpc9612p_lib.h:50
int iretn
Definition: rw_test.cpp:37
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
int32_t cudp
Definition: socketlib.h:120
static char buffer[255]
Definition: propagator_simple.cpp:60
kpc9612p_frame frame
Definition: kpc9612p_lib.h:91
Agent * agent
Definition: agent_kpc9612p.cpp:49
#define SOCKET_BLOCKING
Blocking Agent.
Definition: socketlib.h:78
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
std::condition_variable tun_fifo_check
Definition: agent_kpc9612p.cpp:63
#define SOCKET_LISTEN
Listen followed by optional talk (recvfrom INADDRANY)
Definition: socketlib.h:84
char buf[128]
Definition: rw_test.cpp:40
int32_t socket_open(socket_channel *channel, NetworkType ntype, const char *address, uint16_t port, uint16_t role, bool blocking, uint32_t usectimeo, uint32_t rcvbuf, uint32_t sndbuf)
Open UDP socket.
Definition: socketlib.cpp:51
uint16_t size
Definition: kpc9612p_lib.h:85
socket_channel transmit_socket
Definition: agent_kpc9612p.cpp:75
std::queue< std::vector< uint8_t > > tun_fifo
Definition: agent_kpc9612p.cpp:57
// Transceiver read loop (thread body): while the agent is running, receive frames on
// rxr_handle; packets that pass the IPv4 version/length check are pushed onto tun_fifo,
// anything else is reported as a "Beacon". Waiters on tun_fifo_check are notified on exit.
// NOTE(review): this listing skips source lines 371, 376 and 413, so the block is not
// complete as shown; see the notes at each gap.
void tcv_read_loop ( )
359 {
360  std::vector<uint8_t> buffer;
361  int32_t iretn;
362  double lastin = currentmjd(0.);
363  double lastbeacon = currentmjd(0.);
364 
365  while (agent->running())
366  {
367  // Read data from receiver port
368  iretn = kpc9612p_recvframe(&rxr_handle);
369  if (iretn > 0)
370  { // Start of mutex for tun FIFO
// NOTE(review): no lock on tun_fifo_lock is taken anywhere in the visible lines despite
// the "mutex" comments.
// NOTE(review): listing line 371 is missing here and nothing visible fills `buffer`;
// presumably the frame payload is copied into it via kpc9612p_unloadframe() (named in
// the reference list below) - confirm against the original source.
372  // uint16_t cs1 = kpc9612p_calc_fcs(&buffer[17], buffer.size()-17);
373  // uint16_t cs2 = *(uint16_t*)(astroin->payload+astroin->header.size-4);
// Accept only if the high nibble of the version byte is 4 and the buffer size equals the
// 16 bit length assembled from the high and low length bytes.
374  if (buffer[SOCKET_IP_BYTE_VERSION]>>4 == 4 && buffer.size() == buffer[SOCKET_IP_BYTE_LEN_HIGH]*256U+buffer[SOCKET_IP_BYTE_LEN_LOW])
375  {
// NOTE(review): listing line 376 is missing here; the `else` at 397 prints "UDP checksum
// error", so an `if` testing the UDP checksum presumably stood on this line - confirm.
377  {
378  tun_fifo.push(buffer);
379  tun_fifo_check.notify_one();
380  printf("Buffer: [%u,%" PRIu64 "] %f\n", rxr_handle.frame.size, buffer.size(), 86400.*(currentmjd(0.)-lastin));
381  lastin = currentmjd(0.);
382  // for (uint16_t i=0; i<(rxr_handle.frame.size<150?rxr_handle.frame.size:150); ++i)
383  // {
384  // if (i == 17)
385  // {
386  // printf("[ ");
387  // }
388  // printf("%x ", rxr_handle.frame.full[i]);
389  // if (i == buffer.size()+16 || i == 149)
390  // {
391  // printf("] ");
392  // }
393  // }
394  // printf("\n");
395  fflush(stdout);
396  }
397  else
398  {
399  printf("UDP checksum error:\n");
400  }
401  }
402  else
403  {
// Frames failing the version/length check are only logged, with the time since the
// previous one in seconds (MJD difference times 86400).
404  printf("Beacon: [%d,%u,%" PRIu64 "] %f\n", iretn, rxr_handle.frame.size, buffer.size(), 86400.*(currentmjd(0.)-lastbeacon));
405  lastbeacon = currentmjd(0.);
406  // std::string str(buffer.begin(), buffer.end());
407  // std::cout << "\t" << str << std::endl;
408  }
409  } // End of mutex for tun FIFO
410  }
// Wake any consumer still waiting on the FIFO once the agent has stopped running.
411  tun_fifo_check.notify_all();
412 
// NOTE(review): listing line 413 is missing here; presumably a kpc9612p_disconnect() call
// on the receiver handle (named in the reference list below) - confirm.
414 }
kpc9612p_handle rxr_handle
Definition: agent_kpc9612p.cpp:72
#define SOCKET_IP_BYTE_LEN_LOW
Definition: socketlib.h:95
#define SOCKET_IP_BYTE_LEN_HIGH
Definition: socketlib.h:96
int iretn
Definition: rw_test.cpp:37
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
int32_t kpc9612p_recvframe(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:111
static char buffer[255]
Definition: propagator_simple.cpp:60
int32_t kpc9612p_unloadframe(kpc9612p_handle *handle, uint8_t *data, uint16_t size)
Definition: kpc9612p_lib.cpp:192
kpc9612p_frame frame
Definition: kpc9612p_lib.h:91
int32_t socket_check_udp_checksum(vector< uint8_t > packet)
Check UDP checksum.
Definition: socketlib.cpp:407
#define SOCKET_IP_BYTE_PROTOCOL
Definition: socketlib.h:97
Agent * agent
Definition: agent_kpc9612p.cpp:49
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
std::condition_variable tun_fifo_check
Definition: agent_kpc9612p.cpp:63
#define SOCKET_IP_PROTOCOL_UDP
Definition: socketlib.h:103
int32_t kpc9612p_disconnect(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:103
#define SOCKET_IP_BYTE_VERSION
IP Version Byte.
Definition: socketlib.h:94
uint16_t size
Definition: kpc9612p_lib.h:85
std::queue< std::vector< uint8_t > > tun_fifo
Definition: agent_kpc9612p.cpp:57
// Transceiver write loop (thread body): while the agent is running, wait for tcv_fifo to
// become non-empty, then take packets off it one at a time and load each into txr_handle.
// NOTE(review): this listing skips source lines 440 and 462, so the block is not complete
// as shown; see the notes at each gap.
void tcv_write_loop ( )
417 {
418  std::vector<uint8_t> buffer;
419  // int32_t iretn;
420  // double lastout = currentmjd(0.);
421 
422  while (agent->running())
423  {
424  { // Start of mutex for tcv FIFO
425  std::unique_lock<std::mutex> locker(tcv_fifo_lock);
426 
// NOTE(review): the wait predicate only tests the FIFO, not agent->running(), so a
// notify_all() issued at shutdown with the FIFO still empty sends this thread straight
// back to waiting.
427  while (tcv_fifo.empty())
428  {
429  tcv_fifo_check.wait(locker);
430  }
431  } // End of mutex for tcv FIFO
432 
// NOTE(review): the lock was released at the closing brace above, so the queue accesses
// in this loop run without holding tcv_fifo_lock.
433  while (!tcv_fifo.empty())
434  {
435  // Get next packet from transceiver FIFO
436  buffer = tcv_fifo.front();
437  tcv_fifo.pop();
438  // Write data to transmitter port
439  kpc9612p_loadframe(&txr_handle, buffer);
// NOTE(review): listing line 440 is missing here; presumably the loaded frame is sent with
// kpc9612p_sendframe() (named in the reference list below) - confirm.
441  // printf("Out Radio: [%d,%u,%u] %f\n", iretn, txr_handle.frame.size, buffer.size(), 86400.*(currentmjd(0.)-lastout));
442  // lastout = currentmjd(0.);
443  // if (iretn >= 0)
444  // {
445  // for (uint16_t i=0; i<(txr_handle.frame.size<150?txr_handle.frame.size:150); ++i)
446  // {
447  // if (i == 17)
448  // {
449  // printf("[ ");
450  // }
451  // if (i == buffer.size()+17)
452  // {
453  // printf("] ");
454  // }
455  // printf("%x ", txr_handle.frame.full[i]);
456  // }
457  // printf("\n");
458  // }
459  }
460  }
461 
// NOTE(review): listing line 462 is missing here; presumably a kpc9612p_disconnect() call
// on the transmitter handle (named in the reference list below) - confirm.
463 }
std::condition_variable tcv_fifo_check
Definition: agent_kpc9612p.cpp:61
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
static char buffer[255]
Definition: propagator_simple.cpp:60
Agent * agent
Definition: agent_kpc9612p.cpp:49
std::mutex tcv_fifo_lock
Definition: agent_kpc9612p.cpp:60
int32_t kpc9612p_disconnect(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:103
int32_t kpc9612p_sendframe(kpc9612p_handle *handle)
Definition: kpc9612p_lib.cpp:130
kpc9612p_handle txr_handle
Definition: agent_kpc9612p.cpp:73
std::queue< std::vector< uint8_t > > tcv_fifo
Definition: agent_kpc9612p.cpp:58
int32_t kpc9612p_loadframe(kpc9612p_handle *handle, uint8_t *data, uint16_t size)
Definition: kpc9612p_lib.cpp:140
// Tunnel read loop (thread body): while the agent is running, read up to TUN_BUF_SIZE
// bytes at a time from tun_fd and push each non-empty read onto tcv_fifo, signalling
// tcv_fifo_check. Waiters on tcv_fifo_check are notified on exit.
// NOTE(review): this listing skips source line 265, so the block is not complete as shown.
void tun_read_loop ( )
251 {
252  std::vector<uint8_t> buffer;
253  int32_t nbytes;
254 
255  while (agent->running())
256  {
257 
// Grow back to full size before every read; the vector is shrunk to the byte count below.
258  buffer.resize(TUN_BUF_SIZE);
259  nbytes = read(tun_fd, &buffer[0], TUN_BUF_SIZE);
260  if (nbytes > 0)
261  {
262  buffer.resize(nbytes);
263 
264  // Add UDP checksum if necessary
// NOTE(review): listing line 265 is missing here; presumably an `if` testing the packet's
// protocol byte for UDP (SOCKET_IP_BYTE_PROTOCOL and SOCKET_IP_PROTOCOL_UDP appear in the
// reference list below) guarded this block - confirm.
266  {
267  socket_set_udp_checksum(buffer);
268  }
269 
// NOTE(review): the push is made without holding tcv_fifo_lock, which the consumer in
// tcv_write_loop uses for its wait.
270  tcv_fifo.push(buffer);
271  tcv_fifo_check.notify_one();
272  }
273  }
// Wake any consumer still waiting on the FIFO once the agent has stopped running.
274  tcv_fifo_check.notify_all();
275 }
int tun_fd
Definition: agent_kpc9612p.cpp:65
int32_t socket_set_udp_checksum(vector< uint8_t > &packet)
Set UDP checksum.
Definition: socketlib.cpp:428
std::condition_variable tcv_fifo_check
Definition: agent_kpc9612p.cpp:61
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
static char buffer[255]
Definition: propagator_simple.cpp:60
#define SOCKET_IP_BYTE_PROTOCOL
Definition: socketlib.h:97
Agent * agent
Definition: agent_kpc9612p.cpp:49
#define SOCKET_IP_PROTOCOL_UDP
Definition: socketlib.h:103
#define TUN_BUF_SIZE
Definition: agent_kpc9612p.cpp:78
std::queue< std::vector< uint8_t > > tcv_fifo
Definition: agent_kpc9612p.cpp:58
void tun_write_loop ( )
278 {
279  std::vector<uint8_t> buffer;
280  int32_t nbytes;
281 
282  while (agent->running())
283  {
284  { // Start of mutex for tun fifo
285  std::unique_lock<std::mutex> locker(tun_fifo_lock);
286 
287  while (tun_fifo.empty())
288  {
289  tun_fifo_check.wait(locker);
290  } // End of mutex for tun fifo
291  }
292 
293  while (!tun_fifo.empty())
294  {
295  buffer = tun_fifo.front();
296  nbytes = agent->post(Agent::AgentMessage::COMM, buffer);
297  if (open_tunnel)
298  {
299  nbytes = write(tun_fd, &buffer[0], buffer.size());
300  }
301  if (nbytes > 0)
302  {
303  // printf("Out TUN: [%u,%u] ", nbytes, buffer.size());
304  // for (uint16_t i=0; i<(buffer.size()<150?buffer.size():150); ++i)
305  // {
306  // if (i == 0)
307  // {
308  // printf("[ ");
309  // }
310  // printf("%x ", buffer[i]);
311  // if (i+1 == buffer.size() || i == 149)
312  // {
313  // printf("] ");
314  // }
315  // }
316  // printf("\n");
317  tun_fifo.pop();
318  }
319  }
320  }
321 }
int tun_fd
Definition: agent_kpc9612p.cpp:65
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
int32_t post(messstruc mess)
Post a Cosmos::Agent::messstruc.
Definition: agentclass.cpp:2074
bool open_tunnel
Definition: agent_kpc9612p.cpp:66
static char buffer[255]
Definition: propagator_simple.cpp:60
Agent * agent
Definition: agent_kpc9612p.cpp:49
std::condition_variable tun_fifo_check
Definition: agent_kpc9612p.cpp:63
std::mutex tun_fifo_lock
Definition: agent_kpc9612p.cpp:62
std::queue< std::vector< uint8_t > > tun_fifo
Definition: agent_kpc9612p.cpp:57
int32_t request_transmit_port ( string &  req,
string &  response,
Agent  
)
466 {
467  response = to_unsigned(transmit_socket.cport);
468 
469  return 0;
470 }
string to_unsigned(uint64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:265
uint16_t cport
Definition: socketlib.h:130
socket_channel transmit_socket
Definition: agent_kpc9612p.cpp:75
// Program entry: takes `ip_address transmit_device [receive_device]`, connects the radio
// device(s), creates the Agent, registers the request_transmit_port request, starts the
// port/transceiver threads, optionally creates and configures a TUN interface (Linux
// builds only), starts the tunnel threads, then idles once per agent period until the
// agent stops running and joins all threads.
// NOTE(review): this listing skips source lines 112 and 124, so the block is not complete
// as shown; see the notes at each gap.
int main ( int  argc,
char *  argv[] 
)
84 {
85 
86  std::string tunnel_ip;
87  std::vector<uint8_t> buffer;
88  int32_t iretn;
89 
90  // cssl_start();
91 
92  switch (argc)
93  {
// Three arguments: separate receiver device. There is no `break` before `case 3`, so
// control falls through and the transmitter is opened as well.
94  case 4:
// NOTE(review): rxr_devname and txr_devname are char[20] (see the variable list); strcpy
// is unbounded, so a longer device path on the command line overruns them.
95  strcpy(rxr_devname,argv[3]);
96  // Open receiver port
97  if ((iretn=kpc9612p_connect(rxr_devname, &rxr_handle, 0x00)) < 0)
98  {
99  printf("Error opening %s as receiver\n",rxr_devname);
100  exit (-1);
101  }
102  case 3:
103  strcpy(txr_devname,argv[2]);
104  // Open transmitter port
105  if ((iretn=kpc9612p_connect(txr_devname, &txr_handle, 0x00)) < 0)
106  {
107  printf("Error opening %s as transmitter\n",txr_devname);
108  exit (-1);
109  }
110  // Copy transmitter to receiver port if not already open (Duplex)
111  if (rxr_handle.serial == NULL)
// NOTE(review): listing line 112 is missing here; going by the comment at 110 it presumably
// copies txr_handle into rxr_handle - confirm. As shown, the `if` would instead govern the
// assignment at 114.
113  // Get address for tunnel
114  tunnel_ip = argv[1];
115  break;
116  default:
117  printf("Usage: agent_tunnel ip_address transmit_device [receive_device]\n");
118  exit (-1);
119  break;
120  }
121 
122  // Initialize the Agent
123  if (!(agent = new Agent("", ("tunnel_"+tunnel_ip), 1., MAXBUFFERSIZE)))
// NOTE(review): listing line 124 is missing here; presumably the failure branch exits with
// AGENT_ERROR_JSON_CREATE (named in the reference list below) - confirm.
125 
126  // If ip_address is our own, then don't form Tunnel interface.
127  for (size_t i=0; i<agent->cinfo->agent[0].ifcnt; i++)
128  {
129  if (agent->cinfo->agent[0].pub[i].address == tunnel_ip)
130  {
131  open_tunnel = false;
132  }
133  }
134 
135  // Add requests
// Any non-zero return from add_request ends the process with that value.
136  if ((iretn=agent->add_request("request_transmit_port",request_transmit_port,"request_transmit_port", "Return the port to connect to to transmit data.")))
137  exit (iretn);
138 
139  // Start serial threads
140  thread port_read_thread(port_read_loop);
141  thread tcv_read_thread(tcv_read_loop);
142  thread tcv_write_thread(tcv_write_loop);
143 
144 #if defined(COSMOS_LINUX_OS)
145  if (open_tunnel)
146  {
147  // Open tunnel device
148  int tunnel_sock;
149  struct ifreq ifr1, ifr2;
// addr aliases the address field of ifr2, so the inet_pton calls below write straight
// into the request passed to ioctl.
150  struct sockaddr_in *addr = (struct sockaddr_in *)&ifr2.ifr_addr;
151 
152  if ((tun_fd=open("/dev/net/tun", O_RDWR)) < 0)
153  {
154  perror("Error opening tunnel device");
155  exit (-1);
156  }
157 
158  memset(&ifr1, 0, sizeof(ifr1));
159  ifr1.ifr_flags = IFF_TUN | IFF_NO_PI;
// The interface is named after the agent's process name (beat.proc).
// NOTE(review): strncpy leaves ifr_name unterminated if the source is IFNAMSIZ characters
// or longer.
160  strncpy(ifr1.ifr_name, agent->cinfo->agent[0].beat.proc, IFNAMSIZ);
161  if (ioctl(tun_fd, TUNSETIFF, (void *)&ifr1) < 0)
162  {
163  perror("Error setting tunnel interface");
164  exit (-1);
165  }
166 
// A plain datagram socket is opened only as a handle for the interface ioctls below; it is
// closed again at 217.
167  if((tunnel_sock=socket(AF_INET, SOCK_DGRAM, IPPROTO_IP)) < 0)
168  {
169  perror("Error opening tunnel socket");
170  exit (-1);
171  }
172 
173  // Get ready to set things
// NOTE(review): unlike ifr1, ifr2 is not zeroed before use in the visible code.
174  strncpy(ifr2.ifr_name, agent->cinfo->agent[0].beat.proc, IFNAMSIZ);
175  ifr2.ifr_addr.sa_family = AF_INET;
176 
177  // Set interface address
178 
// NOTE(review): the inet_pton return value is not checked, so an unparsable ip_address
// argument is not reported here.
179  inet_pton(AF_INET, tunnel_ip.c_str(), &addr->sin_addr);
180  if (ioctl(tunnel_sock, SIOCSIFADDR, &ifr2) < 0 )
181  {
182  perror("Error setting tunnel address");
183  exit (-1);
184  }
185 
186  // Set interface netmask
// The netmask is fixed at 255.255.255.0.
187  inet_pton(AF_INET, (char *)"255.255.255.0", &addr->sin_addr);
188  if (ioctl(tunnel_sock, SIOCSIFNETMASK, &ifr2) < 0 )
189  {
190  perror("Error setting tunnel netmask");
191  exit (-1);
192  }
193 
// Fetch the current flags first so that only IFF_UP and IFF_RUNNING are added below.
194  if (ioctl(tunnel_sock, SIOCGIFFLAGS, &ifr2) < 0 )
195  {
196  perror("Error getting tunnel interface flags");
197  exit (-1);
198  }
199 
200  // Bring interface up
201  ifr2.ifr_flags |= (IFF_UP | IFF_RUNNING);
202  if (ioctl(tunnel_sock, SIOCSIFFLAGS, &ifr2) < 0 )
203  {
204  perror("Error setting tunnel interface flags");
205  exit (-1);
206  }
207 
208  // Set interface MTU
209  ifr2.ifr_mtu = KPC9612P_MTU;
210  if (ioctl(tunnel_sock, SIOCSIFMTU, &ifr2) < 0 )
211  {
212  perror("Error setting tunnel interface MTU");
213  exit (-1);
214  }
215 
216 
217  close(tunnel_sock);
218  }
219 #endif
220 
221  // Start tunnel threads
// NOTE(review): these threads are started unconditionally, but tun_fd is only assigned
// inside the Linux/open_tunnel block above; tun_read_loop reads from tun_fd regardless.
222  thread tun_read_thread(tun_read_loop);
223  thread tun_write_thread(tun_write_loop);
224 
225  double nmjd = currentmjd(0.);
226  int32_t sleept;
227 
228  // Start performing the body of the agent
229  while(agent->running())
230  {
231  // Set beginning of next cycle;
// aprd is divided by 86400 before being added to an MJD value (days), i.e. it is treated
// as a period in seconds.
232  nmjd += agent->cinfo->agent[0].aprd/86400.;
233 
// Remaining time until the next cycle, converted from days to microseconds.
234  sleept = (int32_t)((nmjd - currentmjd(0.))*86400000000.);
235  if (sleept < 0)
236  {
237  sleept = 0;
238  }
239  COSMOS_USLEEP(sleept);
240  }
// Wait for every worker thread to finish before exiting.
241  tun_read_thread.join();
242  port_read_thread.join();
243  tcv_read_thread.join();
244  tun_write_thread.join();
245  tcv_write_thread.join();
246  exit (0);
247 }
char txr_devname[20]
Definition: agent_kpc9612p.cpp:69
kpc9612p_handle rxr_handle
Definition: agent_kpc9612p.cpp:72
int32_t kpc9612p_connect(char *dev, kpc9612p_handle *handle, uint8_t flag)
Definition: kpc9612p_lib.cpp:78
int tun_fd
Definition: agent_kpc9612p.cpp:65
int32_t request_transmit_port(string &req, string &response, Agent *)
Definition: agent_kpc9612p.cpp:465
void tun_read_loop()
Definition: agent_kpc9612p.cpp:250
int i
Definition: rw_test.cpp:37
int iretn
Definition: rw_test.cpp:37
void tcv_write_loop()
Definition: agent_kpc9612p.cpp:416
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
#define MAXBUFFERSIZE
Definition: agent_kpc9612p.cpp:77
bool open_tunnel
Definition: agent_kpc9612p.cpp:66
static char buffer[255]
Definition: propagator_simple.cpp:60
void port_read_loop()
Definition: agent_kpc9612p.cpp:323
int32_t add_request(string token, external_request_function function, string synopsis="", string description="")
Add internal request to Agent request list with description and synopsis.
Definition: agentclass.cpp:312
void tcv_read_loop()
Definition: agent_kpc9612p.cpp:358
Definition: agentclass.h:139
Agent * agent
Definition: agent_kpc9612p.cpp:49
cssl_t * serial
Definition: kpc9612p_lib.h:90
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
void tun_write_loop()
Definition: agent_kpc9612p.cpp:277
vector< agentstruc > agent
Single entry vector for agent information.
Definition: jsondef.h:4247
char rxr_devname[20]
Definition: agent_kpc9612p.cpp:68
cosmosstruc * cinfo
Definition: agentclass.h:346
#define KPC9612P_MTU
Definition: kpc9612p_lib.h:49
#define AGENT_ERROR_JSON_CREATE
Definition: cosmos-errno.h:105
kpc9612p_handle txr_handle
Definition: agent_kpc9612p.cpp:73

Variable Documentation

char agentname[COSMOS_MAX_NAME+1] = "tunnel"
char node[50] = ""
int waitsec = 5
Agent* agent
std::queue<std::vector<uint8_t> > tun_fifo
std::queue<std::vector<uint8_t> > tcv_fifo
std::mutex tcv_fifo_lock
std::condition_variable tcv_fifo_check
std::mutex tun_fifo_lock
std::condition_variable tun_fifo_check
int tun_fd
bool open_tunnel = true
char rxr_devname[20] =""
char txr_devname[20] =""
kpc9612p_handle rxr_handle
kpc9612p_handle txr_handle
socket_channel transmit_socket