COSMOS core  1.0.2 (beta)
Comprehensive Open-architecture Solution for Mission Operations Systems
agent_file2.cpp File Reference
#include "support/configCosmos.h"
#include "agent/agentclass.h"
#include "support/jsonlib.h"
#include "support/transferlib.h"
#include "support/sliplib.h"
#include "support/jsonobject.h"
#include <algorithm>
#include <cstring>
#include <time.h>
#include <iostream>
#include <fstream>
#include <string>
#include <sys/stat.h>
#include <sys/select.h>
Include dependency graph for agent_file2.cpp:

Classes

struct  channelstruc
 
struct  transmit_queue_entry
 
struct  tx_entry
 
struct  tx_queue
 

Macros

#define PROGRESS_QUEUE_SIZE   256
 
#define PACKET_SIZE_LO   (512-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
 
#define PACKET_SIZE_PAYLOAD   (PACKET_SIZE_LO-PACKET_DATA_OFFSET_HEADER_TOTAL)
 
#define THROUGHPUT_LO   1300
 
#define PACKET_SIZE_HI   (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
 
#define THROUGHPUT_HI   150000
 

Functions

void send_loop ()
 
void recv_loop ()
 
void transmit_loop ()
 
int32_t request_debug (string &request, string &response, Agent *agent)
 
int32_t request_get_channels (string &request, string &response, Agent *agent)
 
int32_t request_set_throughput (string &request, string &response, Agent *agent)
 
int32_t request_remove_file (string &request, string &response, Agent *agent)
 
int32_t request_ls (string &request, string &response, Agent *agent)
 
int32_t request_list_incoming (string &request, string &response, Agent *agent)
 
int32_t request_list_outgoing (string &request, string &response, Agent *agent)
 
int32_t request_list_incoming_json (string &request, string &response, Agent *agent)
 
int32_t request_list_outgoing_json (string &request, string &response, Agent *agent)
 
int32_t request_set_logstride (string &request, string &response, Agent *agent)
 
int32_t request_get_logstride (string &request, string &response, Agent *agent)
 
int32_t outgoing_tx_add (tx_progress &tx_out)
 
int32_t outgoing_tx_add (std::string node_name, std::string agent_name, std::string file_name)
 
int32_t outgoing_tx_del (int32_t node, uint16_t tx_id=256)
 
int32_t outgoing_tx_purge (int32_t node, uint16_t tx_id=256)
 
int32_t outgoing_tx_recount (int32_t node)
 
int32_t incoming_tx_add (tx_progress &tx_in)
 
int32_t incoming_tx_add (std::string node_name, PACKET_TX_ID_TYPE tx_id)
 
int32_t incoming_tx_update (packet_struct_metashort meta)
 
int32_t incoming_tx_del (int32_t node, uint16_t tx_id=256)
 
int32_t incoming_tx_purge (int32_t node, uint16_t tx_id=256)
 
int32_t incoming_tx_recount (int32_t node)
 
vector< file_progress > find_chunks_missing (tx_progress &tx)
 
PACKET_FILE_SIZE_TYPE merge_chunks_overlap (tx_progress &tx)
 
double queuesendto (PACKET_NODE_ID_TYPE node_id, std::string type, vector< PACKET_BYTE > packet)
 
int32_t mysendto (std::string type, channelstruc &channel, vector< PACKET_BYTE > &buf)
 
int32_t myrecvfrom (std::string type, socket_channel &channel, vector< PACKET_BYTE > &buf, uint32_t length, double dtimeout=1.)
 
void debug_packet (vector< PACKET_BYTE > buf, std::string type)
 
int32_t write_meta (tx_progress &tx, double interval=5.)
 
int32_t read_meta (tx_progress &tx)
 
bool tx_progress_compare_by_size (const tx_progress &a, const tx_progress &b)
 
bool filestruc_compare_by_size (const filestruc &a, const filestruc &b)
 
PACKET_TX_ID_TYPE check_tx_id (tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
 
int32_t check_node_id_2 (std::string node_name)
 
int32_t check_node_id_2 (PACKET_NODE_ID_TYPE node_id)
 
int32_t check_channel (PACKET_NODE_ID_TYPE node_id)
 
int32_t check_remote_node_id (PACKET_NODE_ID_TYPE node_id)
 
int32_t set_remote_node_id (PACKET_NODE_ID_TYPE node_id, std::string node_name)
 
PACKET_TX_ID_TYPE choose_incoming_tx_id (int32_t node)
 
int32_t next_incoming_tx (PACKET_NODE_ID_TYPE node)
 
std::string json_list_incoming ()
 
std::string json_list_outgoing ()
 
std::string json_list_queue ()
 
void write_queue_log (double logdate)
 
int main (int argc, char *argv[])
 
double queuesendto (PACKET_NODE_ID_TYPE node_id, string type, vector< PACKET_BYTE > packet)
 
bool lower_chunk (file_progress i, file_progress j)
 

Variables

static bool debug_flag = true
 
static beatstruc cbeat
 
static Agent * agent
 
static uint16_t use_channel = 0
 
static vector< channelstruc > comm_channel
 
static std::queue< transmit_queue_entry > transmit_queue
 
static std::condition_variable transmit_queue_check
 
static std::mutex incoming_tx_lock
 
static std::mutex outgoing_tx_lock
 
static std::mutex debug_fd_lock
 
static double last_data_receive_time = 0.
 
static double next_reqmeta_time = 0.
 
static uint32_t packet_in_count = 0
 
static uint32_t packet_out_count
 
static uint32_t crc_error_count = 0
 
static uint32_t timeout_error_count = 0
 
static uint32_t type_error_count = 0
 
static uint32_t send_error_count = 0
 
static uint32_t recv_error_count = 0
 
static vector< tx_queue > txq
 
static std::string log_directory = "incoming"
 
double logstride_sec = 10.
 

Macro Definition Documentation

#define PROGRESS_QUEUE_SIZE   256
#define PACKET_SIZE_LO   (512-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
#define PACKET_SIZE_PAYLOAD   (PACKET_SIZE_LO-PACKET_DATA_OFFSET_HEADER_TOTAL)
#define THROUGHPUT_LO   1300
#define PACKET_SIZE_HI   (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
#define THROUGHPUT_HI   150000

Function Documentation

void send_loop ( )
1128 {
1129  vector<PACKET_BYTE> packet;
1130  uint32_t sleep_time = 1;
1131  double send_time = 0.;
1132  static double next_send_time = 0.;
1133  double current_time;
1134 
1135  current_time = currentmjd();
1136  uint8_t previous_state = 0;
1137 
1138  while (agent->running())
1139  {
1140  if (agent->running() == (uint16_t)Agent::State::IDLE)
1141  {
1142  COSMOS_SLEEP(1);
1143  continue;
1144  }
1145 
1146  // If we did nothing last loop, wait at least 100 msec
1147  if (next_send_time < 1.16e-6)
1148  {
1149  // 100 msec in MJD
1150  next_send_time = 1.16e-6; // .1 second
1151  }
1152 
1153  // Time it should be after we wait
1154  double next_time = current_time + next_send_time;
1155  // Time it actually is now
1156  current_time = currentmjd();
1157  // Sleep if the difference is greater than zero
1158  if (next_time > current_time)
1159  {
1160  sleep_time = 86400. * (next_time - current_time);
1161  COSMOS_SLEEP(sleep_time);
1162  }
1163 
1164  // Bring us up to the present
1165  current_time = next_time;
1166 // next_send_time = .1 / 86400.;
1167 
1168  for (int32_t node=0; node < static_cast<int32_t>(txq.size()); ++node)
1169  {
1170  // See if we have an active channel serving this Node
1171  int32_t channel = check_channel(node);
1172  if (channel < 0)
1173  {
1174  continue;
1175  }
1176 
1177  if (debug_flag && txq[static_cast <size_t>(node)].outgoing.state != previous_state)
1178  {
1179  previous_state = txq[static_cast <size_t>(node)].outgoing.state;
1180  debug_fd_lock.lock();
1181  fprintf(agent->get_debug_fd(), "%16.10f Send: Node %s State: %d\n", currentmjd(), txq[static_cast <size_t>(node)].node_name.c_str(), txq[static_cast <size_t>(node)].outgoing.state);
1182  fflush(agent->get_debug_fd());
1183  debug_fd_lock.unlock();
1184  }
1185  // Decide what to do next based on our current state
1186  outgoing_tx_lock.lock();
1187  switch (txq[static_cast <size_t>(node)].outgoing.state)
1188  {
1189  case PACKET_QUEUE:
1190  // If we are in Queue state, then the only thing we want to do is send a Queue packet, if enough time has passed
1191  if (currentmjd() > txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_QUEUE - 8])
1192  {
1193  vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
1194  PACKET_TX_ID_TYPE iq = 0;
1195  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
1196  {
1197  if (txq[static_cast <size_t>(node)].outgoing.progress[i].tx_id != 0)
1198  {
1199  tqueue[iq++] = txq[static_cast <size_t>(node)].outgoing.progress[i].tx_id;
1200  }
1201  if (iq == TRANSFER_QUEUE_LIMIT)
1202  {
1203  break;
1204  }
1205  }
1206  make_queue_packet(packet, node, txq[static_cast <size_t>(node)].node_name, tqueue);
1207  queuesendto(node, "tx", packet);
1208  txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_QUEUE - 8] = currentmjd() + 10. / 86400.;
1209  }
1210  break;
1211  case PACKET_METADATA:
1212  if (currentmjd() > txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_METADATA - 8])
1213  {
1214  int32_t remote_node = check_remote_node_id(node);
1215  if (remote_node >= 0)
1216  {
1217  for (uint16_t i=0; i<txq[static_cast <size_t>(node)].outgoing.meta_id.size(); ++i)
1218  {
1219  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[static_cast <size_t>(node)].outgoing, txq[static_cast <size_t>(node)].outgoing.meta_id[i]);
1220  if (tx_id > 0)
1221  {
1222  tx_progress tx = txq[static_cast <size_t>(node)].outgoing.progress[tx_id];
1223  vector<PACKET_BYTE> packet;
1224  make_metadata_packet(packet, remote_node, tx.tx_id, (char *)tx.file_name.c_str(), tx.file_size, (char *)tx.agent_name.c_str());
1225  send_time = queuesendto(node, "tx", packet);
1226  if (send_time >= 0.)
1227  {
1228  next_send_time += send_time;
1229  }
1230  else
1231  {
1232  next_send_time = 10. / 86400.;
1233  }
1234  txq[static_cast <size_t>(node)].outgoing.state = PACKET_DATA;
1235  }
1236  }
1237  }
1238  txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_METADATA - 8] = currentmjd() + 10. / 86400.;
1239  }
1240  break;
1241  case PACKET_DATA:
1242  if (currentmjd() > txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_DATA - 8])
1243  {
1244  // See if we have an active transfer
1245  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[static_cast <size_t>(node)].outgoing, txq[static_cast <size_t>(node)].outgoing.id);
1246  if (tx_id > 0 && txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.size())
1247  {
1248  // Attempt to open the outgoing progress file
1249  if (txq[static_cast <size_t>(node)].outgoing.progress[tx_id].fp == nullptr)
1250  {
1251  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].fp = fopen(txq[static_cast <size_t>(node)].outgoing.progress[tx_id].filepath.c_str(), "r");
1252  }
1253 
1254  // If we're good, continue with the process
1255  if(txq[static_cast <size_t>(node)].outgoing.progress[tx_id].fp != nullptr)
1256  {
1257  file_progress tp;
1258  tp = txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[0];
1259 
1260  PACKET_FILE_SIZE_TYPE byte_count = (tp.chunk_end - tp.chunk_start) + 1;
1261  if (byte_count > comm_channel[use_channel].packet_size)
1262  {
1263  byte_count = comm_channel[use_channel].packet_size;
1264  }
1265 
1266  tp.chunk_end = tp.chunk_start + byte_count - 1;
1267 
1268  // Read the packet and send it
1269  int32_t nbytes;
1270  PACKET_BYTE* chunk = new PACKET_BYTE[byte_count]();
1271  if (!(nbytes = fseek(txq[static_cast <size_t>(node)].outgoing.progress[tx_id].fp, tp.chunk_start, SEEK_SET)))
1272  {
1273  nbytes = fread(chunk, 1, byte_count, txq[static_cast <size_t>(node)].outgoing.progress[tx_id].fp);
1274  }
1275  if (nbytes == byte_count)
1276  {
1277  // See if we know what the remote node_id is for this
1278  int32_t remote_node = check_remote_node_id(node);
1279  if (remote_node >= 0)
1280  {
1281  make_data_packet(packet, remote_node, txq[static_cast <size_t>(node)].outgoing.progress[tx_id].tx_id, byte_count, tp.chunk_start, chunk);
1282 
1283  send_time = queuesendto(node, "tx", packet);
1284  if (send_time >= 0.)
1285  {
1286  if (send_time > next_send_time)
1287  {
1288  next_send_time = send_time;
1289  }
1290  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[0].chunk_start = tp.chunk_end + 1;
1291  }
1292  else
1293  {
1294  next_send_time = 10. / 86400.;
1295  }
1296  }
1297  }
1298  else
1299  {
1300  // Some problem with this transmission, ask other end to dequeue it
1301  // Remove transaction
1302  next_send_time = 0.;
1303  txq[static_cast <size_t>(node)].outgoing.state = PACKET_CANCEL;
1304  }
1305  delete[] chunk;
1306 
1307  if (txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[0].chunk_start > txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[0].chunk_end)
1308  {
1309  // All done with this file_info entry. Close file and remove entry.
1310  fclose(txq[static_cast <size_t>(node)].outgoing.progress[tx_id].fp);
1311  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].fp = nullptr;
1312  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.pop_front();
1313  }
1314 
1315  write_meta(txq[static_cast <size_t>(node)].outgoing.progress[tx_id]);
1316  }
1317  else
1318  {
1319  // Some problem with this transmission, ask other end to dequeue it
1320 
1321  next_send_time = 0.;
1322  txq[static_cast <size_t>(node)].outgoing.state = PACKET_CANCEL;
1323  }
1324  }
1325  else
1326  {
1327  if (currentmjd() > txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_QUEUE - 8])
1328  {
1329  txq[static_cast <size_t>(node)].outgoing.state = PACKET_QUEUE;
1330  }
1331  }
1332  txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_DATA - 8] = currentmjd() + next_send_time;
1333  }
1334  break;
1335  case PACKET_CANCEL:
1336  if (currentmjd() > txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_CANCEL - 8])
1337  {
1338  // See if we have an active transfer
1339  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[static_cast <size_t>(node)].outgoing, txq[static_cast <size_t>(node)].outgoing.id);
1340  outgoing_tx_del(node, tx_id);
1341  int32_t remote_node = check_remote_node_id(node);
1342  if (remote_node >= 0)
1343  {
1344  // Send a CANCEL packet
1345  vector<PACKET_BYTE> packet;
1346  make_cancel_packet(packet, remote_node, tx_id);
1347  queuesendto(node, "tx", packet);
1348  }
1349 // txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_CANCEL - 8] = currentmjd() + 10. / 86400.;
1350  txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_CANCEL - 8] = currentmjd() + next_send_time;
1351  txq[static_cast <size_t>(node)].outgoing.state = PACKET_QUEUE;
1352  }
1353  break;
1354  }
1355 
1356  outgoing_tx_lock.unlock();
1357  }
1358  }
1359 }
int32_t check_channel(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file2.cpp:2728
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
Definition: eci2kep_test.cpp:33
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
void make_data_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, PACKET_TX_ID_TYPE tx_id, PACKET_CHUNK_SIZE_TYPE byte_count, PACKET_FILE_SIZE_TYPE chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:232
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
Definition: transferlib.h:338
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file2.cpp:1648
static Agent * agent
Definition: agent_file2.cpp:95
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
static uint16_t use_channel
Definition: agent_file2.cpp:96
void make_cancel_packet(vector< PACKET_BYTE > &packet, packet_struct_cancel cancel)
Definition: transferlib.cpp:82
double queuesendto(PACKET_NODE_ID_TYPE node_id, std::string type, vector< PACKET_BYTE > packet)
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
int32_t outgoing_tx_del(int32_t node, uint16_t tx_id=256)
Definition: agent_file2.cpp:2249
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string agent_name
Definition: transferlib.h:350
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file2.cpp:2692
#define SEEK_SET
Definition: zconf.h:475
string file_name
Definition: transferlib.h:351
uint8_t PACKET_BYTE
Definition: transferlib.h:140
void make_queue_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > queue)
Definition: transferlib.cpp:277
static string node
Definition: agent_monitor.cpp:126
static std::mutex outgoing_tx_lock
Definition: agent_file2.cpp:130
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
void make_metadata_packet(vector< PACKET_BYTE > &packet, packet_struct_metalong meta)
Definition: transferlib.cpp:163
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
int32_t check_remote_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file2.cpp:2740
static bool debug_flag
Definition: agent_file2.cpp:72
void recv_loop ( )
510 {
511  vector<PACKET_BYTE> recvbuf;
512  std::string partial_filepath;
513 
514  while (agent->running())
515  {
516  if (agent->running() == (uint16_t)Agent::State::IDLE)
517  {
518  COSMOS_SLEEP(1);
519  continue;
520  }
521  else
522  {
523  COSMOS_SLEEP(.001);
524  }
525 
526  int32_t nbytes = 0;
527  socket_channel rchannel;
528  if (( nbytes = myrecvfrom("rx", rchannel, recvbuf, PACKET_MAX_LENGTH)) > 0)
529  {
530  // Generate extra network info
531  inet_ntop(rchannel.caddr.sin_family, &rchannel.caddr.sin_addr, rchannel.address, sizeof(rchannel.address));
532 
533  // Check channels, update information if we are already handling it, otherwise add channel
534  string node_name;
535  int32_t node;
536  if ((recvbuf[0] & 0x0f) == PACKET_QUEUE)
537  {
538  node_name = reinterpret_cast<char *>(&recvbuf[PACKET_QUEUE_OFFSET_NODE_NAME]);
539  node = check_node_id_2(node_name);
540  }
541  else
542  {
544  node_name = txq[static_cast <size_t>(node)].node_name;
545  }
546 // if (node < 0)
547 // {
548 // if (debug_flag)
549 // {
550 // debug_fd_lock.lock();
551 // fprintf(agent->get_debug_fd(), "%16.10f Network: Unknown: %d %s %s %u\n", currentmjd(), node, "unknown", rchannel.address, ntohs(rchannel.caddr.sin_port));
552 // fflush(agent->get_debug_fd());
553 // debug_fd_lock.unlock();
554 // }
555 // }
556 
557  bool found = false;
558  for (uint16_t i=0; i<comm_channel.size(); ++i)
559  {
560  // Are we handling this Node?
561  if (comm_channel[i].node == node_name)
562  {
563  comm_channel[i].nmjd = currentmjd();
564  comm_channel[i].chansock = rchannel;
565 // inet_ntop(comm_channel[i].chansock.caddr.sin_family, &comm_channel[i].chansock.caddr.sin_addr, comm_channel[i].chansock.address, sizeof(comm_channel[i].chansock.address));
566  comm_channel[i].chanip = comm_channel[i].chansock.address;
567  use_channel = i;
568  if (debug_flag)
569  {
570  debug_fd_lock.lock();
571  fprintf(agent->get_debug_fd(), "%16.10f Network: Old: %u %s %s %u\n", currentmjd(), i, comm_channel[i].node.c_str(), comm_channel[i].chanip.c_str(), ntohs(comm_channel[i].chansock.caddr.sin_port));
572  fflush(agent->get_debug_fd());
573  debug_fd_lock.unlock();
574  }
575  found = true;
576  break;
577  }
578  }
579 
580  if (!found)
581  {
582  channelstruc tchannel;
583  tchannel.node = node_name;
584  tchannel.nmjd = currentmjd(0.);
585  tchannel.chansock = rchannel;
586  inet_ntop(tchannel.chansock.caddr.sin_family, &tchannel.chansock.caddr.sin_addr, tchannel.chansock.address, sizeof(tchannel.chansock.address));
587  tchannel.chanip = tchannel.chansock.address;
588  use_channel = static_cast <uint16_t>(comm_channel.size());
589  comm_channel.push_back(tchannel);
590  if (debug_flag)
591  {
592  debug_fd_lock.lock();
593  fprintf(agent->get_debug_fd(), "%16.10f Network: New: %u %s %s %u\n", currentmjd(), use_channel, tchannel.node.c_str(), tchannel.chanip.c_str(), ntohs(tchannel.chansock.caddr.sin_port));
594  fflush(agent->get_debug_fd());
595  debug_fd_lock.unlock();
596  }
597  }
599 
600  // Respond appropriately according to type of packet
601  switch (recvbuf[0] & 0x0f)
602  {
603  case PACKET_METADATA:
604  {
606 
607  extract_metadata(recvbuf, meta);
608  int32_t node = check_node_id_2(meta.node_id);
609  if (node >= 0)
610  {
611  comm_channel[use_channel].node = txq[static_cast <size_t>(node)].node_name;
612  }
613 
614  incoming_tx_lock.lock();
615 
616  incoming_tx_update(meta);
617 
618  incoming_tx_lock.unlock();
619 
620  break;
621  }
622  case PACKET_DATA:
623  {
624  packet_struct_data data;
625 
626  extract_data(recvbuf, data.node_id, data.tx_id, data.byte_count, data.chunk_start, data.chunk);
627 
629 
630  // create transaction entry if new, and then add data
631 
632  incoming_tx_lock.lock();
633 
634  int32_t node = check_node_id_2(data.node_id);
635 
636  if (node >= 0)
637  {
638  comm_channel[use_channel].node = txq[static_cast <size_t>(node)].node_name;
639  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[static_cast <size_t>(node)].incoming, data.tx_id);
640 
641  // Update corresponding incoming queue entry if it exists
642  if (tx_id > 0)
643  {
644  // tx_id now points to the valid entry to which we should add the data
645  file_progress tp;
646  tp.chunk_start = data.chunk_start;
647  tp.chunk_end = data.chunk_start + data.byte_count - 1;
648 
649  packet_struct_data odata;
650  odata = data;
651 
652  uint32_t check=0;
653  bool duplicate = false;
654  bool updated = false;
655 
656  // Do we have any data yet?
657  if (!txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.size())
658  {
659  // Add first entry, then write data
660  txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.push_back(tp);
661  txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes += data.byte_count;
662  updated = true;
663  }
664  else
665  {
666  // Check against existing data
667  for (uint32_t j=0; j<txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.size(); ++j)
668  {
669  // Check for duplicate
670  if (tp.chunk_start >= txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start && tp.chunk_end <= txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end)
671  {
672  duplicate = true;
673  break;
674  }
675  // If we start before this entry
676  if (tp.chunk_start < txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start)
677  {
678  // If we end before this entry (at least one byte between), insert
679  if (tp.chunk_end + 1 < txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start)
680  {
681  txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.insert(txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.begin()+j, tp);
682  txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes += data.byte_count;
683  updated = true;
684  break;
685  }
686  // Otherwise, extend the near end
687  else
688  {
689  tp.chunk_end = txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start - 1;
690  txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start = tp.chunk_start;
691  data.byte_count = (tp.chunk_end - tp.chunk_start) + 1;
692  txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes += data.byte_count;
693  updated = true;
694  break;
695  }
696  }
697  else
698  {
699  // If we overlap on the end, extend the far end
700  if (tp.chunk_start <= txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end + 1)
701  {
702  if (tp.chunk_end > txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end)
703  {
704  data.byte_count = tp.chunk_end - txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end;
705  tp.chunk_start = txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end + 1;
706  txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end = tp.chunk_end;
707  txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes += data.byte_count;
708  updated = true;
709  break;
710  }
711  }
712  }
713  check = j + 1;
714  }
715 
716 
717  // If we are higher than everything currently in the list, then append
718  if (!duplicate && check == txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.size())
719  {
720  txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.push_back(tp);
721  txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes += data.byte_count;
722  updated = true;
723  }
724 
725  }
726 
727  // Write to disk if this is new data
728  if (updated)
729  {
730  // Write incoming data to disk
731  if (txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp == nullptr)
732  {
733  partial_filepath = txq[static_cast <size_t>(node)].incoming.progress[tx_id].temppath + ".file";
734  if (data_exists(partial_filepath))
735  {
736  txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(), "r+");
737  }
738  else
739  {
740  txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(), "w");
741  }
742  }
743 
744  if (txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp == nullptr)
745  {
746  if (debug_flag)
747  {
748  debug_fd_lock.lock();
749  fprintf(agent->get_debug_fd(), "%16.10f Recv: File Error: %s %s on ID: %u Chunk: %u\n", currentmjd(), partial_filepath.c_str(), cosmos_error_string(-errno).c_str(), tx_id, tp.chunk_start);
750  fflush(agent->get_debug_fd());
751  debug_fd_lock.unlock();
752  }
753  }
754  else
755  {
756  fseek(txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp, tp.chunk_start, SEEK_SET);
757  fwrite(data.chunk, data.byte_count, 1, txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp);
758  fflush(txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp);
759  // Write latest meta data to disk
760  write_meta(txq[static_cast <size_t>(node)].incoming.progress[tx_id]);
761  if (debug_flag)
762  {
763  uint32_t total = 0;
764  for (uint16_t i=0; i<data.byte_count; ++i)
765  {
766  total += data.chunk[i];
767  }
768 // debug_fd_lock.lock();
769 // fprintf(agent->get_debug_fd(), "%16.10f Recv: Original: %u %u Final: %u %u Chunk: %u %u Total: %u\n", currentmjd(), odata.chunk_start, odata.byte_count, data.chunk_start, data.byte_count, tp.chunk_start, tp.chunk_end, total);
770 // fflush(agent->get_debug_fd());
771 // debug_fd_lock.unlock();
772  }
773  }
774 
775  }
776 
777  // Check if file has been completely received
778  if(txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_size == txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes && txq[static_cast <size_t>(node)].incoming.progress[tx_id].havemeta)
779  {
780  // See if we know what the remote node_id is for this
781  int32_t remote_node = check_remote_node_id(static_cast <PACKET_NODE_ID_TYPE>(node));
782  if (remote_node >= 0)
783  {
784  tx_progress tx_in = txq[static_cast <size_t>(node)].incoming.progress[tx_id];
785  debug_fd_lock.lock();
786  fprintf(agent->get_debug_fd(), "%16.10f Recv: Complete: %u %s %u %u\n", currentmjd(), tx_in.tx_id, tx_in.node_name.c_str(), tx_in.file_size, tx_in.total_bytes);
787  fflush(agent->get_debug_fd());
788  debug_fd_lock.unlock();
789 
790  // inform other end that file has been received
791  vector<PACKET_BYTE> packet;
792  make_complete_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(remote_node), tx_in.tx_id);
793  queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node), "rx", packet);
794 
795  // Move file to its final location
796  if (!txq[static_cast <size_t>(node)].incoming.progress[tx_id].complete)
797  {
798  if (txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp != nullptr)
799  {
800  fclose(txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp);
801  txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp = nullptr;
802  }
803  std::string final_filepath = tx_in.temppath + ".file";
804  int iret = rename(final_filepath.c_str(), tx_in.filepath.c_str());
805  // Make sure metadata is recorded
806  write_meta(txq[static_cast <size_t>(node)].incoming.progress[tx_id], 0.);
807  if (debug_flag)
808  {
809  debug_fd_lock.lock();
810  fprintf(agent->get_debug_fd(), "%16.10f Recv: Renamed: %d %s\n", currentmjd(), iret, tx_in.filepath.c_str());
811  fflush(agent->get_debug_fd());
812  debug_fd_lock.unlock();
813  }
814  // Mark complete
815  txq[static_cast <size_t>(node)].incoming.progress[tx_id].complete = true;
816  }
817  }
818  }
819  }
820  }
821 
822  incoming_tx_lock.unlock();
823 
824  break;
825  }
826  case PACKET_REQDATA:
827  {
828  packet_struct_reqdata reqdata;
829 
830  extract_reqdata(recvbuf, reqdata);
831 
832  // Simple validity check
833  int32_t node = check_node_id_2(reqdata.node_id);
834  if (node >= 0)
835  {
836  comm_channel[use_channel].node = txq[static_cast <size_t>(node)].node_name;
837  }
838 
839  if (node < 0 || reqdata.hole_end < reqdata.hole_start)
840  {
841  break;
842  }
843 
844  outgoing_tx_lock.lock();
845 
846  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[static_cast <size_t>(node)].outgoing, reqdata.tx_id);
847  // tx_id now points to the valid entry to which we should add the data
848 
849  if (tx_id > 0)
850  {
851  // Add this chunk to the queue
852  file_progress tp;
853  tp.chunk_start = reqdata.hole_start;
854  tp.chunk_end = reqdata.hole_end;
855  PACKET_FILE_SIZE_TYPE byte_count = (reqdata.hole_end - reqdata.hole_start) + 1;
856 
857  uint32_t check=0;
858  // Anything in the queue yet?
859  if (!txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.size())
860  {
861  // Add first entry
862  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.push_back(tp);
863  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].total_bytes += byte_count;
864  }
865  else
866  {
867  // Check against existing data
868  for (uint32_t j=0; j<txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.size(); ++j)
869  {
870  // If we match this entry
871  if (tp.chunk_start == txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start && tp.chunk_end == txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end)
872  {
873  break;
874  }
875  // If we start before this entry
876  if (tp.chunk_start < txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start)
877  {
878  // If we end before this entry (at least one byte between), insert
879  if (tp.chunk_end + 1 < txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start)
880  {
881  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.insert(txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.begin()+j, tp);
882  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].total_bytes += byte_count;
883  break;
884  }
885  // Otherwise, extend the near end
886  else
887  {
888  tp.chunk_end = txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start - 1;
889  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start = tp.chunk_start;
890  byte_count = (tp.chunk_end - tp.chunk_start) + 1;
891  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].total_bytes += byte_count;
892  break;
893  }
894  }
895  else
896  {
897  // If we overlap on the end, extend the far end
898  if (tp.chunk_start <= txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end + 1)
899  {
900  if (tp.chunk_end > txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end)
901  {
902  byte_count = tp.chunk_end - txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end;
903  tp.chunk_start = txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end + 1;
904  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end = tp.chunk_end;
905  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].total_bytes += byte_count;
906  break;
907  }
908  }
909  }
910  check = j + 1;
911  }
912 
913 
914  // If we are higher than everything currently in the list, then append
915  if (check == txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.size())
916  {
917  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.push_back(tp);
918  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].total_bytes += byte_count;
919  }
920 
921  }
922 
923  // Save meta to disk
924  write_meta(txq[static_cast <size_t>(node)].outgoing.progress[tx_id]);
925  txq[static_cast <size_t>(node)].outgoing.id = reqdata.tx_id;
926  txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_DATA-8] = currentmjd();
927  txq[static_cast <size_t>(node)].outgoing.state = PACKET_DATA;
928  }
929 
930  outgoing_tx_lock.unlock();
931  break;
932  }
933  //Request missing metadata
934  case PACKET_REQMETA:
935  {
936  packet_struct_reqmeta reqmeta;
937 
938  extract_reqmeta(recvbuf, reqmeta);
939 
940  outgoing_tx_lock.lock();
941 
942 
943  // Send requested META packets
944  int32_t node = set_remote_node_id(reqmeta.node_id, reqmeta.node_name);
945  if (node >= 0)
946  {
947  comm_channel[use_channel].node = txq[static_cast <size_t>(node)].node_name;
948  // See if we know what the remote node_id is for this
949  int32_t remote_node = check_remote_node_id(node);
950  if (remote_node >= 0)
951  {
952  txq[static_cast <size_t>(node)].outgoing.meta_id.clear();
953  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
954  {
955  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[static_cast <size_t>(node)].outgoing, reqmeta.tx_id[i]);
956  if (tx_id > 0)
957  {
958  txq[static_cast <size_t>(node)].outgoing.meta_id.push_back(tx_id);
959  }
960  }
961  txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_METADATA-8] = currentmjd();
962  txq[static_cast <size_t>(node)].outgoing.state = PACKET_METADATA;
963  }
964  }
965 
966  outgoing_tx_lock.unlock();
967  break;
968  }
969  case PACKET_CANCEL:
970  {
971  packet_struct_complete cancel;
972 
973  extract_complete(recvbuf, cancel);
974 
975  int32_t node = check_node_id_2(cancel.node_id);
976  if (node >= 0)
977  {
978  comm_channel[use_channel].node = txq[static_cast <size_t>(node)].node_name;
979  incoming_tx_lock.lock();
980 
981  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[static_cast <size_t>(node)].incoming, cancel.tx_id);
982 
983  if (tx_id > 0)
984  {
985  // Remove the transaction
986  incoming_tx_del(node, tx_id);
987  }
988 
989  next_incoming_tx(node);
990  incoming_tx_lock.unlock();
991  }
992  break;
993  }
994  case PACKET_COMPLETE:
995  {
996  packet_struct_complete complete;
997 
998  extract_complete(recvbuf, complete);
999 
1000  int32_t node = check_node_id_2(complete.node_id);
1001  if (node >= 0)
1002  {
1003  outgoing_tx_lock.lock();
1004  comm_channel[use_channel].node = txq[static_cast <size_t>(node)].node_name;
1005 
1006  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[static_cast <size_t>(node)].outgoing, complete.tx_id);
1007 
1008  if (tx_id > 0)
1009  {
1010  // See if we know what the remote node_id is for this
1011  int32_t remote_node = check_remote_node_id(node);
1012  if (remote_node >= 0)
1013  {
1014  txq[static_cast <size_t>(node)].outgoing.nmjd[PACKET_CANCEL-8] = currentmjd();
1015  txq[static_cast <size_t>(node)].outgoing.state = PACKET_CANCEL;
1016  txq[static_cast <size_t>(node)].outgoing.id = tx_id;
1017  }
1018  }
1019 
1020  outgoing_tx_lock.unlock();
1021  }
1022  break;
1023  }
1024  case PACKET_QUEUE:
1025  {
1026  packet_struct_queue queue;
1027 
1028  extract_queue(recvbuf, queue);
1029 
1030  incoming_tx_lock.lock();
1031 
1032  // Is this a node we are handling?
1033  int32_t node = check_node_id_2(queue.node_name);
1034  if (node >= 0)
1035  {
1036  comm_channel[use_channel].node = txq[static_cast <size_t>(node)].node_name;
1037 
1038  // Set remote node_id
1039  txq[static_cast <size_t>(node)].node_id = queue.node_id + 1;
1040  // Sort through incoming queue and remove anything not in sent queue
1041  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1042  {
1043  bool valid = false;
1044  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
1045  {
1046  if (queue.tx_id[i] && txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id == queue.tx_id[i])
1047  {
1048  // This is an incoming file we should handle
1049  valid = true;
1050  break;
1051  }
1052  }
1053 
1054  if (txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id && !valid)
1055  {
1056  // This is not an incoming file we should handle
1057  incoming_tx_del(node, tx_id);
1058  }
1059  }
1060 
1061  // Sort through remotely sent queue and add anything not in our local incoming queue
1062  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
1063  {
1064  if (queue.tx_id[i])
1065  {
1066  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[static_cast <size_t>(node)].incoming, queue.tx_id[i]);
1067 
1068  if (tx_id == 0)
1069  {
1070  incoming_tx_add(queue.node_name, queue.tx_id[i]);
1071  }
1072  }
1073  }
1074 
1075  // Go through final incoming queue and request any missing meta data
1076  if (currentmjd() > next_reqmeta_time)
1077  {
1079  vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
1080  PACKET_TX_ID_TYPE iq = 0;
1081  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1082  {
1083  if (txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id && !txq[static_cast <size_t>(node)].incoming.progress[tx_id].havemeta)
1084  {
1085  next_reqmeta_time += sizeof(packet_struct_metashort) / (86400. * comm_channel[use_channel].throughput);
1086  tqueue[iq++] = tx_id;
1087  }
1088  if (iq == TRANSFER_QUEUE_LIMIT)
1089  {
1090  break;
1091  }
1092  }
1093  if (iq)
1094  {
1095  vector<PACKET_BYTE> packet;
1096  make_reqmeta_packet(packet, node, txq[static_cast <size_t>(node)].node_name, tqueue);
1097  queuesendto(node, "rx", packet);
1098  }
1099  }
1100 
1101  next_incoming_tx(node);
1102  }
1103  incoming_tx_lock.unlock();
1104  }
1105  break;
1106  default:
1107  ++type_error_count;
1108  break;
1109  }
1110  }
1111  }
1112 
1113  // Flush any active metadata
1114  for (uint16_t node=0; node<txq.size(); ++node)
1115  {
1116  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1117  {
1118  if (txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id && txq[static_cast <size_t>(node)].incoming.progress[tx_id].havemeta)
1119  {
1120  write_meta(txq[static_cast <size_t>(node)].incoming.progress[tx_id], 0.);
1121  }
1122  }
1123  }
1124 
1125 }
string node
Definition: agent_file2.cpp:100
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
Definition: transferlib.h:205
static std::mutex incoming_tx_lock
Definition: agent_file2.cpp:129
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:209
void extract_reqdata(vector< PACKET_BYTE > &packet, packet_struct_reqdata &reqdata)
Definition: transferlib.cpp:150
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
Definition: transferlib.h:291
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
int i
Definition: rw_test.cpp:37
void extract_complete(vector< PACKET_BYTE > &packet, packet_struct_complete &complete)
Definition: transferlib.cpp:71
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:208
Definition: transferlib.h:332
Definition: transferlib.h:338
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:263
string chanip
Definition: agent_file2.cpp:102
PACKET_CHUNK_SIZE_TYPE byte_count
Definition: transferlib.h:279
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
Definition: transferlib.h:245
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:219
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:247
char address[17]
Definition: socketlib.h:134
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file2.cpp:1648
static Agent * agent
Definition: agent_file2.cpp:95
int32_t incoming_tx_update(packet_struct_metashort meta)
Definition: agent_file2.cpp:2479
void make_complete_packet(vector< PACKET_BYTE > &packet, packet_struct_complete complete)
Definition: transferlib.cpp:54
Definition: agent_file2.cpp:98
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
Definition: transferlib.h:217
string node_name
Definition: agent_001.cpp:46
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:264
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
static uint16_t use_channel
Definition: agent_file2.cpp:96
int32_t myrecvfrom(std::string type, socket_channel &channel, vector< PACKET_BYTE > &buf, uint32_t length, double dtimeout=1.)
Definition: agent_file2.cpp:1456
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:294
void extract_data(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE &node_id, PACKET_TX_ID_TYPE &tx_id, PACKET_CHUNK_SIZE_TYPE &byte_count, PACKET_FILE_SIZE_TYPE &chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:248
double queuesendto(PACKET_NODE_ID_TYPE node_id, std::string type, vector< PACKET_BYTE > packet)
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:207
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
Definition: socketlib.h:115
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:278
struct sockaddr_in caddr
Definition: socketlib.h:122
Definition: transferlib.h:261
#define PACKET_HEADER_OFFSET_TOTAL
Definition: transferlib.h:151
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
#define PACKET_QUEUE_OFFSET_NODE_NAME
Definition: transferlib.h:213
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:277
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file2.cpp:2692
#define SEEK_SET
Definition: zconf.h:475
int32_t incoming_tx_del(int32_t node, uint16_t tx_id=256)
Definition: agent_file2.cpp:2525
Definition: transferlib.h:275
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:221
#define PACKET_MAX_LENGTH
Definition: transferlib.h:75
socket_channel chansock
Definition: agent_file2.cpp:101
PACKET_FILE_SIZE_TYPE hole_start
Definition: transferlib.h:265
PACKET_FILE_SIZE_TYPE hole_end
Definition: transferlib.h:266
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:220
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:280
static double next_reqmeta_time
Definition: agent_file2.cpp:134
PACKET_BYTE chunk[1500]
Definition: transferlib.h:281
static uint32_t type_error_count
Definition: agent_file2.cpp:141
int32_t next_incoming_tx(PACKET_NODE_ID_TYPE node)
Definition: agent_file2.cpp:2767
double nmjd
Definition: agent_file2.cpp:105
static string node
Definition: agent_monitor.cpp:126
static std::mutex outgoing_tx_lock
Definition: agent_file2.cpp:130
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
void extract_queue(vector< PACKET_BYTE > &packet, packet_struct_queue &queue)
Definition: transferlib.cpp:292
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
int32_t check_node_id_2(std::string node_name)
Definition: agent_file2.cpp:2704
void make_reqmeta_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > reqmeta)
Definition: transferlib.cpp:110
int32_t check_remote_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file2.cpp:2740
int32_t set_remote_node_id(PACKET_NODE_ID_TYPE node_id, std::string node_name)
Definition: agent_file2.cpp:2753
void extract_reqmeta(vector< PACKET_BYTE > &packet, packet_struct_reqmeta &reqmeta)
Definition: transferlib.cpp:124
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:293
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file2.cpp:2377
bool data_exists(string &path)
Check existence of path.
Definition: datalib.cpp:1003
static double last_data_receive_time
Definition: agent_file2.cpp:133
static bool debug_flag
Definition: agent_file2.cpp:72
void transmit_loop ( )
1362 {
1363  std::mutex transmit_queue_lock;
1364  std::unique_lock<std::mutex> locker(transmit_queue_lock);
1365 
1366  while (agent->running())
1367  {
1368  if (agent->running() == (uint16_t)Agent::State::IDLE)
1369  {
1370  COSMOS_SLEEP(1);
1371  continue;
1372  }
1373 
1374 
1375  transmit_queue_check.wait(locker);
1376 
1377  while (!transmit_queue.empty())
1378  {
1379  // Get next packet from transceiver FIFO
1380  transmit_queue_entry entry = transmit_queue.front();
1381  transmit_queue.pop();
1382  mysendto(entry.type, comm_channel[entry.channel], entry.packet);
1383  }
1384  }
1385 }
std::vector< PACKET_BYTE > packet
Definition: agent_file.cpp:114
static Agent * agent
Definition: agent_file2.cpp:95
uint32_t channel
Definition: agent_file.cpp:113
std::string type
Definition: agent_file.cpp:112
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
int32_t mysendto(std::string type, channelstruc &channel, vector< PACKET_BYTE > &buf)
Definition: agent_file2.cpp:1422
static std::queue< transmit_queue_entry > transmit_queue
Definition: agent_file2.cpp:119
static std::condition_variable transmit_queue_check
Definition: agent_file2.cpp:120
Definition: agent_file.cpp:110
int32_t request_debug ( string &  request,
string &  response,
Agent agent 
)
2835 {
2836 
2837  std::string requestString = std::string(request);
2838  StringParser sp(requestString, ' ');
2839 
2840  debug_flag = sp.getFieldNumberAsDouble(2); // should be getFieldNumberAsBoolean
2841 
2842  std::cout << "debug_flag: " << debug_flag << std::endl;
2843  return 0;
2844 }
Definition: stringlib.h:89
static bool debug_flag
Definition: agent_file2.cpp:72
int32_t request_get_channels ( string &  request,
string &  response,
Agent agent 
)
1961 {
1962  for (uint16_t channel=0; channel<comm_channel.size(); ++channel)
1963  {
1964  response = ("{");
1965  response += to_json("node", comm_channel[channel].node);
1966  response += to_json("ip", comm_channel[channel].chanip);
1967  response += to_json("size", comm_channel[channel].packet_size);
1968  response += to_json("throughput", comm_channel[channel].throughput);
1969  response += to_json("nmjd", comm_channel[channel].nmjd);
1970  response += to_json("lmjd", comm_channel[channel].lmjd);
1971  response = ("},");
1972  }
1973  return 0;
1974 }
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
static string node
Definition: agent_monitor.cpp:126
string to_json(string key, string value)
Definition: stringlib.cpp:334
int32_t request_set_throughput ( string &  request,
string &  response,
Agent agent 
)
1977 {
1978  uint16_t channel=0;
1979  uint32_t throughput=0;
1980 
1981  sscanf(request.c_str(), "%*s %hu %u\n", &channel, &throughput);
1982  if (channel < comm_channel.size())
1983  {
1984 // use_channel = channel;
1985  if (throughput)
1986  {
1987  comm_channel[channel].throughput = throughput;
1988  }
1989  }
1990  else
1991  {
1992  response = "Channel " + to_unsigned(channel) + " too large";
1993  }
1994  return 0;
1995 
1996 }
string to_unsigned(uint64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:265
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
int32_t request_remove_file ( string &  request,
string &  response,
Agent agent 
)
1999 {
2000  char type;
2001  uint32_t tx_id;
2002 
2003  sscanf(request.c_str(), "%*s %c %u\n", &type, &tx_id);
2004  switch (type)
2005  {
2006  case 'i':
2007  {
2008  break;
2009  }
2010  case 'o':
2011  {
2012  break;
2013  }
2014  }
2015 
2016  return 0;
2017 }
int32_t request_ls ( string &  request,
string &  response,
Agent agent 
)
1869 {
1870 
1871  //the request string == "ls directoryname"
1872  //get the directory name
1873 // char directoryname[COSMOS_MAX_NAME+1];
1874 // memmove(directoryname, request.substr(3), COSMOS_MAX_NAME);
1875  std::string directoryname = request.substr(3);
1876 
1877  DIR* dir;
1878  struct dirent* ent;
1879 
1880  std::string all_file_names;
1881 
1882  if((dir = opendir(directoryname.c_str())) != nullptr)
1883  {
1884  while (( ent = readdir(dir)) != nullptr)
1885  {
1886  all_file_names += ent->d_name;
1887  all_file_names += "\n";
1888  }
1889  closedir(dir);
1890 
1891  response = (all_file_names.c_str());
1892  }
1893  else
1894  {
1895  response = "unable to open directory " + directoryname;
1896  }
1897  return 0;
1898 }
int closedir(DIR *dir)
Definition: dirent.c:74
Definition: dirent.c:24
Definition: dirent.h:21
DIR * opendir(const char *name)
Definition: dirent.c:32
struct dirent * readdir(DIR *dir)
Definition: dirent.c:97
char * d_name
Definition: dirent.h:23
int32_t request_list_incoming ( string &  request,
string &  response,
Agent agent 
)
1901 {
1902  response.clear();
1903  for (uint16_t node = 0; node<txq.size(); ++node)
1904  {
1905  response += std::to_string(node) + ' ' + txq[(node)].node_name + ' ' + std::to_string(txq[(node)].incoming.size) + "\n";
1906  for(tx_progress tx : txq[(node)].incoming.progress)
1907  {
1908  if (tx.tx_id)
1909  {
1910  response += to_label("tx_id", tx.tx_id) + ' ';
1911  response += to_label("node", tx.node_name) + ' ';
1912  response += to_label("agent", tx.agent_name) + ' ';
1913  response += to_label("name", tx.file_name) + ' ';
1914  response += to_label("bytes", tx.total_bytes) + ' ';
1915  response += "/" + to_unsigned(tx.file_size) + ' ';
1916  response += to_label("havemeta", tx.havemeta) + ' ';
1917  response += to_label("sendmeta", tx.sendmeta) + ' ';
1918  response += to_label("sentmeta", tx.sentmeta) + ' ';
1919  response += to_label("senddata", tx.senddata) + ' ';
1920  response += to_label("sentdata", tx.sentdata) + ' ';
1921  response += to_label("complete", tx.complete);
1922  response += "\n";
1923  }
1924  }
1925  }
1926 
1927  return 0;
1928 }
Definition: transferlib.h:338
string to_string(char *value)
Definition: stringlib.cpp:220
string to_unsigned(uint64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:265
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
static string node
Definition: agent_monitor.cpp:126
string to_label(string label, double value, uint16_t precision, bool mjd)
Definition: stringlib.cpp:376
int32_t request_list_outgoing ( string &  request,
string &  response,
Agent agent 
)
1931 {
1932  response.clear();
1933  for (uint16_t node=0; node<txq.size(); ++node)
1934  {
1935  response += std::to_string(node) + ' ' + txq[(node)].node_name + ' ' + std::to_string(txq[(node)].outgoing.size) + "\n";
1936  for(tx_progress tx : txq[(node)].outgoing.progress)
1937  {
1938  if (tx.tx_id)
1939  {
1940  response += to_label("tx_id", tx.tx_id) + ' ';
1941  response += to_label("node", tx.node_name) + ' ';
1942  response += to_label("agent", tx.agent_name) + ' ';
1943  response += to_label("name", tx.file_name) + ' ';
1944  response += to_label("bytes", tx.total_bytes) + ' ';
1945  response += "/" + to_unsigned(tx.file_size) + ' ';
1946  response += to_label("havemeta", tx.havemeta) + ' ';
1947  response += to_label("sendmeta", tx.sendmeta) + ' ';
1948  response += to_label("sentmeta", tx.sentmeta) + ' ';
1949  response += to_label("senddata", tx.senddata) + ' ';
1950  response += to_label("sentdata", tx.sentdata) + ' ';
1951  response += to_label("complete", tx.complete);
1952  response += "\n";
1953  }
1954  }
1955  }
1956 
1957  return 0;
1958 }
Definition: transferlib.h:338
string to_string(char *value)
Definition: stringlib.cpp:220
string to_unsigned(uint64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:265
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
static string node
Definition: agent_monitor.cpp:126
string to_label(string label, double value, uint16_t precision, bool mjd)
Definition: stringlib.cpp:376
int32_t request_list_incoming_json ( string &  request,
string &  response,
Agent agent 
)
2873 {
2874  response = (json_list_incoming().c_str());
2875  return 0;
2876 }
std::string json_list_incoming()
Definition: agent_file2.cpp:2884
int32_t request_list_outgoing_json ( string &  request,
string &  response,
Agent agent 
)
2879 {
2880  response = (json_list_outgoing().c_str());
2881  return 0;
2882 }
std::string json_list_outgoing()
Definition: agent_file2.cpp:2919
int32_t request_set_logstride ( string &  request,
string &  response,
Agent agent 
)
2847 {
2848  double new_logstride;
2849  sscanf(request.c_str(),"set_logstride %lf",&new_logstride);
2850  if(new_logstride > 0. )
2851  {
2852  logstride_sec = new_logstride;
2853  }
2854  return 0;
2855 }
double logstride_sec
Definition: agent_file2.cpp:170
int32_t request_get_logstride ( string &  request,
string &  response,
Agent agent 
)
2858 {
2859  response = "{" + to_json("logstride", logstride_sec) + "}";
2860  return 0;
2861 }
double logstride_sec
Definition: agent_file2.cpp:170
string to_json(string key, string value)
Definition: stringlib.cpp:334
int32_t outgoing_tx_add ( tx_progress tx_out)
2020 {
2021  if (debug_flag)
2022  {
2023  debug_fd_lock.lock();
2024  fprintf(agent->get_debug_fd(), "%16.10f Main: outgoing_tx_add: ", currentmjd());
2025  fflush(agent->get_debug_fd());
2026  debug_fd_lock.unlock();
2027  }
2028 
2029  int32_t node = check_node_id_2(tx_out.node_name);
2030  if (node <0)
2031  {
2032  if (debug_flag)
2033  {
2034  debug_fd_lock.lock();
2035  fprintf(agent->get_debug_fd(), "TRANSFER_ERROR_NODE\n");
2036  fflush(agent->get_debug_fd());
2037  debug_fd_lock.unlock();
2038  }
2039  return TRANSFER_ERROR_NODE;
2040  }
2041 
2042  // Only add if we have room
2043  if (txq[static_cast <size_t>(node)].outgoing.size == TRANSFER_QUEUE_LIMIT)
2044  {
2045  if (debug_flag)
2046  {
2047  debug_fd_lock.lock();
2048  fprintf(agent->get_debug_fd(), "TRANSFER_ERROR_QUEUEFULL\n");
2049  fflush(agent->get_debug_fd());
2050  debug_fd_lock.unlock();
2051  }
2052  return TRANSFER_ERROR_QUEUEFULL;
2053  }
2054 
2055  if (tx_out.file_name.size())
2056  {
2057  tx_out.filepath = data_base_path(tx_out.node_name, "outgoing", tx_out.agent_name, tx_out.file_name);
2058  }
2059  else
2060  {
2061  if (debug_flag)
2062  {
2063  debug_fd_lock.lock();
2064  fprintf(agent->get_debug_fd(), "TRANSFER_ERROR_FILENAME\n");
2065  fflush(agent->get_debug_fd());
2066  debug_fd_lock.unlock();
2067  }
2068  tx_out.filepath = "";
2069  return TRANSFER_ERROR_FILENAME;
2070  }
2071 
2072  tx_out.temppath = data_base_path(tx_out.node_name, "temp", "file", "out_"+std::to_string(tx_out.tx_id));
2073 
2074  // Check for a duplicate file name of something already in queue
2075  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2076  {
2077  if (!txq[static_cast <size_t>(node)].outgoing.progress[i].filepath.empty() && tx_out.filepath == txq[static_cast <size_t>(node)].outgoing.progress[i].filepath)
2078  {
2079  // Remove the META file
2080  if (debug_flag)
2081  {
2082  debug_fd_lock.lock();
2083  fprintf(agent->get_debug_fd(), "%u %s %s %s TRANSFER_ERROR_DUPLICATE\n", tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.filepath.c_str());
2084  fflush(agent->get_debug_fd());
2085  debug_fd_lock.unlock();
2086  }
2087  string filepath = tx_out.temppath + ".meta";
2088  remove(filepath.c_str());
2089  return TRANSFER_ERROR_DUPLICATE;
2090  }
2091  }
2092 
2093  tx_out.fp = nullptr;
2094  //get the file size
2095  tx_out.file_size = get_file_size(tx_out.filepath);
2096  tx_out.savetime = 0.;
2097 
2098  // save and queue metadata packet
2099  tx_out.havemeta = true;
2100 
2101  if (debug_flag)
2102  {
2103  debug_fd_lock.lock();
2104  fprintf(agent->get_debug_fd(), "%u %s %s %s %lu ", tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.filepath.c_str(), PROGRESS_QUEUE_SIZE);
2105  fflush(agent->get_debug_fd());
2106  debug_fd_lock.unlock();
2107  }
2108 
2109  // Good to go. Add it to queue.
2110  outgoing_tx_lock.lock();
2111  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].tx_id = tx_out.tx_id;
2112  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].havemeta = tx_out.havemeta;
2113  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].complete = tx_out.complete;
2114  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].node_name = tx_out.node_name;
2115  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].agent_name = tx_out.agent_name;
2116  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].file_name = tx_out.file_name;
2117  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].filepath = tx_out.filepath;
2118  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].temppath = tx_out.temppath;
2119  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].savetime = tx_out.savetime;
2120  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].file_size = tx_out.file_size;
2121  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].total_bytes = tx_out.total_bytes;
2122  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].file_info.clear();
2123  for (file_progress filep : tx_out.file_info)
2124  {
2125  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].file_info.push_back(filep);
2126  }
2127  txq[static_cast <size_t>(node)].outgoing.progress[tx_out.tx_id].fp = tx_out.fp;
2128  ++txq[static_cast <size_t>(node)].outgoing.size;
2129  outgoing_tx_lock.unlock();
2130 
2131  if (debug_flag)
2132  {
2133  debug_fd_lock.lock();
2134  fprintf(agent->get_debug_fd(), " %u\n", txq[static_cast <size_t>(node)].outgoing.size);
2135  fflush(agent->get_debug_fd());
2136  debug_fd_lock.unlock();
2137  }
2138 
2139  return outgoing_tx_recount(node);
2140 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int i
Definition: rw_test.cpp:37
int32_t outgoing_tx_recount(int32_t node)
Definition: agent_file2.cpp:2359
Definition: transferlib.h:332
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
string to_string(char *value)
Definition: stringlib.cpp:220
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
static Agent * agent
Definition: agent_file2.cpp:95
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
deque< file_progress > file_info
Definition: transferlib.h:358
#define TRANSFER_ERROR_QUEUEFULL
Definition: cosmos-errno.h:221
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
#define TRANSFER_ERROR_DUPLICATE
Definition: cosmos-errno.h:225
string agent_name
Definition: transferlib.h:350
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
string file_name
Definition: transferlib.h:351
#define TRANSFER_ERROR_FILENAME
Definition: cosmos-errno.h:224
bool complete
Definition: transferlib.h:348
static string node
Definition: agent_monitor.cpp:126
static std::mutex outgoing_tx_lock
Definition: agent_file2.cpp:130
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
int32_t check_node_id_2(std::string node_name)
Definition: agent_file2.cpp:2704
int32_t get_file_size(string filename)
Get size of file.
Definition: transferlib.cpp:402
static bool debug_flag
Definition: agent_file2.cpp:72
int32_t outgoing_tx_add ( std::string  node_name,
std::string  agent_name,
std::string  file_name 
)
2143 {
2144  if (node_name.empty() || agent_name.empty() || file_name.empty())
2145  {
2146  if (debug_flag)
2147  {
2148  debug_fd_lock.lock();
2149  fprintf(agent->get_debug_fd(), "%16.10f Main: outgoing_tx_add: TRANSFER_ERROR_FILENAME\n", currentmjd());
2150  fflush(agent->get_debug_fd());
2151  debug_fd_lock.unlock();
2152  }
2153  return TRANSFER_ERROR_FILENAME;
2154  }
2155 
2156  // BEGIN GATHERING THE METADATA
2157  tx_progress tx_out;
2158 
2159  int32_t node = check_node_id_2(node_name);
2160  if (node <0)
2161  {
2162  return TRANSFER_ERROR_NODE;
2163  }
2164 
2165  // Only add if we have room
2166  if (txq[static_cast <size_t>(node)].outgoing.size == TRANSFER_QUEUE_LIMIT)
2167  {
2168  return TRANSFER_ERROR_QUEUEFULL;
2169  }
2170 
2171  // Locate next empty space
2172  //get the file size
2173  outgoing_tx_lock.lock();
2174  tx_out.tx_id = 0;
2175  PACKET_TX_ID_TYPE id = txq[static_cast <size_t>(node)].outgoing.next_id;
2176  do
2177  {
2178  // 0 is special case
2179  if (id == 0)
2180  {
2181  ++id;
2182  }
2183 
2184  if (txq[static_cast <size_t>(node)].outgoing.progress[id].tx_id == 0)
2185  {
2186  tx_out.tx_id = id;
2187  txq[static_cast <size_t>(node)].outgoing.next_id = id + 1;
2188  break;
2189  }
2190  // If no empty found, increment, allowing to wrap if necessary
2191  } while (++id != txq[static_cast <size_t>(node)].outgoing.next_id);
2192  outgoing_tx_lock.unlock();
2193 
2194  if (tx_out.tx_id > 0)
2195  {
2196  tx_out.havemeta = true;
2197  tx_out.complete = false;
2198  tx_out.total_bytes = 0;
2199  tx_out.node_name = node_name;
2200  tx_out.agent_name = agent_name;
2201  tx_out.file_name = file_name;
2202  tx_out.temppath = data_base_path(tx_out.node_name, "temp", "file", "out_"+std::to_string(tx_out.tx_id));
2203  tx_out.filepath = data_base_path(tx_out.node_name, "outgoing", tx_out.agent_name, tx_out.file_name);
2204  tx_out.savetime = 0.;
2205 
2206  std::ifstream filename;
2207 
2208  //get the file size
2209  tx_out.file_size = get_file_size(tx_out.filepath);
2210 
2211  if(tx_out.file_size < 0)
2212  {
2213  if (debug_flag)
2214  {
2215  debug_fd_lock.lock();
2216  fprintf(agent->get_debug_fd(), "%16.10f Main: outgoing_tx_add: DATA_ERROR_SIZE_MISMATCH\n", currentmjd());
2217  fflush(agent->get_debug_fd());
2218  debug_fd_lock.unlock();
2219  }
2220  return DATA_ERROR_SIZE_MISMATCH;
2221  }
2222 
2223  // see if file can be opened
2224  filename.open(tx_out.filepath, std::ios::in|std::ios::binary);
2225  if(!filename.is_open())
2226  {
2227  if (debug_flag)
2228  {
2229  debug_fd_lock.lock();
2230  fprintf(agent->get_debug_fd(), "%16.10f Main: outgoing_tx_add: %s\n", currentmjd(), cosmos_error_string(-errno).c_str());
2231  fflush(agent->get_debug_fd());
2232  debug_fd_lock.unlock();
2233  }
2234  return -errno;
2235  }
2236  filename.close();
2237 
2238  write_meta(tx_out);
2239 
2240  int32_t iretn = outgoing_tx_add(tx_out);
2241  return iretn;
2242  }
2243  else
2244  {
2245  return TRANSFER_ERROR_MATCH;
2246  }
2247 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
Definition: transferlib.h:338
bool havemeta
Definition: transferlib.h:342
string to_string(char *value)
Definition: stringlib.cpp:220
int iretn
Definition: rw_test.cpp:37
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file2.cpp:1648
static Agent * agent
Definition: agent_file2.cpp:95
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
#define TRANSFER_ERROR_QUEUEFULL
Definition: cosmos-errno.h:221
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
#define DATA_ERROR_SIZE_MISMATCH
Definition: cosmos-errno.h:150
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string agent_name
Definition: transferlib.h:350
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
int32_t outgoing_tx_add(tx_progress &tx_out)
Definition: agent_file2.cpp:2019
string agent_name
Definition: agent_001.cpp:47
string file_name
Definition: transferlib.h:351
#define TRANSFER_ERROR_FILENAME
Definition: cosmos-errno.h:224
bool complete
Definition: transferlib.h:348
static string node
Definition: agent_monitor.cpp:126
static std::mutex outgoing_tx_lock
Definition: agent_file2.cpp:130
int32_t check_node_id_2(std::string node_name)
Definition: agent_file2.cpp:2704
int32_t get_file_size(string filename)
Get size of file.
Definition: transferlib.cpp:402
static bool debug_flag
Definition: agent_file2.cpp:72
int32_t outgoing_tx_del ( int32_t  node,
uint16_t  tx_id = 256 
)
2250 {
2251  if (node <0 || (uint32_t)node > txq.size())
2252  {
2253  return TRANSFER_ERROR_INDEX;
2254  }
2255 
2256  if (txq[static_cast <size_t>(node)].outgoing.progress[tx_id].tx_id == 0)
2257  {
2258  return TRANSFER_ERROR_MATCH;
2259  }
2260 
2261  if (tx_id >= PROGRESS_QUEUE_SIZE)
2262  {
2263  for (uint16_t i=0; i<PROGRESS_QUEUE_SIZE; ++i)
2264  {
2266  }
2267  }
2268  else
2269  {
2270  tx_progress tx_out = txq[static_cast <size_t>(node)].outgoing.progress[tx_id];
2271 
2272  // erase the transaction
2273  // outgoing_tx.erase(outgoing_tx.begin()+tx_id);
2274  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].tx_id = 0;
2275  if (txq[static_cast <size_t>(node)].outgoing.size)
2276  {
2277  --txq[static_cast <size_t>(node)].outgoing.size;
2278  }
2279 
2280  // Set current tx id back to 0
2281  txq[static_cast <size_t>(node)].outgoing.id = 0;
2282 
2283  // Remove the file
2284  if(remove(tx_out.filepath.c_str()))
2285  {
2286  if (debug_flag)
2287  {
2288  debug_fd_lock.lock();
2289  fprintf(agent->get_debug_fd(), "%16.10f Main/Send: Del outgoing: %u %s %s %s - Unable to remove file\n", currentmjd(), tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.file_name.c_str());
2290  fflush(agent->get_debug_fd());
2291  debug_fd_lock.unlock();
2292  }
2293  }
2294 
2295  // Remove the META file
2296  std::string meta_filepath = tx_out.temppath + ".meta";
2297  remove(meta_filepath.c_str());
2298 
2299  if (debug_flag)
2300  {
2301  debug_fd_lock.lock();
2302  fprintf(agent->get_debug_fd(), "%16.10f Main/Send: Del outgoing: %u %s %s %s\n", currentmjd(), tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.file_name.c_str());
2303  fflush(agent->get_debug_fd());
2304  debug_fd_lock.unlock();
2305  }
2306  }
2307 
2308  return outgoing_tx_recount(node);
2309 }
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
int i
Definition: rw_test.cpp:37
int32_t outgoing_tx_recount(int32_t node)
Definition: agent_file2.cpp:2359
Definition: transferlib.h:338
static Agent * agent
Definition: agent_file2.cpp:95
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
int32_t outgoing_tx_del(int32_t node, uint16_t tx_id=256)
Definition: agent_file2.cpp:2249
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
string agent_name
Definition: transferlib.h:350
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
string file_name
Definition: transferlib.h:351
static string node
Definition: agent_monitor.cpp:126
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
static bool debug_flag
Definition: agent_file2.cpp:72
int32_t outgoing_tx_purge ( int32_t  node,
uint16_t  tx_id = 256 
)
2312 {
2313  if (node <0 || (uint32_t)node > txq.size())
2314  {
2315  return TRANSFER_ERROR_INDEX;
2316  }
2317 
2318  if (tx_id >= PROGRESS_QUEUE_SIZE)
2319  {
2320  for (uint16_t i=0; i<PROGRESS_QUEUE_SIZE; ++i)
2321  {
2322  txq[static_cast <size_t>(node)].outgoing.progress[i].fp = nullptr;
2323  txq[static_cast <size_t>(node)].outgoing.progress[i].tx_id = 0;
2324  txq[static_cast <size_t>(node)].outgoing.progress[i].complete = false;
2325  txq[static_cast <size_t>(node)].outgoing.progress[i].filepath = "";
2326  txq[static_cast <size_t>(node)].outgoing.progress[i].havemeta = false;
2327  txq[static_cast <size_t>(node)].outgoing.progress[i].savetime = 0;
2328  txq[static_cast <size_t>(node)].outgoing.progress[i].temppath = "";
2329  txq[static_cast <size_t>(node)].outgoing.progress[i].file_name = "";
2330  txq[static_cast <size_t>(node)].outgoing.progress[i].file_size = 0;
2331  txq[static_cast <size_t>(node)].outgoing.progress[i].node_name = "";
2332  txq[static_cast <size_t>(node)].outgoing.progress[i].agent_name = "";
2333  txq[static_cast <size_t>(node)].outgoing.progress[i].total_bytes = 0;
2334  txq[static_cast <size_t>(node)].outgoing.progress[i].file_info.clear();
2335  }
2336  }
2337  else {
2338  {
2339  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].fp = nullptr;
2340  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].tx_id = 0;
2341  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].complete = false;
2342  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].filepath = "";
2343  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].havemeta = false;
2344  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].savetime = 0;
2345  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].temppath = "";
2346  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_name = "";
2347  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_size = 0;
2348  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].node_name = "";
2349  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].agent_name = "";
2350  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].total_bytes = 0;
2351  txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.clear();
2352  }
2353  }
2354  txq[static_cast <size_t>(node)].outgoing.size = 0;
2355 
2356  return 0;
2357 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
static string node
Definition: agent_monitor.cpp:126
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
int32_t outgoing_tx_recount ( int32_t  node)
2360 {
2361  if (node <0 || (uint32_t)node > txq.size())
2362  {
2363  return TRANSFER_ERROR_INDEX;
2364  }
2365 
2366  txq[static_cast <size_t>(node)].outgoing.size = 0;
2367  for (uint16_t i=0; i<PROGRESS_QUEUE_SIZE; ++i)
2368  {
2369  if (txq[static_cast <size_t>(node)].outgoing.progress[i].tx_id)
2370  {
2371  ++txq[static_cast <size_t>(node)].outgoing.size;
2372  }
2373  }
2374  return txq[static_cast <size_t>(node)].outgoing.size;
2375 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
static string node
Definition: agent_monitor.cpp:126
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
int32_t incoming_tx_add ( tx_progress tx_in)
2378 {
2379  int32_t node = check_node_id_2(tx_in.node_name);
2380  if (node <0)
2381  {
2382  if (debug_flag)
2383  {
2384  debug_fd_lock.lock();
2385  fprintf(agent->get_debug_fd(), "%16.10f Main: incoming_tx_add: TRANSFER_ERROR_NODE\n", currentmjd());
2386  fflush(agent->get_debug_fd());
2387  debug_fd_lock.unlock();
2388  }
2389  return TRANSFER_ERROR_NODE;
2390  }
2391 
2392  // Check for an actual file name
2393  if (tx_in.file_name.size())
2394  {
2395  tx_in.filepath = data_base_path(tx_in.node_name, "incoming", tx_in.agent_name, tx_in.file_name);
2396  }
2397  else
2398  {
2399  tx_in.filepath = "";
2400  }
2401 
2402  std::string tx_name = "in_"+std::to_string(tx_in.tx_id);
2403  tx_in.temppath = data_base_path(tx_in.node_name, "temp", "file", tx_name);
2404 
2405  // Check for a duplicate file name of something already in queue
2406  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2407  {
2408  if (!txq[static_cast <size_t>(node)].incoming.progress[i].filepath.empty() && tx_in.filepath == txq[static_cast <size_t>(node)].incoming.progress[i].filepath)
2409  {
2410  if (debug_flag)
2411  {
2412  debug_fd_lock.lock();
2413  fprintf(agent->get_debug_fd(), "%u %s %s %s TRANSFER_ERROR_DUPLICATE\n", tx_in.tx_id, tx_in.node_name.c_str(), tx_in.agent_name.c_str(), tx_in.filepath.c_str());
2414  fflush(agent->get_debug_fd());
2415  debug_fd_lock.unlock();
2416  }
2417  // Remove the META file
2418  string filepath = tx_in.temppath + ".meta";
2419  remove(filepath.c_str());
2420  return TRANSFER_ERROR_DUPLICATE;
2421  }
2422  }
2423 
2424  tx_in.savetime = 0.;
2425  tx_in.fp = nullptr;
2426 
2427  // Put it in list
2428 // txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id] = tx_in;
2429  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].tx_id = tx_in.tx_id;
2430  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].havemeta = tx_in.havemeta;
2431  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].complete = tx_in.complete;
2432  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].node_name = tx_in.node_name;
2433  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].agent_name = tx_in.agent_name;
2434  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].file_name = tx_in.file_name;
2435  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].filepath = tx_in.filepath;
2436  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].temppath = tx_in.temppath;
2437  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].savetime = tx_in.savetime;
2438  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].file_size = tx_in.file_size;
2439  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].total_bytes = tx_in.total_bytes;
2440  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].file_info.clear();
2441  for (file_progress filep : tx_in.file_info)
2442  {
2443  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].file_info.push_back(filep);
2444  }
2445  txq[static_cast <size_t>(node)].incoming.progress[tx_in.tx_id].fp = tx_in.fp;
2446  ++txq[static_cast <size_t>(node)].incoming.size;
2447 
2448  if (debug_flag)
2449  {
2450  debug_fd_lock.lock();
2451  fprintf(agent->get_debug_fd(), "%16.10f Main/Recv: Add incoming: %u %s %s %s %lu\n", currentmjd(), tx_in.tx_id, tx_in.node_name.c_str(), tx_in.agent_name.c_str(), tx_in.filepath.c_str(), PROGRESS_QUEUE_SIZE);
2452  fflush(agent->get_debug_fd());
2453  debug_fd_lock.unlock();
2454  }
2455 
2456  return incoming_tx_recount(node);
2457 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
string to_string(char *value)
Definition: stringlib.cpp:220
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
static Agent * agent
Definition: agent_file2.cpp:95
int32_t incoming_tx_recount(int32_t node)
Definition: agent_file2.cpp:2639
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
deque< file_progress > file_info
Definition: transferlib.h:358
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_ERROR_DUPLICATE
Definition: cosmos-errno.h:225
string agent_name
Definition: transferlib.h:350
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
string file_name
Definition: transferlib.h:351
bool complete
Definition: transferlib.h:348
static string node
Definition: agent_monitor.cpp:126
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
int32_t check_node_id_2(std::string node_name)
Definition: agent_file2.cpp:2704
static bool debug_flag
Definition: agent_file2.cpp:72
int32_t incoming_tx_add ( std::string  node_name,
PACKET_TX_ID_TYPE  tx_id 
)
2460 {
2461  tx_progress tx_in;
2462 
2463  tx_in.tx_id = tx_id;
2464  tx_in.havemeta = false;
2465  tx_in.complete = false;
2466  tx_in.node_name = node_name;
2467  tx_in.agent_name = "";
2468  tx_in.file_name = "";
2469  tx_in.savetime = 0.;
2470  tx_in.file_size = 0;
2471  tx_in.total_bytes = 0;
2472  tx_in.file_info.clear();
2473 
2474  int32_t iretn = incoming_tx_add(tx_in);
2475 
2476  return iretn;
2477 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
Definition: transferlib.h:338
bool havemeta
Definition: transferlib.h:342
int iretn
Definition: rw_test.cpp:37
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
double savetime
Definition: transferlib.h:354
string node_name
Definition: agent_001.cpp:46
deque< file_progress > file_info
Definition: transferlib.h:358
string node_name
Definition: transferlib.h:349
string agent_name
Definition: transferlib.h:350
string file_name
Definition: transferlib.h:351
bool complete
Definition: transferlib.h:348
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file2.cpp:2377
int32_t incoming_tx_update ( packet_struct_metashort  meta)
2480 {
2481  int32_t node = check_node_id_2(meta.node_id);
2482  if (node <0)
2483  {
2484  return TRANSFER_ERROR_NODE;
2485  }
2486 
2487  // See if it's already in the queue
2488  if (txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].tx_id != meta.tx_id)
2489  {
2490  return TRANSFER_ERROR_MATCH;
2491  }
2492 
2493  if (!txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].havemeta)
2494  {
2495  // Core META information
2496  txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].node_name = txq[static_cast <size_t>(node)].node_name;
2497  txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].agent_name = meta.agent_name;
2498  txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].file_name = meta.file_name;
2499  txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].file_size = meta.file_size;
2500  txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].filepath = data_base_path(txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].node_name, "incoming", txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].agent_name, txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].file_name);
2501  std::string tx_name = "in_"+std::to_string(txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].tx_id);
2502  txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].temppath = data_base_path(txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].node_name, "temp", "file", tx_name);
2503 
2504  // Derivative META information
2505  txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].savetime = 0.;
2506  txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].havemeta = true;
2507  txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].total_bytes = 0;
2508  txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].fp = nullptr;
2509 
2510  // Save it to disk
2511  write_meta(txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id]);
2512  }
2513 
2514  if (debug_flag)
2515  {
2516  debug_fd_lock.lock();
2517  fprintf(agent->get_debug_fd(), "%16.10f Recv: Update incoming: %u %s %s %s\n", currentmjd(), txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].tx_id, txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].node_name.c_str(), txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].agent_name.c_str(), txq[static_cast <size_t>(node)].incoming.progress[meta.tx_id].file_name.c_str());
2518  fflush(agent->get_debug_fd());
2519  debug_fd_lock.unlock();
2520  }
2521 
2522  return meta.tx_id;
2523 }
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:248
string to_string(char *value)
Definition: stringlib.cpp:220
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:247
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file2.cpp:1648
static Agent * agent
Definition: agent_file2.cpp:95
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
char file_name[128]
Definition: transferlib.h:250
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
char agent_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:249
static string node
Definition: agent_monitor.cpp:126
int32_t check_node_id_2(std::string node_name)
Definition: agent_file2.cpp:2704
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:251
static bool debug_flag
Definition: agent_file2.cpp:72
// Delete one transaction, or every transaction, from a node's incoming queue.
//
// node:  index into txq.
// tx_id: slot to delete; any value >= PROGRESS_QUEUE_SIZE selects the
//        "delete all" path.
// Returns the result of incoming_tx_recount(node), TRANSFER_ERROR_NODE if node
// is negative, or TRANSFER_ERROR_MATCH if the requested slot is empty.
//
// NOTE(review): listing lines 2527 and 2537 are absent from this rendering.
// The generated cross-reference list for this function names check_node_id_2
// and incoming_tx_del itself, so presumably 2527 involves check_node_id_2 and
// 2537 is the per-slot recursive call - confirm against the real source.
// NOTE(review): unlike the purge/recount functions, no upper bound on node is
// checked in the visible lines before txq is indexed - confirm whether the
// missing line 2527 covers that.
int32_t incoming_tx_del ( int32_t  node,
uint16_t  tx_id = 256 
)
2526 {
2528  if (node <0)
2529  {
2530  return TRANSFER_ERROR_NODE;
2531  }
2532 
2533  if (tx_id >= PROGRESS_QUEUE_SIZE)
2534  {
// "Delete all" path: visits every slot index (loop body not visible, see above).
2535  for (uint16_t i=0; i<PROGRESS_QUEUE_SIZE; ++i)
2536  {
2538  }
2539  }
2540  else
2541  {
// A tx_id of 0 marks an unused slot.
2542  if (txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id == 0)
2543  {
2544  return TRANSFER_ERROR_MATCH;
2545  }
2546 
// Work from a copy so the paths and id survive the slot being released below.
2547  tx_progress tx_in = txq[static_cast <size_t>(node)].incoming.progress[tx_id];
2548 
2549  txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id = 0;
2550  txq[static_cast <size_t>(node)].incoming.progress[tx_id].havemeta = false;
2551  if (txq[static_cast <size_t>(node)].incoming.size)
2552  {
2553  --txq[static_cast <size_t>(node)].incoming.size;
2554  }
2555 
2556  // Close the DATA file
// NOTE(review): only the copy's fp is cleared; the slot in txq keeps the
// now-closed FILE pointer - confirm nothing reuses it.
2557  if (tx_in.fp != nullptr)
2558  {
2559  fclose(tx_in.fp);
2560  tx_in.fp = nullptr;
2561  }
2562 
2563  std::string filepath;
2564  //Remove the DATA file
2565  filepath = tx_in.temppath + ".file";
2566  remove(filepath.c_str());
2567 
2568  // Remove the META file
2569  filepath = tx_in.temppath + ".meta";
2570  remove(filepath.c_str());
2571 
2572  // Make sure we are not using this for incoming_tx_id
2573  if (tx_in.tx_id == txq[static_cast <size_t>(node)].incoming.id)
2574  {
2575  txq[static_cast <size_t>(node)].incoming.id = 0;
2576  }
2577 
2578  if (debug_flag)
2579  {
2580  debug_fd_lock.lock();
2581  fprintf(agent->get_debug_fd(), "%16.10f Recv: Del incoming: %u %s\n", currentmjd(), tx_in.tx_id, tx_in.node_name.c_str());
2582  fflush(agent->get_debug_fd());
2583  debug_fd_lock.unlock();
2584  }
2585  }
2586 
// The recount rebuilds incoming.size from the slots that still have a tx_id.
2587  return incoming_tx_recount(node);
2588 
2589 }
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:338
FILE * fp
Definition: transferlib.h:359
static Agent * agent
Definition: agent_file2.cpp:95
int32_t incoming_tx_recount(int32_t node)
Definition: agent_file2.cpp:2639
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: transferlib.h:349
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
int32_t incoming_tx_del(int32_t node, uint16_t tx_id=256)
Definition: agent_file2.cpp:2525
static string node
Definition: agent_monitor.cpp:126
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
int32_t check_node_id_2(std::string node_name)
Definition: agent_file2.cpp:2704
static bool debug_flag
Definition: agent_file2.cpp:72
int32_t incoming_tx_purge ( int32_t  node,
uint16_t  tx_id = 256 
)
2592 {
2593  if (node <0 || (uint32_t)node > txq.size())
2594  {
2595  return TRANSFER_ERROR_INDEX;
2596  }
2597 
2598  if (tx_id >= PROGRESS_QUEUE_SIZE)
2599  {
2600  for (uint16_t i=0; i<PROGRESS_QUEUE_SIZE; ++i)
2601  {
2602  txq[static_cast <size_t>(node)].incoming.progress[i].fp = nullptr;
2603  txq[static_cast <size_t>(node)].incoming.progress[i].tx_id = 0;
2604  txq[static_cast <size_t>(node)].incoming.progress[i].complete = false;
2605  txq[static_cast <size_t>(node)].incoming.progress[i].filepath = "";
2606  txq[static_cast <size_t>(node)].incoming.progress[i].havemeta = false;
2607  txq[static_cast <size_t>(node)].incoming.progress[i].savetime = 0;
2608  txq[static_cast <size_t>(node)].incoming.progress[i].temppath = "";
2609  txq[static_cast <size_t>(node)].incoming.progress[i].file_name = "";
2610  txq[static_cast <size_t>(node)].incoming.progress[i].file_size = 0;
2611  txq[static_cast <size_t>(node)].incoming.progress[i].node_name = "";
2612  txq[static_cast <size_t>(node)].incoming.progress[i].agent_name = "";
2613  txq[static_cast <size_t>(node)].incoming.progress[i].total_bytes = 0;
2614  txq[static_cast <size_t>(node)].incoming.progress[i].file_info.clear();
2615  }
2616  }
2617  else {
2618  {
2619  txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp = nullptr;
2620  txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id = 0;
2621  txq[static_cast <size_t>(node)].incoming.progress[tx_id].complete = false;
2622  txq[static_cast <size_t>(node)].incoming.progress[tx_id].filepath = "";
2623  txq[static_cast <size_t>(node)].incoming.progress[tx_id].havemeta = false;
2624  txq[static_cast <size_t>(node)].incoming.progress[tx_id].savetime = 0;
2625  txq[static_cast <size_t>(node)].incoming.progress[tx_id].temppath = "";
2626  txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_name = "";
2627  txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_size = 0;
2628  txq[static_cast <size_t>(node)].incoming.progress[tx_id].node_name = "";
2629  txq[static_cast <size_t>(node)].incoming.progress[tx_id].agent_name = "";
2630  txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes = 0;
2631  txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.clear();
2632  }
2633  }
2634  txq[static_cast <size_t>(node)].incoming.size = 0;
2635 
2636  return 0;
2637 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
static string node
Definition: agent_monitor.cpp:126
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
int32_t incoming_tx_recount ( int32_t  node)
2640 {
2641  if (node <0 || (uint32_t)node > txq.size())
2642  {
2643  return TRANSFER_ERROR_INDEX;
2644  }
2645 
2646  txq[static_cast <size_t>(node)].incoming.size = 0;
2647  for (uint16_t i=0; i<PROGRESS_QUEUE_SIZE; ++i)
2648  {
2649  if (txq[static_cast <size_t>(node)].incoming.progress[i].tx_id)
2650  {
2651  ++txq[static_cast <size_t>(node)].incoming.size;
2652  }
2653  }
2654  return txq[static_cast <size_t>(node)].incoming.size;
2655 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
static string node
Definition: agent_monitor.cpp:126
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
// List the byte ranges of a transfer that are not covered by any received chunk.
//
// tx: transfer to examine. Its file_info list is sorted with lower_chunk and its
//     total_bytes is recomputed from the chunk list as a side effect.
// Returns one file_progress per gap; chunk_start and chunk_end are inclusive
// byte offsets (a chunk's length is computed as chunk_end - chunk_start + 1).
//
// NOTE(review): listing line 1827 is absent from this rendering. The generated
// cross-reference list for this function names merge_chunks_overlap, so
// presumably it is called on tx just before the sort - confirm against the
// real source. The gap logic below relies on chunks not overlapping.
// NOTE(review): with an empty file_info and file_size == 0 the single gap gets
// chunk_end = file_size - 1; confirm callers never ask for an empty file.
vector<file_progress> find_chunks_missing ( tx_progress tx)
1815 {
1816  vector<file_progress> missing;
1817  file_progress tp;
1818 
// Nothing received yet: the whole file is one gap.
1819  if (tx.file_info.size() == 0)
1820  {
1821  tp.chunk_start = 0;
1822  tp.chunk_end = tx.file_size - 1;
1823  missing.push_back(tp);
1824  }
1825  else
1826  {
1828  sort(tx.file_info.begin(), tx.file_info.end(), lower_chunk);
1829 
1830  // Check missing before first chunk
1831  if (tx.file_info[0].chunk_start)
1832  {
1833  tp.chunk_start = 0;
1834  tp.chunk_end = tx.file_info[0].chunk_start - 1;
1835  missing.push_back(tp);
1836  }
1837 
1838  // Check missing between chunks
// A gap exists wherever one chunk does not start right after the previous one ends.
1839  for (uint32_t i=1; i<tx.file_info.size(); ++i)
1840  {
1841  if (tx.file_info[i-1].chunk_end+1 != tx.file_info[i].chunk_start)
1842  {
1843  tp.chunk_start = tx.file_info[i-1].chunk_end + 1;
1844  tp.chunk_end = tx.file_info[i].chunk_start - 1;
1845  missing.push_back(tp);
1846  }
1847  }
1848 
1849  // Check missing after last chunk
1850  if (tx.file_info[tx.file_info.size()-1].chunk_end + 1 != tx.file_size)
1851  {
1852  tp.chunk_start = tx.file_info[tx.file_info.size()-1].chunk_end + 1;
1853  tp.chunk_end = tx.file_size - 1;
1854  missing.push_back(tp);
1855  }
1856  }
1857 
1858  // calculate bytes so far
// Sum of the inclusive lengths of every chunk currently in file_info.
1859  tx.total_bytes = 0;
1860  for (file_progress prog : tx.file_info)
1861  {
1862  tx.total_bytes += (prog.chunk_end - prog.chunk_start) + 1;
1863  }
1864 
1865  return (missing);
1866 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
deque< file_progress > file_info
Definition: transferlib.h:358
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file2.cpp:1775
bool lower_chunk(file_progress i, file_progress j)
Definition: agent_file2.cpp:1770
//! Merge overlapping or touching chunks of a transfer and total the bytes held.
/*! With two or more chunks the list is sorted with lower_chunk, every chunk that
 * starts no later than one byte past the end of an earlier chunk is folded into
 * that earlier chunk, and the absorbed entry is erased.
 * \param tx Transfer whose file_info list is merged in place. Its total_bytes
 * member is updated.
 * \return The updated total_bytes.
 */
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
{
    auto &chunks = tx.file_info;

    // No chunks: nothing received
    if (chunks.size() == 0)
    {
        tx.total_bytes = 0;
        return tx.total_bytes;
    }

    // A single chunk cannot overlap anything
    if (chunks.size() == 1)
    {
        tx.total_bytes = (chunks[0].chunk_end - chunks[0].chunk_start) + 1;
        return tx.total_bytes;
    }

    tx.total_bytes = 0;
    sort(chunks.begin(), chunks.end(), lower_chunk);
    for (uint32_t keep=0; keep<chunks.size(); ++keep)
    {
        for (uint32_t probe=keep+1; probe<chunks.size(); ++probe)
        {
            // Erasing shifts the next candidate into position probe, so the same
            // index is tested again until it no longer touches the kept chunk.
            // Elements are re-indexed each time because erase invalidates references.
            while (probe < chunks.size() && chunks[probe].chunk_start <= chunks[keep].chunk_end+1)
            {
                if (chunks[probe].chunk_end > chunks[keep].chunk_end)
                {
                    chunks[keep].chunk_end = chunks[probe].chunk_end;
                }
                chunks.erase(chunks.begin()+probe);
            }
        }
        tx.total_bytes += (chunks[keep].chunk_end - chunks[keep].chunk_start) + 1;
    }

    return tx.total_bytes;
}
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
int i
Definition: rw_test.cpp:37
deque< file_progress > file_info
Definition: transferlib.h:358
bool lower_chunk(file_progress i, file_progress j)
Definition: agent_file2.cpp:1770
double queuesendto ( PACKET_NODE_ID_TYPE  node_id,
std::string  type,
vector< PACKET_BYTE packet 
)
int32_t mysendto ( std::string  type,
channelstruc channel,
vector< PACKET_BYTE > &  buf 
)
1423 {
1424  int32_t iretn;
1425  double cmjd;
1426 
1427  if ((cmjd = currentmjd(0.)) < channel.nmjd)
1428  {
1429  if (debug_flag)
1430  {
1431  debug_fd_lock.lock();
1432  fprintf(agent->get_debug_fd(), "%16.10f Send: Mysendto_sleep: %f seconds\n", currentmjd(), 86400. * (channel.nmjd - cmjd));
1433  fflush(agent->get_debug_fd());
1434  debug_fd_lock.unlock();
1435  }
1436  COSMOS_SLEEP((86400. * (channel.nmjd - cmjd)));
1437  }
1438 
1439  iretn = sendto(channel.chansock.cudp, reinterpret_cast<const char*>(&buf[0]), buf.size(), 0, reinterpret_cast<sockaddr*>(&channel.chansock.caddr), sizeof(struct sockaddr_in));
1440 
1441  if (iretn >= 0)
1442  {
1443  ++packet_out_count;
1444  channel.nmjd = currentmjd() + ((28+iretn) / (float)channel.throughput)/86400.;
1445  debug_packet(buf, "Send: "+type);
1446  }
1447  else
1448  {
1449  iretn = -errno;
1450  ++send_error_count;
1451  }
1452 
1453  return iretn;
1454 }
static double cmjd
Definition: agent_monitor.cpp:121
void debug_packet(vector< PACKET_BYTE > buf, std::string type)
Definition: agent_file2.cpp:1583
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
int iretn
Definition: rw_test.cpp:37
static Agent * agent
Definition: agent_file2.cpp:95
int32_t cudp
Definition: socketlib.h:120
struct sockaddr_in caddr
Definition: socketlib.h:122
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
socket_channel chansock
Definition: agent_file2.cpp:101
char buf[128]
Definition: rw_test.cpp:40
double nmjd
Definition: agent_file2.cpp:105
static uint32_t packet_out_count
Definition: agent_file2.cpp:138
uint32_t throughput
Definition: agent_file2.cpp:104
static uint32_t send_error_count
Definition: agent_file2.cpp:142
static bool debug_flag
Definition: agent_file2.cpp:72
// myrecvfrom: wait up to dtimeout seconds for one packet on any socket in
// comm_channel, read it into buf and validate its CRC.
//
// Parameters:
//   type     - label appended to "Recv: " in the debug log.
//   channel  - overwritten with the socket_channel the packet arrived on
//              (NOTE(review): rendered here by value; presumably a reference in
//              the real source, otherwise the caller never sees it -- confirm).
//   buf      - resized to `length` on entry; shrunk to the received size when a
//              packet passes the CRC check.
//   length   - maximum number of bytes to read.
//   dtimeout - overall time budget in seconds.
//
// Returns: the byte count (> 0) of a CRC-valid packet; GENERAL_ERROR_CRC on a CRC
// mismatch; otherwise the last status recorded before the time budget ran out
// (GENERAL_ERROR_TIMEOUT, GENERAL_ERROR_INPUT for a zero-length read, or -errno).
int32_t myrecvfrom ( std::string  type,
socket_channel channel,
vector< PACKET_BYTE > &  buf,
uint32_t  length,
double  dtimeout = 1. 
)
1457 {
1458  int32_t nbytes = 0;
1459 
1460  buf.resize(length);
1461  ElapsedTime et;
1462  do
1463  {
// Rebuild the descriptor set each pass; select() modifies it in place.
1464  fd_set set;
1465  FD_ZERO(&set);
1466  int fdmax = -1;
1467  for (uint16_t i=0; i<comm_channel.size(); ++i)
1468  {
1469  FD_SET(comm_channel[i].chansock.cudp, &set);
1470  if (comm_channel[i].chansock.cudp > fdmax)
1471  {
1472  fdmax = comm_channel[i].chansock.cudp;
1473  }
1474  }
// Time left in the overall budget; nothing is attempted once it goes negative.
1475  double rtimeout = dtimeout - et.split();
1476  if (rtimeout >= 0.)
1477  {
// Non-Windows path: block in select() for the remaining time.
1478 #if !defined(COSMOS_WIN_OS)
1479  timeval timeout;
1480  timeout.tv_sec = static_cast<int32_t>(rtimeout);
1481  timeout.tv_usec = static_cast<int32_t>(1000000. * (rtimeout - timeout.tv_sec));
1482  int rv = select(fdmax+1, &set, nullptr, nullptr, &timeout);
1483  if (rv == -1)
1484  {
1485  nbytes = -errno;
1486  }
1487  else if (rv == 0)
1488  {
1489  nbytes = GENERAL_ERROR_TIMEOUT;
// NOTE(review): listing line 1490 is missing from this rendering; the
// cross-references list timeout_error_count, so it presumably increments
// that counter -- confirm against the real source.
1491  }
1492  else
1493  {
// At least one socket is readable: service the first ready channel that
// yields data.
1494  for (uint16_t i=0; i<comm_channel.size(); ++i)
1495  {
1496  if (FD_ISSET(comm_channel[i].chansock.cudp, &set))
1497  {
1498  channel = comm_channel[i].chansock;
1499  nbytes = recvfrom(channel.cudp, reinterpret_cast<char *>(&buf[0]), length, 0, reinterpret_cast<sockaddr*>(&channel.caddr), reinterpret_cast<socklen_t *>(&channel.addrlen));
1500  if (nbytes > 0)
1501  {
// CRC is computed over everything from byte 3 onward and compared with the
// value stored at PACKET_HEADER_OFFSET_CRC.
// NOTE(review): nbytes-3 is negative for datagrams shorter than 3 bytes.
1502  uint16_t crccalc = calc_crc16ccitt(&buf[3], nbytes-3);
1503  uint16_t crc;
1504  memmove(&crc, &buf[0]+PACKET_HEADER_OFFSET_CRC, sizeof(PACKET_CRC));
1505  if (crc != crccalc)
1506  {
1507  nbytes = GENERAL_ERROR_CRC;
1508  ++crc_error_count;
1509  }
1510  else
1511  {
1512  ++packet_in_count;
1513  buf.resize(nbytes);
1514  debug_packet(buf, "Recv: "+type);
1515  }
// Any non-empty read ends the call, whether or not the CRC matched.
1516  return nbytes;
1517  }
1518  else
1519  {
1520  if (nbytes < 0)
1521  {
1522  nbytes = -errno;
1523  ++recv_error_count;
1524  }
1525  else
1526  {
1527  nbytes = GENERAL_ERROR_INPUT;
1528  ++recv_error_count;
1529  }
1530  }
1531  }
1532  }
1533  }
// Windows path: no select(); try recvfrom() on each channel in turn.
1534 #else
1535  for (uint16_t i=0; i<comm_channel.size(); ++i)
1536  {
1537  channel = comm_channel[i].chansock;
1538  nbytes = recvfrom(channel.cudp, reinterpret_cast<char *>(&buf[0]), length, 0, reinterpret_cast<sockaddr*>(&channel.caddr), reinterpret_cast<socklen_t *>(&channel.addrlen));
1539  if (nbytes > 0)
1540  {
1541  uint16_t crccalc = calc_crc16ccitt(&buf[3], nbytes-3);
1542  uint16_t crc;
1543  memmove(&crc, &buf[0]+PACKET_HEADER_OFFSET_CRC, sizeof(PACKET_CRC));
1544  if (crc != crccalc)
1545  {
1546  nbytes = GENERAL_ERROR_CRC;
1547  ++crc_error_count;
1548  }
1549  else
1550  {
1551  ++packet_in_count;
1552  buf.resize(nbytes);
1553  debug_packet(buf, "Recv: "+type);
1554  }
1555  return nbytes;
1556  }
1557  else
1558  {
1559  if (nbytes < 0)
1560  {
// Nothing pending on this socket: pause, then leave the channel loop (this
// break exits the for, not the do/while) so the outer loop can re-check the
// time budget.
1561  if (errno == EAGAIN || errno == EWOULDBLOCK)
1562  {
1563  COSMOS_SLEEP(.1);
1564  break;
1565  }
1566  nbytes = -errno;
1567  ++recv_error_count;
1568  }
1569  else
1570  {
1571  nbytes = GENERAL_ERROR_INPUT;
1572  ++recv_error_count;
1573  }
1574  }
1575  }
1576 #endif // Not windows
1577  }
// Keep trying until the caller's time budget is used up.
1578  } while (et.split() < dtimeout);
1579 
1580  return nbytes;
1581 }
void debug_packet(vector< PACKET_BYTE > buf, std::string type)
Definition: agent_file2.cpp:1583
int i
Definition: rw_test.cpp:37
#define GENERAL_ERROR_TIMEOUT
Definition: cosmos-errno.h:292
static uint32_t crc_error_count
Definition: agent_file2.cpp:139
png_uint_32 crc
Definition: png.c:2173
uint16_t PACKET_CRC
Definition: transferlib.h:142
ElapsedTime et
Definition: agent_cpu_device_test.cpp:51
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
int32_t cudp
Definition: socketlib.h:120
static uint32_t timeout_error_count
Definition: agent_file2.cpp:140
static uint32_t packet_in_count
Definition: agent_file2.cpp:137
#define PACKET_HEADER_OFFSET_CRC
Definition: transferlib.h:150
struct sockaddr_in caddr
Definition: socketlib.h:122
#define GENERAL_ERROR_CRC
Definition: cosmos-errno.h:284
int addrlen
Definition: socketlib.h:128
static uint32_t recv_error_count
Definition: agent_file2.cpp:143
Definition: elapsedtime.h:62
png_uint_32 length
Definition: png.c:2173
char buf[128]
Definition: rw_test.cpp:40
#define GENERAL_ERROR_INPUT
Definition: cosmos-errno.h:293
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
uint16_t calc_crc16ccitt(uint8_t *buf, int size, bool lsb)
Calculate CRC-16-CCITT.
Definition: mathlib.cpp:2206
// debug_packet: when debug_flag is set, write one line describing a packet to the
// agent's debug stream, holding debug_fd_lock for the whole line.
//
// The line carries the global packet/error counters, the buffer size, a byte sum
// of the buffer from PACKET_HEADER_OFFSET_TOTAL onward, the channel in use, and
// packet-type-specific detail chosen by the low nibble of buf[0].
//
// Parameters:
//   buf  - packet bytes (declared as vector< PACKET_BYTE > in the cross-references;
//          the '>' is lost in the rendered signature below).
//   type - caller-supplied prefix such as "Send: ..." or "Recv: ...".
//
// NOTE(review): listing lines 1604, 1609, 1614, 1616, 1624, 1629, 1634 and 1636
// are missing from this rendering, so several cases appear empty here; the
// cross-references (PACKET_DATA_OFFSET_*, PACKET_REQDATA_OFFSET_*, ...) suggest
// each held an fprintf of that packet type's fields -- confirm in the real source.
void debug_packet ( vector< PACKET_BYTE buf,
std::string  type 
)
1584 {
1585  if (debug_flag)
1586  {
// Simple byte sum of everything after the header, printed as "Total".
1587  uint32_t total = 0;
1588  for (uint16_t i=PACKET_HEADER_OFFSET_TOTAL; i<buf.size(); ++i)
1589  {
1590  total += buf[i];
1591  }
1592  debug_fd_lock.lock();
// "Terr" prints type_error_count and "Oerr" prints timeout_error_count.
// NOTE(review): buf.size() is a size_t passed to a %u conversion.
1593  fprintf(agent->get_debug_fd(), "%16.10f %s Packet: In: %u Out: %u Rerr: %u Serr: %u Cerr: %u Terr: %u Oerr: %u Size: %u Total: %u Channel: %u ", currentmjd(), type.c_str(), packet_in_count, packet_out_count, recv_error_count, send_error_count, crc_error_count, type_error_count, timeout_error_count, buf.size(), total, use_channel);
// The packet type is the low four bits of the first byte.
// NOTE(review): buf[0] is read without checking that buf is non-empty.
1594  switch (buf[0] & 0x0f)
1595  {
1596  case PACKET_METADATA:
1597  {
// Copies TRANSFER_MAX_FILENAME bytes starting at the file-name offset;
// NOTE(review): no check that the packet is long enough for that range.
1598  std::string file_name(&buf[PACKET_METASHORT_OFFSET_FILE_NAME], &buf[PACKET_METASHORT_OFFSET_FILE_NAME+TRANSFER_MAX_FILENAME]);
1599  fprintf(agent->get_debug_fd(), "[METADATA] %u %u %s ", buf[PACKET_METASHORT_OFFSET_NODE_ID], buf[PACKET_METASHORT_OFFSET_TX_ID], file_name.c_str());
1600  break;
1601  }
1602  case PACKET_DATA:
1603  {
1605  break;
1606  }
1607  case PACKET_REQDATA:
1608  {
1610  break;
1611  }
1612  case PACKET_REQMETA:
1613  {
// Prints TRANSFER_QUEUE_LIMIT consecutive tx_id bytes from the packet.
1615  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
1617  {
1618  fprintf(agent->get_debug_fd(), "%u ", buf[PACKET_REQMETA_OFFSET_TX_ID+i]);
1619  }
1620  break;
1621  }
1622  case PACKET_COMPLETE:
1623  {
1625  break;
1626  }
1627  case PACKET_CANCEL:
1628  {
1630  break;
1631  }
1632  case PACKET_QUEUE:
1633  {
// Prints TRANSFER_QUEUE_LIMIT consecutive tx_id bytes from the packet.
1635  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
1637  {
1638  fprintf(agent->get_debug_fd(), "%u ", buf[PACKET_QUEUE_OFFSET_TX_ID+i]);
1639  }
// Last case: no break needed, control leaves the switch here.
1640  }
1641  }
1642  fprintf(agent->get_debug_fd(), "\n");
1643  fflush(agent->get_debug_fd());
1644  debug_fd_lock.unlock();
1645  }
1646 }
#define PACKET_CANCEL_OFFSET_NODE_ID
Definition: transferlib.h:307
#define PACKET_METASHORT_OFFSET_TX_ID
Definition: transferlib.h:255
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
#define TRANSFER_MAX_FILENAME
Definition: transferlib.h:78
#define PACKET_QUEUE_OFFSET_TX_ID
Definition: transferlib.h:214
#define PACKET_REQDATA_OFFSET_NODE_ID
Definition: transferlib.h:269
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
int i
Definition: rw_test.cpp:37
#define PACKET_REQDATA_OFFSET_HOLE_END
Definition: transferlib.h:272
static uint32_t crc_error_count
Definition: agent_file2.cpp:139
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
#define PACKET_DATA_OFFSET_TX_ID
Definition: transferlib.h:285
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
static Agent * agent
Definition: agent_file2.cpp:95
#define PACKET_REQMETA_OFFSET_NODE_NAME
Definition: transferlib.h:225
#define PACKET_COMPLETE_OFFSET_TX_ID
Definition: transferlib.h:298
static uint16_t use_channel
Definition: agent_file2.cpp:96
#define PACKET_METASHORT_OFFSET_NODE_ID
Definition: transferlib.h:254
#define PACKET_CANCEL_OFFSET_TX_ID
Definition: transferlib.h:308
static uint32_t timeout_error_count
Definition: agent_file2.cpp:140
#define PACKET_DATA_OFFSET_BYTE_COUNT
Definition: transferlib.h:286
#define PACKET_REQDATA_OFFSET_TX_ID
Definition: transferlib.h:270
static uint32_t packet_in_count
Definition: agent_file2.cpp:137
#define PACKET_REQMETA_OFFSET_TX_ID
Definition: transferlib.h:226
#define PACKET_HEADER_OFFSET_TOTAL
Definition: transferlib.h:151
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
#define PACKET_DATA_OFFSET_CHUNK_START
Definition: transferlib.h:287
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
#define PACKET_COMPLETE_OFFSET_NODE_ID
Definition: transferlib.h:297
#define PACKET_QUEUE_OFFSET_NODE_NAME
Definition: transferlib.h:213
static uint32_t recv_error_count
Definition: agent_file2.cpp:143
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PACKET_REQDATA_OFFSET_HOLE_START
Definition: transferlib.h:271
#define PACKET_METASHORT_OFFSET_FILE_NAME
Definition: transferlib.h:257
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
#define PACKET_QUEUE_OFFSET_NODE_ID
Definition: transferlib.h:212
#define PACKET_REQMETA_OFFSET_NODE_ID
Definition: transferlib.h:224
char buf[128]
Definition: rw_test.cpp:40
static uint32_t type_error_count
Definition: agent_file2.cpp:141
#define PACKET_DATA_OFFSET_NODE_ID
Definition: transferlib.h:284
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
static uint32_t packet_out_count
Definition: agent_file2.cpp:138
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
static uint32_t send_error_count
Definition: agent_file2.cpp:142
static bool debug_flag
Definition: agent_file2.cpp:72
int32_t write_meta ( tx_progress tx,
double  interval = 5. 
)
1649 {
1650  vector<PACKET_BYTE> packet;
1651  std::ofstream file_name;
1652 
1653  if (currentmjd(0.) - tx.savetime > interval/86400.)
1654  {
1655  tx.savetime = currentmjd(0.);
1656  make_metadata_packet(packet, tx.tx_id, (char *)tx.file_name.c_str(), tx.file_size, (char *)tx.node_name.c_str(), (char *)tx.agent_name.c_str());
1657  file_name.open(tx.temppath + ".meta", std::ios::out|std::ios::binary);
1658  if(!file_name.is_open())
1659  {
1660  return (-errno);
1661  }
1662 
1663  uint16_t crc;
1664  file_name.write((char *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1665  crc = slip_calc_crc((uint8_t *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1666  file_name.write((char *)&crc, 2);
1667  for (file_progress progress_info : tx.file_info)
1668  {
1669  file_name.write((const char *)&progress_info, sizeof(progress_info));
1670  crc = slip_calc_crc((uint8_t *)&progress_info, sizeof(progress_info));
1671  file_name.write((char *)&crc, 2);
1672  }
1673  file_name.close();
1674  }
1675 
1676  return 0;
1677 }
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
Definition: transferlib.h:332
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
png_uint_32 crc
Definition: png.c:2173
double savetime
Definition: transferlib.h:354
deque< file_progress > file_info
Definition: transferlib.h:358
string node_name
Definition: transferlib.h:349
string agent_name
Definition: transferlib.h:350
uint16_t slip_calc_crc(uint8_t *buf, uint16_t size)
Calculate CRC-16-CCITT.
Definition: sliplib.cpp:322
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PACKET_METALONG_OFFSET_TOTAL
Definition: transferlib.h:243
string file_name
Definition: transferlib.h:351
void make_metadata_packet(vector< PACKET_BYTE > &packet, packet_struct_metalong meta)
Definition: transferlib.cpp:163
int32_t read_meta ( tx_progress tx)
1680 {
1681  vector<PACKET_BYTE> packet(PACKET_METALONG_OFFSET_TOTAL,0);
1682  std::ifstream file_name;
1684 
1685  struct stat statbuf;
1686  if (!stat((tx.temppath + ".meta").c_str(), &statbuf) && statbuf.st_size >= COSMOS_SIZEOF(file_progress))
1687  {
1688  file_name.open(tx.temppath + ".meta", std::ios::out|std::ios::binary);
1689  if(!file_name.is_open())
1690  {
1691  return (-errno);
1692  }
1693  }
1694  else
1695  {
1696  // remove((tx.temppath + ".meta").c_str());
1697  return DATA_ERROR_SIZE_MISMATCH;
1698  }
1699 
1700  tx.fp = nullptr;
1701  tx.savetime = 0.;
1702  tx.complete = false;
1703 
1704 
1705  // load metadata
1706  tx.havemeta = true;
1707 
1708  file_name.read((char *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1709  if (file_name.eof())
1710  {
1711  return DATA_ERROR_SIZE_MISMATCH;
1712  }
1713  uint16_t crc;
1714  file_name.read((char *)&crc, 2);
1715  if (file_name.eof())
1716  {
1717  return DATA_ERROR_SIZE_MISMATCH;
1718  }
1719  if (crc != slip_calc_crc((uint8_t *)&packet[0], PACKET_METALONG_OFFSET_TOTAL))
1720  {
1721  file_name.close();
1722  return DATA_ERROR_CRC;
1723  }
1724  extract_metadata(packet, meta);
1725  tx.tx_id = meta.tx_id;
1726  tx.node_name = meta.node_name;
1727  tx.agent_name = meta.agent_name;
1728  tx.file_name = meta.file_name;
1729  tx.filepath = data_base_path(tx.node_name, "outgoing", tx.agent_name, tx.file_name);
1730  tx.file_size = meta.file_size;
1731 
1732  // load file progress
1733  file_progress progress_info;
1734  do
1735  {
1736  file_name.read((char *)&progress_info, sizeof(progress_info));
1737  if (file_name.eof())
1738  {
1739  break;
1740  }
1741  uint16_t crc;
1742  file_name.read((char *)&crc, 2);
1743  if (file_name.eof())
1744  {
1745  return DATA_ERROR_SIZE_MISMATCH;
1746  }
1747  if (crc != slip_calc_crc((uint8_t *)&progress_info, sizeof(progress_info)))
1748  {
1749  file_name.close();
1750  return DATA_ERROR_CRC;
1751  }
1752 
1753  tx.file_info.push_back(progress_info);
1754  } while(!file_name.eof());
1755  file_name.close();
1756  if (debug_flag)
1757  {
1758  debug_fd_lock.lock();
1759  fprintf(agent->get_debug_fd(), "%16.10f Main: read_meta: %s tx_id: %u chunks: %" PRIu32 "\n", currentmjd(), (tx.temppath + ".meta").c_str(), tx.tx_id, tx.file_info.size());
1760  fflush(agent->get_debug_fd());
1761  debug_fd_lock.unlock();
1762  }
1763 
1764  // fix any overlaps and count total bytes
1766 
1767  return 0;
1768 }
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:231
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
char file_name[128]
Definition: transferlib.h:234
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
Definition: transferlib.h:229
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
#define COSMOS_SIZEOF(element)
Definition: configCosmos.h:139
Definition: transferlib.h:332
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
png_uint_32 crc
Definition: png.c:2173
double savetime
Definition: transferlib.h:354
static Agent * agent
Definition: agent_file2.cpp:95
deque< file_progress > file_info
Definition: transferlib.h:358
#define DATA_ERROR_CRC
Definition: cosmos-errno.h:151
string node_name
Definition: transferlib.h:349
char agent_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:233
string filepath
Definition: transferlib.h:352
#define DATA_ERROR_SIZE_MISMATCH
Definition: cosmos-errno.h:150
string agent_name
Definition: transferlib.h:350
uint16_t slip_calc_crc(uint8_t *buf, uint16_t size)
Calculate CRC-16-CCITT.
Definition: sliplib.cpp:322
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:235
#define PACKET_METALONG_OFFSET_TOTAL
Definition: transferlib.h:243
string file_name
Definition: transferlib.h:351
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file2.cpp:1775
bool complete
Definition: transferlib.h:348
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:232
static bool debug_flag
Definition: agent_file2.cpp:72
bool tx_progress_compare_by_size ( const tx_progress a,
const tx_progress b 
)
2663 {
2664  return a.file_size < b.file_size;
2665 }
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
bool filestruc_compare_by_size ( const filestruc a,
const filestruc b 
)
2658 {
2659  return a.size < b.size;
2660 }
off_t size
Definition: datalib.h:120
/// @brief Confirm that a transfer id names an occupied slot of a queue.
///
/// A slot counts as occupied when the entry stored at index `tx_id` carries that
/// same tx_id. Id 0 never validates.
///
/// @param txentry Queue (incoming or outgoing) to look in.
/// @param tx_id   Transfer id to validate; also the index into txentry.progress.
/// @return `tx_id` if the slot is occupied by that id, otherwise 0.
PACKET_TX_ID_TYPE check_tx_id(tx_entry& txentry, PACKET_TX_ID_TYPE tx_id)
{
    // Reject ids beyond the table instead of indexing out of bounds.
    if (tx_id != 0 && tx_id < txentry.progress.size() && txentry.progress[tx_id].tx_id == tx_id)
    {
        return tx_id;
    }
    return 0;
}
std::vector< tx_progress > progress
Definition: agent_file.cpp:145
int32_t check_node_id_2 ( std::string  node_name)
2705 {
2706  int32_t id = -1;
2707  for (uint16_t i=0; i<txq.size(); ++i)
2708  {
2709  if (txq[i].node_name == node_name)
2710  {
2711  id = i;
2712  break;
2713  }
2714  }
2715  return id;
2716 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
int32_t check_node_id_2 ( PACKET_NODE_ID_TYPE  node_id)
2719 {
2720  int32_t id = -1;
2721  if (node_id >= 0 && node_id < txq.size())
2722  {
2723  id = node_id;
2724  }
2725  return id;
2726 }
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
int32_t check_channel ( PACKET_NODE_ID_TYPE  node_id)
2729 {
2730  for(uint16_t i=0; i<comm_channel.size(); ++i)
2731  {
2732  if (comm_channel[i].node == txq[node_id].node_name)
2733  {
2734  return i;
2735  }
2736  }
2737  return -1;
2738 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
static string node
Definition: agent_monitor.cpp:126
uint8_t check_remote_node_id ( PACKET_NODE_ID_TYPE  node_id)
2741 {
2742  int32_t id = -1;
2743  if (node_id >=0 && node_id < txq.size())
2744  {
2745  if (txq[node_id].node_id > 0)
2746  {
2747  id = txq[node_id].node_id - 1;
2748  }
2749  }
2750  return id;
2751 }
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
int32_t set_remote_node_id ( PACKET_NODE_ID_TYPE  node_id,
std::string  node_name 
)
2754 {
2755  int32_t id = -1;
2756  for (uint16_t i=0; i<txq.size(); ++i)
2757  {
2758  if (txq[i].node_name == node_name)
2759  {
2760  txq[i].node_id = node_id+1;
2761  id = i;
2762  }
2763  }
2764  return id;
2765 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
// choose_incoming_tx_id: pick which incoming transfer of a node to work on next.
//
// Among the node's incoming entries that have a non-zero tx_id and have received
// metadata, selects the one with the fewest bytes still missing
// (file_size - total_bytes). merge_chunks_overlap() is called on every entry
// visited before it is considered.
//
// Parameters:
//   node - index into txq; out-of-range values (including negatives) select nothing.
// Returns: the chosen tx_id, or 0 if the node is invalid or no entry qualifies.
PACKET_TX_ID_TYPE choose_incoming_tx_id ( int32_t  node)
2668 {
2669  PACKET_TX_ID_TYPE tx_id = 0;
2670 
2671  if (node >= 0 && (uint32_t)node < txq.size())
2672  {
2673  // Choose file with least data left to send
// Starting bound: only entries with strictly fewer remaining bytes than
// INT32_MAX can be selected.
2674  PACKET_FILE_SIZE_TYPE nsize = INT32_MAX;
// NOTE(review): listing line 2675 is missing from this rendering. It must be the
// loop header that declares `i`; the cross-references list PROGRESS_QUEUE_SIZE,
// so it presumably iterates i up to that limit -- confirm start index and bound.
2676  {
2677  // calculate bytes so far
2678  merge_chunks_overlap(txq[static_cast <size_t>(node)].incoming.progress[i]);
2679 
2680  // Choose transactions for which we: have meta and bytes remaining is minimized
2681  if (txq[static_cast <size_t>(node)].incoming.progress[i].tx_id && txq[static_cast <size_t>(node)].incoming.progress[i].havemeta && (txq[static_cast <size_t>(node)].incoming.progress[i].file_size - txq[static_cast <size_t>(node)].incoming.progress[i].total_bytes) < nsize)
2682  {
2683  nsize = txq[static_cast <size_t>(node)].incoming.progress[i].file_size - txq[static_cast <size_t>(node)].incoming.progress[i].total_bytes;
2684  tx_id = txq[static_cast <size_t>(node)].incoming.progress[i].tx_id;
2685  }
2686  }
2687  }
2688 
2689  return tx_id;
2690 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file2.cpp:1775
static string node
Definition: agent_monitor.cpp:126
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
int32_t next_incoming_tx ( PACKET_NODE_ID_TYPE  node)
2768 {
2769  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[static_cast <size_t>(node)].incoming, choose_incoming_tx_id(node));
2770 
2771  if (tx_id < PROGRESS_QUEUE_SIZE && tx_id > 0)
2772  {
2773  // See if we know what the remote node_id is for this
2774  int32_t remote_node = check_remote_node_id(node);
2775  if (remote_node >= 0)
2776  {
2777  // Check if file has been completely received
2778  if(txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_size == txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes && txq[static_cast <size_t>(node)].incoming.progress[tx_id].havemeta)
2779  {
2780 // tx_progress tx_in = txq[static_cast <size_t>(node)].incoming.progress[tx_id];
2781  debug_fd_lock.lock();
2782  fprintf(agent->get_debug_fd(), "%16.10f Recv(next_incoming_tx): Complete: %u %s %u %u\n", currentmjd(), txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id, txq[static_cast <size_t>(node)].incoming.progress[tx_id].node_name.c_str(), txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_size, txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes);
2783  fflush(agent->get_debug_fd());
2784  debug_fd_lock.unlock();
2785 
2786  // inform other end that file has been received
2787  vector<PACKET_BYTE> packet;
2788  make_complete_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(remote_node), txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id);
2789  queuesendto(node, "rx", packet);
2790 
2791  // Move file to its final location
2792  if (!txq[static_cast <size_t>(node)].incoming.progress[tx_id].complete)
2793  {
2794  if (txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp != nullptr)
2795  {
2796  fclose(txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp);
2797  txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp = nullptr;
2798  }
2799  std::string final_filepath = txq[static_cast <size_t>(node)].incoming.progress[tx_id].temppath + ".file";
2800  int32_t iret = rename(final_filepath.c_str(), txq[static_cast <size_t>(node)].incoming.progress[tx_id].filepath.c_str());
2801  // Make sure metadata is recorded
2802  write_meta(txq[static_cast <size_t>(node)].incoming.progress[tx_id], 0.);
2803  if (debug_flag)
2804  {
2805  debug_fd_lock.lock();
2806  fprintf(agent->get_debug_fd(), "%16.10f Recv(next_incoming_tx): Renamed: %d %s\n", currentmjd(), iret, txq[static_cast <size_t>(node)].incoming.progress[tx_id].filepath.c_str());
2807  fflush(agent->get_debug_fd());
2808  debug_fd_lock.unlock();
2809  }
2810  txq[static_cast <size_t>(node)].incoming.progress[tx_id].complete = true;
2811  }
2812  }
2813  else
2814  {
2815  debug_fd_lock.lock();
2816  fprintf(agent->get_debug_fd(), "%16.10f Recv(next_incoming_tx): More: %u %s %u %u\n", currentmjd(), txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id, txq[static_cast <size_t>(node)].incoming.progress[tx_id].node_name.c_str(), txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_size, txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes);
2817  fflush(agent->get_debug_fd());
2818  debug_fd_lock.unlock();
2819  // Ask for missing data
2820  vector<file_progress> missing;
2821  missing = find_chunks_missing(txq[static_cast <size_t>(node)].incoming.progress[tx_id]);
2822  for (uint32_t j=0; j<missing.size(); ++j)
2823  {
2824  vector<PACKET_BYTE> packet;
2825  make_reqdata_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(remote_node), txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id, missing[j].chunk_start, missing[j].chunk_end);
2826  queuesendto(node, "rx", packet);
2827  }
2828  }
2829  }
2830  }
2831  return tx_id;
2832 }
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file2.cpp:1648
static Agent * agent
Definition: agent_file2.cpp:95
void make_complete_packet(vector< PACKET_BYTE > &packet, packet_struct_complete complete)
Definition: transferlib.cpp:54
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
double queuesendto(PACKET_NODE_ID_TYPE node_id, std::string type, vector< PACKET_BYTE > packet)
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file2.cpp:2692
void make_reqdata_packet(vector< PACKET_BYTE > &packet, packet_struct_reqdata reqdata)
Definition: transferlib.cpp:131
vector< file_progress > find_chunks_missing(tx_progress &tx)
Definition: agent_file2.cpp:1814
static string node
Definition: agent_monitor.cpp:126
int32_t check_remote_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file2.cpp:2740
PACKET_TX_ID_TYPE choose_incoming_tx_id(int32_t node)
Definition: agent_file2.cpp:2667
static bool debug_flag
Definition: agent_file2.cpp:72
std::string json_list_incoming ( )
2884  {
2885  JSONObject jobj;
2886  JSONArray incoming;
2887 
2888  incoming.resize(txq.size());
2889  for (uint16_t node=0; node<txq.size(); ++node)
2890  {
2891 
2892  JSONObject node_obj("node", txq[node].node_name);
2893  node_obj.addElement("count", txq[node].incoming.size);
2894 
2895  JSONArray files;
2896  files.resize(txq[node].incoming.size);
2897  int i =0;
2898  for(tx_progress tx : txq[node].incoming.progress)
2899  {
2900  if (tx.tx_id)
2901  {
2902  JSONObject f("tx_id", tx.tx_id);
2903  f.addElement("agent", tx.agent_name);
2904  f.addElement("name", tx.file_name);
2905  f.addElement("bytes", tx.total_bytes);
2906  f.addElement("size", tx.file_size);
2907  files.at(i) = (JSONValue(f));
2908  i++;
2909  }
2910  }
2911  node_obj.addElement("files", files);
2912  incoming.at(node) = JSONValue(node_obj);
2913 
2914  }
2915  jobj.addElement("incoming", incoming);
2916  return jobj.to_json_string();
2917 }
Definition: jsonvalue.h:13
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:338
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
std::string json_list_outgoing ( )
2919  {
2920  JSONObject jobj;
2921  JSONArray outgoing;
2922 
2923  outgoing.resize(txq.size());
2924  for (uint16_t node=0; node<txq.size(); ++node)
2925  {
2926 
2927  JSONObject node_obj("node", txq[node].node_name);
2928  node_obj.addElement("count", txq[node].outgoing.size);
2929 
2930  JSONArray files;
2931  files.resize(txq[node].outgoing.size);
2932  int i =0;
2933  for(tx_progress tx : txq[node].outgoing.progress)
2934  {
2935  if (tx.tx_id)
2936  {
2937  JSONObject f("tx_id", tx.tx_id);
2938  f.addElement("agent", tx.agent_name);
2939  f.addElement("name", tx.file_name);
2940  f.addElement("bytes", tx.total_bytes);
2941  f.addElement("size", tx.file_size);
2942  files.at(i) = (JSONValue(f));
2943  i++;
2944  }
2945  }
2946  node_obj.addElement("files", files);
2947  outgoing.at(node) = JSONValue(node_obj);
2948 
2949  }
2950  jobj.addElement("outgoing", outgoing);
2951  return jobj.to_json_string();
2952 }
Definition: jsonvalue.h:13
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:338
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
/// @brief Build a JSON description of both transfer queues of every node.
///
/// Output shape: {"outgoing": [...], "incoming": [...]}. Each array has one
/// element per txq entry of the form {"node": name, "count": n, "files": [
/// {"tx_id", "agent", "name", "bytes", "size"}, ... ]}. Only queue slots with a
/// non-zero tx_id are listed.
///
/// @return The JSON text.
string json_list_queue()
{
    JSONObject jobj;
    JSONArray incoming;
    JSONArray outgoing;

    outgoing.resize(txq.size());
    incoming.resize(txq.size());
    for (size_t node = 0; node < txq.size(); ++node)
    {
        // ---- incoming queue of this node ----
        JSONObject node_in("node", txq[node].node_name);
        node_in.addElement("count", txq[node].incoming.size);

        JSONArray ifiles;
        ifiles.resize(txq[node].incoming.size);
        size_t in_slot = 0;
        for (const tx_progress& tx : txq[node].incoming.progress)
        {
            if (!tx.tx_id)
            {
                continue;
            }
            JSONObject f("tx_id", tx.tx_id);
            f.addElement("agent", tx.agent_name);
            f.addElement("name", tx.file_name);
            f.addElement("bytes", tx.total_bytes);
            f.addElement("size", tx.file_size);
            // Grow rather than throw if more slots are active than the count says.
            if (in_slot < ifiles.size())
            {
                ifiles[in_slot] = JSONValue(f);
            }
            else
            {
                ifiles.push_back(JSONValue(f));
            }
            ++in_slot;
        }
        node_in.addElement("files", ifiles);
        incoming.at(node) = JSONValue(node_in);

        // ---- outgoing queue of this node ----
        JSONObject node_out("node", txq[node].node_name);
        // Report the outgoing count here (this used to repeat incoming.size).
        node_out.addElement("count", txq[node].outgoing.size);

        JSONArray ofiles;
        ofiles.resize(txq[node].outgoing.size);
        // Separate index starting at 0: reusing the incoming index placed outgoing
        // files at the wrong positions and could run past the end of the array.
        size_t out_slot = 0;
        for (const tx_progress& tx : txq[node].outgoing.progress)
        {
            if (!tx.tx_id)
            {
                continue;
            }
            JSONObject f("tx_id", tx.tx_id);
            f.addElement("agent", tx.agent_name);
            f.addElement("name", tx.file_name);
            f.addElement("bytes", tx.total_bytes);
            f.addElement("size", tx.file_size);
            if (out_slot < ofiles.size())
            {
                ofiles[out_slot] = JSONValue(f);
            }
            else
            {
                ofiles.push_back(JSONValue(f));
            }
            ++out_slot;
        }
        node_out.addElement("files", ofiles);
        outgoing.at(node) = JSONValue(node_out);
    }
    jobj.addElement("outgoing", outgoing);
    jobj.addElement("incoming", incoming);
    return jobj.to_json_string();
}
Definition: jsonvalue.h:13
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:338
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
void write_queue_log ( double  logdate)
2864 {
2865  std::string record = json_list_queue(); // to append to file
2866 
2867  log_write(agent->cinfo->node.name, "file", logdate, "", "log", record, log_directory);
2868 
2869 
2870 }
static Agent * agent
Definition: agent_file2.cpp:95
std::string json_list_queue()
Definition: agent_file2.cpp:2953
nodestruc node
Structure for summary information in node.
Definition: jsondef.h:4220
static std::string log_directory
Definition: agent_file2.cpp:169
char name[40+1]
Node Name.
Definition: jsondef.h:3556
void log_write(string node, string agent, double utc, string extra, string type, string record, string location)
Write log entry - full.
Definition: datalib.cpp:75
cosmosstruc * cinfo
Definition: agentclass.h:346
int main ( int  argc,
char *  argv[] 
)
220 {
221  int32_t iretn;
222 
223  agent = new Agent("", "file", 5.);
224  agent->debug_level = 2;
225 
226  if ((iretn = agent->wait()) < 0)
227  {
228  fprintf(agent->get_debug_fd(), "%16.10f %s Failed to start Agent %s on Node %s Dated %s : %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str(), cosmos_error_string(iretn).c_str());
229  exit(iretn);
230  }
231  else
232  {
233  fprintf(agent->get_debug_fd(), "%16.10f %s Started Agent %s on Node %s Dated %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str());
234  }
235 
236  fprintf(agent->get_debug_fd(), "%16.10f Node: %s Agent: %s - Established\n", currentmjd(), agent->nodeName.c_str(), agent->agentName.c_str());
237  fflush(agent->get_debug_fd()); // Ensure this gets printed before blocking call
238 
239  comm_channel.resize(1);
240  if((iretn = socket_open(&comm_channel[0].chansock, NetworkType::UDP, "", AGENTRECVPORT, SOCKET_LISTEN, SOCKET_BLOCKING, 5000000)) < 0)
241  {
242  fprintf(agent->get_debug_fd(), "%16.10f Main: Node: %s Agent: %s - Listening socket failure\n", currentmjd(), agent->nodeName.c_str(), agent->agentName.c_str());
243  agent->shutdown();
244  exit (-errno);
245  }
246 
247  inet_ntop(comm_channel[0].chansock.caddr.sin_family, &comm_channel[0].chansock.caddr.sin_addr, comm_channel[0].chansock.address, sizeof(comm_channel[0].chansock.address));
248  comm_channel[0].chanip = comm_channel[0].chansock.address;
249  comm_channel[0].nmjd = currentmjd(0.);
250  comm_channel[0].lmjd = currentmjd(0.);
251  comm_channel[0].node = "";
252  fprintf(agent->get_debug_fd(), "%16.10f Node: %s Agent: %s - Listening socket open\n", currentmjd(), agent->nodeName.c_str(), agent->agentName.c_str());
253  fflush(agent->get_debug_fd()); // Ensure this gets printed before blocking call
254 
255  switch (argc)
256  {
257  case 2:
258  {
259  comm_channel.resize(2);
260  comm_channel[1].node = argv[1];
261  size_t tloc = comm_channel[1].node.find(":");
262  if (tloc != string::npos)
263  {
264  comm_channel[1].chanip = comm_channel[1].node.substr(tloc+1, comm_channel[1].node.size()-tloc+1);
265  comm_channel[1].node = comm_channel[1].node.substr(0, tloc);
266  }
267  if((iretn = socket_open(&comm_channel[1].chansock, NetworkType::UDP, comm_channel[1].chanip.c_str(), AGENTRECVPORT, SOCKET_TALK, SOCKET_BLOCKING, AGENTRCVTIMEO)) < 0)
268  {
269  fprintf(agent->get_debug_fd(), "%16.10f Node: %s IP: %s - Sending socket failure\n", currentmjd(), comm_channel[1].node.c_str(), comm_channel[1].chanip.c_str());
270  agent->shutdown();
271  exit (-errno);
272  }
273  comm_channel[1].nmjd = currentmjd(0.);
274  fprintf(agent->get_debug_fd(), "%16.10f Network: Old: %u %s %s %u\n", currentmjd(), 1, comm_channel[1].node.c_str(), comm_channel[1].chanip.c_str(), ntohs(comm_channel[1].chansock.caddr.sin_port));
275  fflush(agent->get_debug_fd());
276 
277  log_directory = "outgoing"; // put log files in node/outgoing/file
278  logstride_sec = 60.; // longer logstride
279  break;
280  }
281  }
282 
283  // Restore in progress transfers from previous run
284  for (std::string node_name : data_list_nodes())
285  {
286  int32_t node = check_node_id_2(node_name);
287 
288  if (node < 0)
289  {
290  node = txq.size();
291  tx_queue tx;
292  tx.node_name = node_name;
293  tx.node_id = 0;
295  for (uint16_t i=0; i<7; ++i)
296  {
297  tx.incoming.nmjd[i] = currentmjd();
298  }
299  tx.incoming.id = 0;
300  tx.incoming.next_id = 1;
301  tx.incoming.size = 0;
303  for (uint16_t i=0; i<7; ++i)
304  {
305  tx.outgoing.nmjd[i] = currentmjd();
306  }
307  tx.outgoing.id = 0;
308  tx.outgoing.next_id = 1;
309  tx.outgoing.size = 0;
310  txq.push_back(tx);
311  incoming_tx_purge(node);
312  outgoing_tx_purge(node);
313  }
314 
315  for(filestruc file : data_list_files(node_name, "temp", "file"))
316  {
317  // Add entry for each meta file
318  if (file.type == "meta")
319  {
320  // Incoming
321  if (!file.name.compare(0,3,"in_"))
322  {
323  tx_progress tx_in;
324  tx_in.temppath = file.path.substr(0,file.path.find(".meta"));
325  if (read_meta(tx_in) >= 0)
326  {
327  iretn = incoming_tx_add(tx_in);
328  }
329  }
330 
331  // Outgoing
332  if (!file.name.compare(0,4,"out_"))
333  {
334  tx_progress tx_out;
335  tx_out.temppath = file.path.substr(0,file.path.find(".meta"));
336  if (read_meta(tx_out) >= 0)
337  {
338  iretn = outgoing_tx_add(tx_out);
339  }
340  }
341  }
342  }
343  }
344 
345  // add agent_file requests
346  if ((iretn=agent->add_request("get_channels",request_get_channels,"", "get channel information")))
347  exit (iretn);
348  if ((iretn=agent->add_request("set_throughput",request_set_throughput,"{n} [throughput]", "set channel throughput")))
349  exit (iretn);
350  if ((iretn=agent->add_request("remove_file",request_remove_file,"in|out tx_id", "removes file from indicated queue")))
351  exit (iretn);
352  // if ((iretn=agent->add_request("send_file",request_send_file,"", "creates and sends metadata/data packets")))
353  // exit (iretn);
354  if ((iretn=agent->add_request("ls",request_ls,"", "lists contents of directory")))
355  exit (iretn);
356  if ((iretn=agent->add_request("list_incoming",request_list_incoming,"", "lists contents incoming queue")))
357  exit (iretn);
358  if ((iretn=agent->add_request("list_outgoing",request_list_outgoing,"", "lists contents outgoing queue")))
359  exit (iretn);
360  if ((iretn=agent->add_request("list_incoming_json",request_list_incoming_json,"", "lists contents incoming queue")))
361  exit (iretn);
362  if ((iretn=agent->add_request("list_outgoing_json",request_list_outgoing_json,"", "lists contents outgoing queue")))
363  exit (iretn);
364  if ((iretn=agent->add_request("set_logstride",request_set_logstride,"sec","set time interval of log files")))
365  exit (iretn);
366  if ((iretn=agent->add_request("get_logstride",request_get_logstride,"","get time interval of log files")))
367  exit (iretn);
368  if ((iretn=agent->add_request("debug",request_debug,"{0|1}","Toggle Debug information")))
369  exit (iretn);
370 
371  std::thread send_loop_thread(send_loop);
372  std::thread recv_loop_thread(recv_loop);
373  std::thread transmit_loop_thread(transmit_loop);
374 
375  double nextdiskcheck = currentmjd(0.);
376  double nextlog = currentmjd();
377  double sleepsec;
378  ElapsedTime etloop;
379  etloop.start();
380 
381  // start the agent
382  while(agent->running())
383  {
384  if (agent->running() == (uint16_t)Agent::State::IDLE)
385  {
386  COSMOS_SLEEP(1);
387  continue;
388  }
389 
390 
391  if(nextdiskcheck < nextlog ) {
392  sleepsec = 86400. * (nextdiskcheck - currentmjd());
393  } else {
394  sleepsec = 86400. * (nextlog - currentmjd());
395  }
396  if (sleepsec > 0.)
397  {
398  COSMOS_SLEEP((sleepsec));
399  }
400 
401  if(currentmjd() > nextlog) {
403  nextlog = currentmjd(0.) + logstride_sec/86400.;
404  }
405 
406  // Check for new files to transmit if queue is not full and check is not delayed
407 
408 
409  if (currentmjd() > nextdiskcheck)
410  {
411 
412  nextdiskcheck = currentmjd(0.) + 10./86400.;
413  for (uint16_t node=0; node<txq.size(); ++node)
414  {
415  // Go through outgoing queue, removing files that no longer exist
416  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
417  {
418  if (txq[static_cast <size_t>(node)].outgoing.progress[i].tx_id != 0 && !data_isfile(txq[static_cast <size_t>(node)].outgoing.progress[i].filepath))
419  {
420  outgoing_tx_del(node, txq[static_cast <size_t>(node)].outgoing.progress[i].tx_id);
421  }
422  }
423 
424  // Go through outgoing directories, adding files not already in queue
425  if (txq[static_cast <size_t>(node)].outgoing.size < TRANSFER_QUEUE_LIMIT)
426  {
427  vector<filestruc> file_names;
428  for (filestruc file : data_list_files(txq[static_cast <size_t>(node)].node_name, "outgoing", ""))
429  {
430  if (file.type == "directory")
431  {
432  iretn = data_list_files(txq[static_cast <size_t>(node)].node_name, "outgoing", file.name, file_names);
433  }
434  }
435 
436  // Sort list by size, then go through list of files found, adding to queue.
437  sort(file_names.begin(), file_names.end(), filestruc_compare_by_size);
438  for(uint16_t i=0; i<file_names.size(); ++i)
439  {
440  filestruc file = file_names[i];
441  if (txq[static_cast <size_t>(node)].outgoing.size >= TRANSFER_QUEUE_LIMIT)
442  {
443  break;
444  }
445 
446  //Ignore sub-directories
447  if (file.type == "directory")
448  {
449  continue;
450  }
451 
452  // Ignore zero length files (may still be being formed)
453  if (file.size == 0)
454  {
455  continue;
456  }
457 
458  bool addtoqueue = true;
459  outgoing_tx_lock.lock();
460  for(tx_progress progress : txq[static_cast <size_t>(node)].outgoing.progress)
461  {
462  if (progress.tx_id && file.path == progress.filepath)
463  {
464  addtoqueue = false;
465  break;
466  }
467  }
468 
469  outgoing_tx_lock.unlock();
470 
471  if (addtoqueue)
472  {
473  iretn = outgoing_tx_add(file.node, file.agent, file.name);
474  if (iretn >= 0)
475  {
476  nextdiskcheck = currentmjd();
477  }
478  if (debug_flag)
479  {
480  debug_fd_lock.lock();
481  fprintf(agent->get_debug_fd(), "%16.10f Main: outgoing_tx_add: %s [%d]\n", currentmjd(), file.path.c_str(), iretn);
482  fflush(agent->get_debug_fd());
483  debug_fd_lock.unlock();
484  }
485  }
486  }
487  }
488  }
489  }
490  } // End WHILE Loop
491 
492  fprintf(agent->get_debug_fd(), "%16.10f Main: Node: %s Agent: %s - Exiting\n", currentmjd(), agent->nodeName.c_str(), agent->agentName.c_str());
493  fflush(agent->get_debug_fd());
494 
495  send_loop_thread.join();
496  recv_loop_thread.join();
497  transmit_queue_check.notify_one();
498  transmit_loop_thread.join();
499  txq.clear();
500 
501  fprintf(agent->get_debug_fd(), "%16.10f Main: Node: %s Agent: %s - Shutting down\n", currentmjd(), agent->nodeName.c_str(), agent->agentName.c_str());
502  fflush(agent->get_debug_fd());
503 
504  agent->shutdown();
505 
506  exit (0);
507 }
string name
Definition: datalib.h:117
double nmjd[7]
Definition: agent_file2.cpp:151
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
off_t size
Definition: datalib.h:120
void send_loop()
Definition: agent_file2.cpp:1127
Agent socket using Unicast UDP.
void write_queue_log(double logdate)
Definition: agent_file2.cpp:2863
int32_t request_get_channels(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:1960
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
string nodeName
Definition: agentclass.h:367
int i
Definition: rw_test.cpp:37
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
int32_t request_list_incoming_json(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:2872
Definition: transferlib.h:338
int32_t request_remove_file(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:1998
string node
Definition: datalib.h:115
int iretn
Definition: rw_test.cpp:37
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
int32_t wait(State state=State::RUN, double waitsec=10.)
Definition: agentclass.cpp:398
vector< filestruc > data_list_files(string directory)
Get list of files in a directory, directly.
Definition: datalib.cpp:461
tx_entry outgoing
Definition: agent_file.cpp:156
PACKET_TX_ID_TYPE state
Definition: agent_file2.cpp:147
static Agent * agent
Definition: agent_file2.cpp:95
int32_t request_list_outgoing_json(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:2878
vector< string > data_list_nodes()
Get list of Nodes, directly.
Definition: datalib.cpp:583
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
int32_t request_debug(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:2834
#define SOCKET_TALK
Talk followed by optional listen (sendto address)
Definition: socketlib.h:82
int32_t request_set_logstride(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:2846
PACKET_NODE_ID_TYPE node_id
Definition: agent_file.cpp:154
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
Definition: datalib.h:113
void start()
ElapsedTime::start.
Definition: elapsedtime.cpp:203
int32_t request_list_incoming(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:1900
static std::string log_directory
Definition: agent_file2.cpp:169
double logstride_sec
Definition: agent_file2.cpp:170
int32_t outgoing_tx_del(int32_t node, uint16_t tx_id=256)
Definition: agent_file2.cpp:2249
#define AGENTRECVPORT
Default RECV port.
Definition: agentclass.h:200
bool data_isfile(string path, off_t size)
Definition: datalib.cpp:1895
string getAgent()
Definition: agentclass.cpp:2609
string path
Definition: datalib.h:119
int32_t add_request(string token, external_request_function function, string synopsis="", string description="")
Add internal request to Agent request list with description and synopsis.
Definition: agentclass.cpp:312
PACKET_TX_ID_TYPE id
Definition: agent_file.cpp:147
PACKET_TX_ID_TYPE size
Definition: agent_file.cpp:146
Definition: agentclass.h:139
PACKET_TX_ID_TYPE next_id
Definition: agent_file.cpp:148
#define SOCKET_BLOCKING
Blocking Agent.
Definition: socketlib.h:78
int32_t shutdown()
Shutdown agent gracefully.
Definition: agentclass.cpp:366
#define AGENTRCVTIMEO
Default AGENT socket RCVTIMEO (100 msec)
Definition: agentclass.h:208
Definition: agent_file.cpp:151
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
std::string node_name
Definition: agent_file.cpp:153
void recv_loop()
Definition: agent_file2.cpp:509
double data_ctime(string path)
Definition: datalib.cpp:1910
Definition: elapsedtime.h:62
int32_t read_meta(tx_progress &tx)
Definition: agent_file2.cpp:1679
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
int32_t request_set_throughput(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:1976
string agent
Definition: datalib.h:116
int32_t request_list_outgoing(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:1930
static std::condition_variable transmit_queue_check
Definition: agent_file2.cpp:120
string utc2iso8601(double utc)
ISO 8601 version of time.
Definition: timelib.cpp:1286
int32_t outgoing_tx_add(tx_progress &tx_out)
Definition: agent_file2.cpp:2019
#define SOCKET_LISTEN
Listen followed by optional talk (recvfrom INADDRANY)
Definition: socketlib.h:84
int32_t outgoing_tx_purge(int32_t node, uint16_t tx_id=256)
Definition: agent_file2.cpp:2311
int32_t request_ls(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:1868
int32_t socket_open(socket_channel *channel, NetworkType ntype, const char *address, uint16_t port, uint16_t role, bool blocking, uint32_t usectimeo, uint32_t rcvbuf, uint32_t sndbuf)
Open UDP socket.
Definition: socketlib.cpp:51
static string node
Definition: agent_monitor.cpp:126
static std::mutex outgoing_tx_lock
Definition: agent_file2.cpp:130
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
void transmit_loop()
Definition: agent_file2.cpp:1361
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
int32_t check_node_id_2(std::string node_name)
Definition: agent_file2.cpp:2704
tx_entry incoming
Definition: agent_file.cpp:155
int32_t request_get_logstride(string &request, string &response, Agent *agent)
Definition: agent_file2.cpp:2857
string type
Definition: datalib.h:118
string agentName
Definition: agentclass.h:368
int32_t incoming_tx_purge(int32_t node, uint16_t tx_id=256)
Definition: agent_file2.cpp:2591
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file2.cpp:2377
static bool debug_flag
Definition: agent_file2.cpp:72
bool filestruc_compare_by_size(const filestruc &a, const filestruc &b)
Definition: agent_file2.cpp:2657
int32_t queuesendto ( PACKET_NODE_ID_TYPE  node_id,
string  type,
vector< PACKET_BYTE packet 
)
1388 {
1389  transmit_queue_entry tentry;
1390 
1391  use_channel = -1;
1392  for (uint16_t i=0; i<comm_channel.size(); ++i)
1393  {
1394  if (txq[node_id].node_name == comm_channel[i].node)
1395  {
1396  use_channel = i;
1397  break;
1398  }
1399  }
1400 
1401  if (use_channel > comm_channel.size())
1402  {
1403  return -1.;
1404  }
1405 
1406  tentry.type = type;
1407  tentry.channel = use_channel;
1408  tentry.packet = packet;
1409  transmit_queue.push(tentry);
1410  transmit_queue_check.notify_one();
1411  double time_step = packet.size() / (86400. * comm_channel[use_channel].throughput);
1412  if (time_step > 0)
1413  {
1414  return time_step;
1415  }
1416  else
1417  {
1418  return 0.;
1419  }
1420 }
std::vector< PACKET_BYTE > packet
Definition: agent_file.cpp:114
int i
Definition: rw_test.cpp:37
uint32_t channel
Definition: agent_file.cpp:113
std::string type
Definition: agent_file.cpp:112
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
static uint16_t use_channel
Definition: agent_file2.cpp:96
static std::queue< transmit_queue_entry > transmit_queue
Definition: agent_file2.cpp:119
static std::condition_variable transmit_queue_check
Definition: agent_file2.cpp:120
Definition: agent_file.cpp:110
static string node
Definition: agent_monitor.cpp:126
bool lower_chunk ( file_progress  i,
file_progress  j 
)
1771 {
1772  return (i.chunk_start<j.chunk_start);
1773 }
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334

Variable Documentation

bool debug_flag = true
static
beatstruc cbeat
static

the (global) name of the heartbeat structure

Agent* agent
static

the (global) name of the cosmos data structure

uint16_t use_channel = 0
static
vector<channelstruc> comm_channel
static
std::queue<transmit_queue_entry> transmit_queue
static
std::condition_variable transmit_queue_check
static
std::mutex incoming_tx_lock
static
std::mutex outgoing_tx_lock
static
std::mutex debug_fd_lock
static
double last_data_receive_time = 0.
static
double next_reqmeta_time = 0.
static
uint32_t packet_in_count = 0
static
uint32_t packet_out_count
static
uint32_t crc_error_count = 0
static
uint32_t timeout_error_count = 0
static
uint32_t type_error_count = 0
static
uint32_t send_error_count = 0
static
uint32_t recv_error_count = 0
static
vector<tx_queue> txq
static
std::string log_directory = "incoming"
static
double logstride_sec = 10.