COSMOS core  1.0.2 (beta)
Comprehensive Open-architecture Solution for Mission Operations Systems
agent_file3.cpp File Reference
#include "support/configCosmos.h"
#include "agent/agentclass.h"
#include "support/jsonlib.h"
#include "support/transferlib.h"
#include "support/sliplib.h"
#include "support/jsonobject.h"
#include <algorithm>
#include <cstring>
#include <time.h>
#include <iostream>
#include <fstream>
#include <string>
#include <sys/stat.h>
#include <sys/select.h>
Include dependency graph for agent_file3.cpp:

Classes

struct  channelstruc
 
struct  transmit_queue_entry
 
struct  tx_entry
 
struct  tx_queue
 

Macros

#define PROGRESS_QUEUE_SIZE   256
 
#define PACKET_SIZE_LO   (200-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
 
#define PACKET_SIZE_PAYLOAD   (PACKET_SIZE_LO-PACKET_DATA_OFFSET_HEADER_TOTAL)
 
#define THROUGHPUT_LO   130
 
#define PACKET_SIZE_HI   (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
 
#define THROUGHPUT_HI   700
 
#define PACKET_IN   1
 
#define PACKET_OUT   2
 

Functions

void send_loop ()
 
void recv_loop ()
 
void transmit_loop ()
 
int32_t request_debug (string &request, string &, Agent *)
 
int32_t request_get_channels (string &request, string &response, Agent *agent)
 
int32_t request_set_throughput (string &request, string &response, Agent *agent)
 
int32_t request_remove_file (string &request, string &response, Agent *agent)
 
int32_t request_ls (string &request, string &response, Agent *agent)
 
int32_t request_list_incoming (string &request, string &response, Agent *agent)
 
int32_t request_list_outgoing (string &request, string &response, Agent *agent)
 
int32_t request_list_incoming_json (string &request, string &response, Agent *agent)
 
int32_t request_list_outgoing_json (string &request, string &response, Agent *agent)
 
int32_t request_set_logstride (string &request, string &response, Agent *agent)
 
int32_t request_get_logstride (string &, string &response, Agent *)
 
int32_t outgoing_tx_add (tx_progress &tx_out)
 
int32_t outgoing_tx_add (string node_name, string agent_name, string file_name)
 
int32_t outgoing_tx_del (uint8_t node, uint16_t tx_id=256)
 
int32_t outgoing_tx_purge (uint8_t node, uint16_t tx_id=256)
 
int32_t outgoing_tx_recount (uint8_t node)
 
int32_t outgoing_tx_load (uint8_t node)
 
int32_t incoming_tx_add (tx_progress &tx_in)
 
int32_t incoming_tx_add (string node_name, PACKET_TX_ID_TYPE tx_id)
 
int32_t incoming_tx_update (packet_struct_metashort meta)
 
int32_t incoming_tx_del (uint8_t node, uint16_t tx_id=256)
 
int32_t incoming_tx_purge (uint8_t node, uint16_t tx_id=256)
 
int32_t incoming_tx_recount (uint8_t node)
 
vector< file_progress > find_chunks_missing (tx_progress &tx)
 
vector< file_progress > find_chunks_togo (tx_progress &tx)
 
PACKET_FILE_SIZE_TYPE merge_chunks_overlap (tx_progress &tx)
 
double queuecheck (PACKET_NODE_ID_TYPE node_id)
 
int32_t queuesendto (PACKET_NODE_ID_TYPE node_id, string type, vector< PACKET_BYTE > packet)
 
int32_t mysendto (string type, int32_t use_channel, vector< PACKET_BYTE > &buf)
 
int32_t myrecvfrom (string type, socket_channel &channel, vector< PACKET_BYTE > &buf, uint32_t length, double dtimeout=1.)
 
void debug_packet (vector< PACKET_BYTE > buf, uint8_t direction, string type, int32_t use_channel)
 
int32_t write_meta (tx_progress &tx, double interval=5.)
 
int32_t read_meta (tx_progress &tx)
 
bool tx_progress_compare_by_size (const tx_progress &a, const tx_progress &b)
 
bool filestruc_compare_by_size (const filestruc &a, const filestruc &b)
 
PACKET_TX_ID_TYPE check_tx_id (tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
 
int32_t check_channel (PACKET_NODE_ID_TYPE node_id)
 
int32_t add_node_name (string node_name)
 
uint8_t check_local_node_id (PACKET_NODE_ID_TYPE local_id)
 
uint8_t lookup_local_node_id (PACKET_NODE_ID_TYPE remote_id)
 
uint8_t lookup_local_node_id (string node_name)
 
uint8_t check_remote_node_id (PACKET_NODE_ID_TYPE remote_id)
 
uint8_t lookup_remote_node_id (PACKET_NODE_ID_TYPE local_id)
 
uint8_t lookup_remote_node_id (string node_name)
 
uint8_t set_remote_node_id (PACKET_NODE_ID_TYPE node_id, string node_name)
 
string get_remote_node_name (PACKET_NODE_ID_TYPE node_id)
 
PACKET_TX_ID_TYPE choose_incoming_tx_id (uint8_t node)
 
PACKET_TX_ID_TYPE choose_outgoing_tx_id (uint8_t node)
 
int32_t next_incoming_tx (PACKET_NODE_ID_TYPE node, int32_t use_channel)
 
string json_list_incoming ()
 
string json_list_outgoing ()
 
string json_list_queue ()
 
void write_queue_log (double logdate)
 
int main (int argc, char *argv[])
 
bool lower_chunk (file_progress i, file_progress j)
 

Variables

static beatstruc cbeat
 
static Agent * agent
 
static PACKET_CHUNK_SIZE_TYPE default_packet_size = (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
 
static uint32_t default_throughput = 700
 
static vector< channelstruc > out_comm_channel
 
static std::queue< transmit_queue_entry > transmit_queue
 
static std::mutex incoming_tx_lock
 
static std::mutex outgoing_tx_lock
 
static std::mutex debug_fd_lock
 
static double last_data_receive_time = 0.
 
static double next_reqmeta_time = 0.
 
static uint32_t packet_in_count = 0
 
static uint32_t packet_out_count
 
static uint32_t crc_error_count = 0
 
static uint32_t timeout_error_count = 0
 
static uint32_t type_error_count = 0
 
static uint32_t send_error_count = 0
 
static uint32_t recv_error_count = 0
 
static vector< tx_queue > txq
 
static string log_directory = "temp"
 
double logstride_sec = 10.
 
ElapsedTime dt
 

Macro Definition Documentation

#define PROGRESS_QUEUE_SIZE   256
#define PACKET_SIZE_LO   (200-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
#define PACKET_SIZE_PAYLOAD   (PACKET_SIZE_LO-PACKET_DATA_OFFSET_HEADER_TOTAL)
#define THROUGHPUT_LO   130
#define PACKET_SIZE_HI   (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
#define THROUGHPUT_HI   700
#define PACKET_IN   1
#define PACKET_OUT   2

Function Documentation

void send_loop ( )
1057 {
1058  vector<PACKET_BYTE> packet;
1059  int32_t use_channel=-1;
1060 
1061  while (agent->running())
1062  {
1063 
1064  if (agent->running() == (uint16_t)Agent::State::IDLE)
1065  {
1066  COSMOS_SLEEP(1);
1067  continue;
1068  }
1069 
1070  // Sleep just a bit to let other threads act
1071  COSMOS_SLEEP(.001);
1072 
1073  for (uint8_t local_node=1; local_node<txq.size(); ++local_node)
1074  {
1075  // See if we have an active channel serving this Node
1076  int32_t channel = check_channel(local_node);
1077  if (channel < 0)
1078  {
1079  continue;
1080  }
1081 
1082  // Set up to do the most important things first
1083  txq[(local_node)].outgoing.activity = false;
1084  txq[(local_node)].incoming.activity = false;
1085 
1086  // Initialize timer to check minimum wait time
1087  if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) >= 5.)
1088  {
1089  COSMOS_SLEEP(queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) - 5.);
1090  }
1091 
1092  // Send Cancel, Complete, Data, Reqdata, Metadata only if we have learned what the remote node mapping is
1093  if (txq[local_node].remote_id > 0)
1094  {
1095  // Send any pending Metadata packets
1096  outgoing_tx_lock.lock();
1097  if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) < 5.)
1098  {
1099  txq[(local_node)].outgoing.metaclock = currentmjd();
1100  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1101  {
1102  if (txq[(local_node)].outgoing.progress[tx_id].tx_id == tx_id && txq[(local_node)].outgoing.progress[tx_id].sendmeta)
1103  {
1104  tx_progress tx = txq[(local_node)].outgoing.progress[tx_id];
1105  vector<PACKET_BYTE> packet;
1106  make_metadata_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(local_node), tx.tx_id, (char *)tx.file_name.c_str(), tx.file_size, (char *)tx.agent_name.c_str());
1107  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(local_node), "Outgoing", packet);
1108  if (use_channel >= 0)
1109  {
1110  txq[(local_node)].outgoing.activity = true;
1111  txq[(local_node)].outgoing.progress[tx_id].sendmeta = false;
1112  txq[(local_node)].outgoing.progress[tx_id].havemeta = true;
1113  txq[(local_node)].outgoing.progress[tx_id].sentmeta = true;
1114  txq[(local_node)].outgoing.metaclock += out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1115  txq[(local_node)].outgoing.queueclock = txq[(local_node)].outgoing.metaclock + out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1116  txq[(local_node)].outgoing.heartbeatclock = currentmjd() + 4. / 86400.;
1117  break;
1118  }
1119  }
1120  }
1121  }
1122  outgoing_tx_lock.unlock();
1123 
1124  // Send Data packets if we have data to send and we didn't do anything above
1125  outgoing_tx_lock.lock();
1126  if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) < 5.)
1127  {
1128  txq[(local_node)].outgoing.dataclock = currentmjd();
1129  uint16_t tx_id = choose_outgoing_tx_id((local_node));
1130  if (txq[(local_node)].outgoing.progress[tx_id].tx_id == tx_id && txq[(local_node)].outgoing.progress[tx_id].senddata)
1131  {
1132  if (txq[(local_node)].outgoing.progress[tx_id].file_size)
1133  {
1134  // Check if there is any more data to send
1135  if (txq[(local_node)].outgoing.progress[tx_id].total_bytes == 0)
1136  {
1137  txq[(local_node)].outgoing.progress[tx_id].sentdata = true;
1138  txq[(local_node)].outgoing.progress[tx_id].senddata = false;
1139  }
1140  else
1141  {
1142  // Attempt to open the outgoing progress file
1143  if (txq[(local_node)].outgoing.progress[tx_id].fp == nullptr)
1144  {
1145  txq[(local_node)].outgoing.progress[tx_id].fp = fopen(txq[(local_node)].outgoing.progress[tx_id].filepath.c_str(), "r");
1146  }
1147 
1148  // If we're good, continue with the process
1149  if(txq[(local_node)].outgoing.progress[tx_id].fp != nullptr)
1150  {
1151  file_progress tp;
1152  tp = txq[(local_node)].outgoing.progress[tx_id].file_info[0];
1153 
1154  PACKET_FILE_SIZE_TYPE byte_count = (tp.chunk_end - tp.chunk_start) + 1;
1155  if (byte_count > out_comm_channel[use_channel].packet_size)
1156  {
1157  byte_count = out_comm_channel[use_channel].packet_size;
1158  }
1159 
1160  tp.chunk_end = tp.chunk_start + byte_count - 1;
1161 
1162  // Read the packet and send it
1163  int32_t nbytes;
1164  PACKET_BYTE* chunk = new PACKET_BYTE[byte_count]();
1165  if (!(nbytes = fseek(txq[(local_node)].outgoing.progress[tx_id].fp, tp.chunk_start, SEEK_SET)))
1166  {
1167  nbytes = fread(chunk, 1, byte_count, txq[(local_node)].outgoing.progress[tx_id].fp);
1168  }
1169  if (nbytes == byte_count)
1170  {
1171  make_data_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(local_node), txq[(local_node)].outgoing.progress[tx_id].tx_id, byte_count, tp.chunk_start, chunk);
1172  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(local_node), "Outgoing", packet);
1173  if (use_channel >= 0)
1174  {
1175  txq[(local_node)].outgoing.progress[tx_id].file_info[0].chunk_start = tp.chunk_end + 1;
1176  txq[(local_node)].outgoing.activity = true;
1177  txq[(local_node)].outgoing.dataclock += out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1178  txq[(local_node)].outgoing.metaclock = txq[(local_node)].outgoing.dataclock + out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1179  txq[(local_node)].outgoing.queueclock = txq[(local_node)].outgoing.metaclock + out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1180  txq[(local_node)].outgoing.heartbeatclock = currentmjd() + 4. / 86400.;
1181  }
1182  }
1183  else
1184  {
1185  // Some problem with this transmission, ask other end to dequeue it
1186  // Remove transaction
1187  txq[(local_node)].outgoing.progress[tx_id].senddata = false;
1188  txq[(local_node)].outgoing.progress[tx_id].complete = true;
1189  }
1190  delete[] chunk;
1191 
1192  if (txq[(local_node)].outgoing.progress[tx_id].file_info[0].chunk_start > txq[(local_node)].outgoing.progress[tx_id].file_info[0].chunk_end)
1193  {
1194  // All done with this file_info entry. Close file and remove entry.
1195  fclose(txq[(local_node)].outgoing.progress[tx_id].fp);
1196  txq[(local_node)].outgoing.progress[tx_id].fp = nullptr;
1197  txq[(local_node)].outgoing.progress[tx_id].file_info.pop_front();
1198  }
1199 
1200  write_meta(txq[(local_node)].outgoing.progress[tx_id]);
1201  }
1202  else
1203  {
1204  // Some problem with this transmission, ask other end to dequeue it
1205 
1206  txq[(local_node)].outgoing.progress[tx_id].senddata = false;
1207  txq[(local_node)].outgoing.progress[tx_id].complete = true;
1208  }
1209  }
1210  }
1211  // Zero length file, ask other end to dequeue it
1212  else
1213  {
1214  txq[(local_node)].outgoing.progress[tx_id].senddata = false;
1215  txq[(local_node)].outgoing.progress[tx_id].complete = true;
1216  }
1217  }
1218  }
1219  outgoing_tx_lock.unlock();
1220 
1221  // Send Cancel packets if required
1222  outgoing_tx_lock.lock();
1223  if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) < 5.)
1224  {
1225  txq[(local_node)].outgoing.completeclock = currentmjd();
1226  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1227  {
1228  if (txq[(local_node)].outgoing.progress[tx_id].tx_id == tx_id && txq[(local_node)].outgoing.progress[tx_id].complete)
1229  {
1230  // Remove from queue
1231  outgoing_tx_del(local_node, tx_id);
1232 
1233  // Send a CANCEL packet
1234  vector<PACKET_BYTE> packet;
1235  make_cancel_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(local_node), tx_id);
1236  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(local_node), "Outgoing", packet);
1237  if (use_channel >= 0)
1238  {
1239  txq[(local_node)].outgoing.activity = true;
1240  txq[(local_node)].outgoing.completeclock += out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1241  txq[(local_node)].outgoing.dataclock = txq[(local_node)].outgoing.completeclock + out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1242  txq[(local_node)].outgoing.metaclock = txq[(local_node)].outgoing.dataclock + out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1243  txq[(local_node)].outgoing.queueclock = txq[(local_node)].outgoing.metaclock + out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1244  txq[(local_node)].outgoing.heartbeatclock = currentmjd() + 4. / 86400.;
1245  }
1246  }
1247  }
1248  }
1249  outgoing_tx_lock.unlock();
1250 
1251  // Send Complete packets if required
1252  incoming_tx_lock.lock();
1253  if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) < 5.)
1254  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1255  {
1256  if (txq[(local_node)].incoming.progress[tx_id].tx_id == tx_id && txq[(local_node)].incoming.progress[tx_id].complete)
1257  {
1258  // Remove from queue
1259  incoming_tx_del(local_node, tx_id);
1260 
1261  // Send a COMPLETE packet
1262  vector<PACKET_BYTE> packet;
1263  make_complete_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(local_node), tx_id);
1264  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(local_node), "Incoming", packet);
1265  if (use_channel >= 0)
1266  {
1267  txq[(local_node)].incoming.activity = true;
1268  txq[(local_node)].incoming.completeclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1269  txq[(local_node)].incoming.dataclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1270  txq[(local_node)].incoming.metaclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1271  txq[(local_node)].incoming.queueclock = currentmjd() + 10. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1272  }
1273  }
1274  }
1275  incoming_tx_lock.unlock();
1276 
1277  // Send Reqdata packet if there is still data to be gotten and it has been otherwise quiet
1278  incoming_tx_lock.lock();
1279  if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) < 5.)
1280  {
1281  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[(local_node)].incoming, choose_incoming_tx_id(local_node));
1282 
1283  if (tx_id < PROGRESS_QUEUE_SIZE && tx_id > 0)
1284  {
1285  // Ask for missing data
1286  vector<file_progress> missing;
1287  missing = find_chunks_missing(txq[(local_node)].incoming.progress[tx_id]);
1288  for (uint32_t j=0; j<missing.size(); ++j)
1289  {
1290  vector<PACKET_BYTE> packet;
1291  make_reqdata_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(local_node), txq[(local_node)].incoming.progress[tx_id].tx_id, missing[j].chunk_start, missing[j].chunk_end);
1292  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(local_node), "Incoming", packet);
1293  out_comm_channel[channel].send_reqdata = false;
1294  txq[(local_node)].incoming.dataclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1295  txq[(local_node)].incoming.metaclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1296  txq[(local_node)].incoming.queueclock = currentmjd() + 10. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1297  }
1298  }
1299  }
1300  incoming_tx_lock.unlock();
1301  }
1302 
1303  // Send Reqmeta packet if requested
1304  incoming_tx_lock.lock();
1305  if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) < 5.)
1306  {
1307  vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
1308  PACKET_TX_ID_TYPE iq = 0;
1309  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1310  {
1311  if (txq[(local_node)].incoming.progress[tx_id].tx_id && txq[(local_node)].incoming.progress[tx_id].sendmeta)
1312  {
1313  next_reqmeta_time += sizeof(packet_struct_metashort) / (86400. * out_comm_channel[use_channel].throughput);
1314  tqueue[iq++] = tx_id;
1315  txq[(local_node)].incoming.progress[tx_id].sendmeta = false;
1316  }
1317  if (iq == TRANSFER_QUEUE_LIMIT)
1318  {
1319  break;
1320  }
1321  }
1322  if (iq)
1323  {
1324  vector<PACKET_BYTE> packet;
1325  make_reqmeta_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(local_node), txq[(local_node)].node_name, tqueue);
1326  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(local_node), "Incoming", packet);
1327  if (use_channel >= 0)
1328  {
1329  txq[(local_node)].incoming.metaclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1330  txq[(local_node)].incoming.queueclock = currentmjd() + 10. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1331  }
1332  }
1333  }
1334  incoming_tx_lock.unlock();
1335 
1336  // Send Queue packet, if anything needs to be queued
1337  outgoing_tx_lock.lock();
1338  if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) < 5.)
1339  {
1340  txq[(local_node)].outgoing.queueclock = currentmjd();
1341  vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
1342  PACKET_TX_ID_TYPE iq = 0;
1343  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1344  {
1345  if (txq[(local_node)].outgoing.progress[tx_id].tx_id == tx_id)
1346  {
1347  tqueue[iq++] = txq[(local_node)].outgoing.progress[tx_id].tx_id;
1348 // txq[(local_node)].outgoing.progress[tx_id].sendmeta = true;
1349  }
1350  if (iq == TRANSFER_QUEUE_LIMIT)
1351  {
1352  break;
1353  }
1354  }
1355  if (iq)
1356  {
1357  make_queue_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(local_node), txq[(local_node)].node_name, tqueue);
1358  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(local_node), "Outgoing", packet);
1359  if (use_channel >= 0)
1360  {
1361  txq[(local_node)].outgoing.activity = true;
1362  out_comm_channel[channel].send_queue = false;
1363  txq[(local_node)].outgoing.queueclock += out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1364  }
1365  }
1366  }
1367  outgoing_tx_lock.unlock();
1368 
1369  // Send Reqqueue packet if requested
1370 // incoming_tx_lock.lock();
1371 // if (txq[(local_node)].incoming.queueclock < currentmjd())
1372 // if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) < 5. && txq[(local_node)].incoming.queueclock < currentmjd())
1373 // {
1374 // make_reqqueue_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(local_node), txq[(local_node)].node_name);
1375 // use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(local_node), "Incoming", packet);
1376 // if (use_channel >= 0)
1377 // {
1378 // txq[(local_node)].incoming.activity = true;
1379 // txq[(local_node)].incoming.queueclock = currentmjd() + 5. / 86400.;
1380 // }
1381 // }
1382 // incoming_tx_lock.unlock();
1383 
1384  // Send Heartbeat every 4 seconds, regardless
1385 // if (txq[(local_node)].outgoing.heartbeatclock < currentmjd())
1386  if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(local_node)) < 5. && txq[(local_node)].outgoing.heartbeatclock < currentmjd())
1387  {
1388  uint32_t funixtime = utc2unixseconds(out_comm_channel[channel].fmjd);
1389  make_heartbeat_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(local_node), txq[(local_node)].node_name, 4, out_comm_channel[channel].throughput, funixtime);
1390  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(local_node), "Outgoing", packet);
1391  txq[(local_node)].outgoing.heartbeatclock = currentmjd() + 5. / 86400.;
1392  }
1393  }
1394  }
1395 }
int32_t incoming_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2872
vector< file_progress > find_chunks_missing(tx_progress &tx)
Definition: agent_file3.cpp:1990
static Agent * agent
Definition: agent_file3.cpp:94
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file3.cpp:1823
void make_data_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, PACKET_TX_ID_TYPE tx_id, PACKET_CHUNK_SIZE_TYPE byte_count, PACKET_FILE_SIZE_TYPE chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:232
Definition: transferlib.h:332
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
static std::mutex incoming_tx_lock
Definition: agent_file3.cpp:140
Definition: transferlib.h:245
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
double utc2unixseconds(double utc)
UTC to Unix time.
Definition: timelib.cpp:167
void make_complete_packet(vector< PACKET_BYTE > &packet, packet_struct_complete complete)
Definition: transferlib.cpp:54
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
int32_t queuesendto(PACKET_NODE_ID_TYPE node_id, string type, vector< PACKET_BYTE > packet)
Definition: agent_file3.cpp:1451
void make_cancel_packet(vector< PACKET_BYTE > &packet, packet_struct_cancel cancel)
Definition: transferlib.cpp:82
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
int32_t outgoing_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2478
void make_heartbeat_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, PACKET_BYTE beat_period, PACKET_UNIXTIME_TYPE throughput, PACKET_UNIXTIME_TYPE funixtime)
Definition: transferlib.cpp:299
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string agent_name
Definition: transferlib.h:350
static std::mutex outgoing_tx_lock
Definition: agent_file3.cpp:141
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
PACKET_TX_ID_TYPE choose_outgoing_tx_id(uint8_t node)
Definition: agent_file3.cpp:3041
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
#define SEEK_SET
Definition: zconf.h:475
void make_reqdata_packet(vector< PACKET_BYTE > &packet, packet_struct_reqdata reqdata)
Definition: transferlib.cpp:131
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
string file_name
Definition: transferlib.h:351
uint8_t PACKET_BYTE
Definition: transferlib.h:140
int32_t check_channel(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file3.cpp:3098
void make_queue_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > queue)
Definition: transferlib.cpp:277
void make_metadata_packet(vector< PACKET_BYTE > &packet, packet_struct_metalong meta)
Definition: transferlib.cpp:163
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file3.cpp:3086
double queuecheck(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file3.cpp:1429
void make_reqmeta_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > reqmeta)
Definition: transferlib.cpp:110
PACKET_TX_ID_TYPE choose_incoming_tx_id(uint8_t node)
Definition: agent_file3.cpp:3011
static double next_reqmeta_time
Definition: agent_file3.cpp:145
void recv_loop ( )
480 {
481  vector<PACKET_BYTE> recvbuf;
482  string partial_filepath;
483  int32_t nbytes = 0;
484  socket_channel rchannel;
485  std::vector<channelstruc>::size_type use_channel = 0;
486 
487  while (agent->running())
488  {
489  if (agent->running() == (uint16_t)Agent::State::IDLE)
490  {
491  COSMOS_SLEEP(1);
492  continue;
493  }
494  else
495  {
496  COSMOS_SLEEP(.001);
497  }
498 
499  while (( nbytes = myrecvfrom("Incoming", rchannel, recvbuf, PACKET_MAX_LENGTH)) > 0)
500  {
501  // Generate extra network info
502  inet_ntop(rchannel.caddr.sin_family, &rchannel.caddr.sin_addr, rchannel.address, sizeof(rchannel.address));
503 
504  // Check channels, update information if we are already handling it, otherwise add channel
505  string node_name = "";
506  uint8_t local_node = 0;
507  uint8_t remote_node = recvbuf[PACKET_HEADER_OFFSET_NODE_ID];
508  switch (recvbuf[0] & 0x0f)
509  {
510  case PACKET_HEARTBEAT:
511  case PACKET_REQQUEUE:
512  case PACKET_QUEUE:
513  case PACKET_REQMETA:
514  node_name = reinterpret_cast<char *>(&recvbuf[PACKET_HEADER_OFFSET_NODE_NAME]);
515  local_node = lookup_local_node_id(node_name);
516  if (local_node == 0)
517  {
518  local_node = add_node_name(node_name);
519  }
520  txq[local_node].remote_id = remote_node;
521  break;
522  case PACKET_METADATA:
523  case PACKET_REQDATA:
524  case PACKET_DATA:
525  case PACKET_COMPLETE:
526  case PACKET_CANCEL:
527  local_node = lookup_local_node_id(remote_node);
528  if (local_node > 0)
529  {
530  node_name = txq[local_node].node_name;
531  }
532  break;
533  }
534 
535  if (local_node == 0 || node_name.empty())
536  {
537  continue;
538  }
539 
540  use_channel = out_comm_channel.size();
541  for (std::vector<channelstruc>::size_type i=0; i<out_comm_channel.size(); ++i)
542  {
543  // Are we handling this Node?
544  if (out_comm_channel[i].node == node_name)
545  {
546  out_comm_channel[i].chansock = rchannel;
547  out_comm_channel[i].chanip = out_comm_channel[i].chansock.address;
548  use_channel = i;
549  break;
550  }
551  }
552 
553  if (use_channel == out_comm_channel.size())
554  {
555  channelstruc tchannel;
556  tchannel.node = node_name;
557  tchannel.nmjd = currentmjd(0.);
558  tchannel.limjd = tchannel.nmjd;
559  tchannel.lomjd = tchannel.nmjd;
560  tchannel.fmjd = tchannel.nmjd;
561  tchannel.chansock = rchannel;
562  inet_ntop(tchannel.chansock.caddr.sin_family, &tchannel.chansock.caddr.sin_addr, tchannel.chansock.address, sizeof(tchannel.chansock.address));
563  tchannel.chanip = tchannel.chansock.address;
564  use_channel = out_comm_channel.size();
565  out_comm_channel.push_back(tchannel);
566  }
567 
569 
570  // Respond appropriately according to type of packet
571  switch (recvbuf[0] & 0x0f)
572  {
573  case PACKET_HEARTBEAT:
574  {
575  packet_struct_heartbeat heartbeat;
576 
577  extract_heartbeat(recvbuf, heartbeat);
578 
579  out_comm_channel[use_channel].heartbeat = heartbeat;
580  }
581  break;
582  case PACKET_REQQUEUE:
583  {
584  packet_struct_reqqueue reqqueue;
585  txq[local_node].outgoing.queueclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
586 
587  extract_reqqueue(recvbuf, reqqueue);
588 
589  out_comm_channel[use_channel].send_queue = true;
590  }
591  break;
592  case PACKET_QUEUE:
593  {
594  packet_struct_queue queue;
595  txq[local_node].incoming.queueclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
596 
597  extract_queue(recvbuf, queue);
598 
599  incoming_tx_lock.lock();
600 
601  // Sort through incoming queue and remove anything not in sent queue
602  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
603  {
604  bool valid = false;
605  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
606  {
607  if (queue.tx_id[i] && txq[local_node].incoming.progress[tx_id].tx_id == queue.tx_id[i])
608  {
609  // This is an incoming file we should handle
610  valid = true;
611  break;
612  }
613  }
614 
615  if (txq[local_node].incoming.progress[tx_id].tx_id && !valid)
616  {
617  // This is not an incoming file we should handle
618  incoming_tx_del(local_node, tx_id);
619  }
620  }
621 
622  // Sort through remotely sent queue and add anything not in our local incoming queue
623  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
624  {
625  if (queue.tx_id[i])
626  {
627  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[local_node].incoming, queue.tx_id[i]);
628 
629  if (tx_id == 0)
630  {
631  incoming_tx_add(queue.node_name, queue.tx_id[i]);
632  }
633  }
634  }
635 
636  incoming_tx_lock.unlock();
637  }
638  break;
639  //Request missing metadata
640  case PACKET_REQMETA:
641  {
642  packet_struct_reqmeta reqmeta;
643  txq[local_node].outgoing.queueclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
644  txq[local_node].outgoing.metaclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
645 
646  extract_reqmeta(recvbuf, reqmeta);
647 
648  outgoing_tx_lock.lock();
649 
650  // Send requested META packets
651  if (txq[local_node].remote_id > 0)
652  {
653  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
654  {
655  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[local_node].outgoing, reqmeta.tx_id[i]);
656  if (tx_id > 0)
657  {
658  txq[local_node].outgoing.progress[tx_id].sendmeta = true;
659  txq[local_node].outgoing.progress[tx_id].sentmeta = false;
660  }
661  }
662  }
663  outgoing_tx_lock.unlock();
664  break;
665  }
666  case PACKET_METADATA:
667  {
669  txq[local_node].incoming.queueclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
670  txq[local_node].incoming.metaclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
671 
672  extract_metadata(recvbuf, meta);
673 
674  incoming_tx_lock.lock();
675 
676  incoming_tx_update(meta);
677 
678  incoming_tx_lock.unlock();
679 
680  break;
681  }
682  case PACKET_REQDATA:
683  {
684  packet_struct_reqdata reqdata;
685  txq[local_node].outgoing.queueclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
686  txq[local_node].outgoing.metaclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
687  txq[local_node].outgoing.dataclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
688 
689  extract_reqdata(recvbuf, reqdata);
690 
691  // Simple validity check
692  if (reqdata.hole_end < reqdata.hole_start)
693  {
694  break;
695  }
696 
697  outgoing_tx_lock.lock();
698 
699  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[local_node].outgoing, reqdata.tx_id);
700  // tx_id now points to the valid entry to which we should add the data
701 
702  if (tx_id > 0)
703  {
704  // Add this chunk to the queue
705  file_progress tp;
706  tp.chunk_start = reqdata.hole_start;
707  tp.chunk_end = reqdata.hole_end;
708  PACKET_FILE_SIZE_TYPE byte_count = (reqdata.hole_end - reqdata.hole_start) + 1;
709 
710  uint32_t check=0;
711  // Anything in the queue yet?
712  if (!txq[local_node].outgoing.progress[tx_id].file_info.size())
713  {
714  // Add first entry
715  txq[local_node].outgoing.progress[tx_id].file_info.push_back(tp);
716  txq[local_node].outgoing.progress[tx_id].total_bytes += byte_count;
717  }
718  else
719  {
720  // Check against existing data
721  for (uint32_t j=0; j<txq[local_node].outgoing.progress[tx_id].file_info.size(); ++j)
722  {
723  // If we match this entry
724  if (tp.chunk_start == txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_start && tp.chunk_end == txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end)
725  {
726  break;
727  }
728  // If we start before this entry
729  if (tp.chunk_start < txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_start)
730  {
731  // If we end before this entry (at least one byte between), insert
732  if (tp.chunk_end + 1 < txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_start)
733  {
734  txq[local_node].outgoing.progress[tx_id].file_info.insert(txq[local_node].outgoing.progress[tx_id].file_info.begin()+j, tp);
735  txq[local_node].outgoing.progress[tx_id].total_bytes += byte_count;
736  break;
737  }
738  // Otherwise, extend the near end
739  else
740  {
741  tp.chunk_end = txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_start - 1;
742  txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_start = tp.chunk_start;
743  byte_count = (tp.chunk_end - tp.chunk_start) + 1;
744  txq[local_node].outgoing.progress[tx_id].total_bytes += byte_count;
745  break;
746  }
747  }
748  else
749  {
750  // If we overlap on the end, extend the far end
751  if (tp.chunk_start <= txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end + 1)
752  {
753  if (tp.chunk_end > txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end)
754  {
755  byte_count = tp.chunk_end - txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end;
756  tp.chunk_start = txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end + 1;
757  txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end = tp.chunk_end;
758  txq[local_node].outgoing.progress[tx_id].total_bytes += byte_count;
759  break;
760  }
761  }
762  }
763  check = j + 1;
764  }
765 
766 
767  // If we are higher than everything currently in the list, then append
768  if (check == txq[local_node].outgoing.progress[tx_id].file_info.size())
769  {
770  txq[local_node].outgoing.progress[tx_id].file_info.push_back(tp);
771  txq[local_node].outgoing.progress[tx_id].total_bytes += byte_count;
772  }
773 
774  }
775 
776  // Save meta to disk
777  write_meta(txq[local_node].outgoing.progress[tx_id]);
778  txq[local_node].outgoing.progress[tx_id].senddata = true;
779  txq[local_node].outgoing.progress[tx_id].sentdata = false;
780  txq[local_node].outgoing.progress[tx_id].sentmeta = true;
781  txq[local_node].outgoing.progress[tx_id].complete = false;
782  }
783 
784  outgoing_tx_lock.unlock();
785  break;
786  }
787  case PACKET_DATA:
788  {
789  packet_struct_data data;
790  txq[local_node].incoming.queueclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
791  txq[local_node].incoming.metaclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
792  txq[local_node].incoming.dataclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
793 
794  extract_data(recvbuf, data.node_id, data.tx_id, data.byte_count, data.chunk_start, data.chunk);
795 
797 
798  // create transaction entry if new, and then add data
799 
800  incoming_tx_lock.lock();
801 
802  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[local_node].incoming, data.tx_id);
803 
804  // Update corresponding incoming queue entry if it exists
805  if (tx_id > 0)
806  {
807 
808  // tx_id now points to the valid entry to which we should add the data
809  file_progress tp;
810  tp.chunk_start = data.chunk_start;
811  tp.chunk_end = data.chunk_start + data.byte_count - 1;
812 
813  packet_struct_data odata;
814  odata = data;
815 
816  uint32_t check=0;
817  bool duplicate = false;
818  bool updated = false;
819 
820  // Do we have any data yet?
821  if (!txq[local_node].incoming.progress[tx_id].file_info.size())
822  {
823  // Add first entry, then write data
824  txq[local_node].incoming.progress[tx_id].file_info.push_back(tp);
825  txq[local_node].incoming.progress[tx_id].total_bytes += data.byte_count;
826  updated = true;
827  }
828  else
829  {
830  // Check against existing data
831  for (uint32_t j=0; j<txq[local_node].incoming.progress[tx_id].file_info.size(); ++j)
832  {
833  // Check for duplicate
834  if (tp.chunk_start >= txq[local_node].incoming.progress[tx_id].file_info[j].chunk_start && tp.chunk_end <= txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end)
835  {
836  duplicate = true;
837  break;
838  }
839  // If we start before this entry
840  if (tp.chunk_start < txq[local_node].incoming.progress[tx_id].file_info[j].chunk_start)
841  {
842  // If we end before this entry (at least one byte between), insert
843  if (tp.chunk_end + 1 < txq[local_node].incoming.progress[tx_id].file_info[j].chunk_start)
844  {
845  txq[local_node].incoming.progress[tx_id].file_info.insert(txq[local_node].incoming.progress[tx_id].file_info.begin()+j, tp);
846  txq[local_node].incoming.progress[tx_id].total_bytes += data.byte_count;
847  updated = true;
848  break;
849  }
850  // Otherwise, extend the near end
851  else
852  {
853  tp.chunk_end = txq[local_node].incoming.progress[tx_id].file_info[j].chunk_start - 1;
854  txq[local_node].incoming.progress[tx_id].file_info[j].chunk_start = tp.chunk_start;
855  data.byte_count = (tp.chunk_end - tp.chunk_start) + 1;
856  txq[local_node].incoming.progress[tx_id].total_bytes += data.byte_count;
857  updated = true;
858  break;
859  }
860  }
861  else
862  {
863  // If we overlap on the end, extend the far end
864  if (tp.chunk_start <= txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end + 1)
865  {
866  if (tp.chunk_end > txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end)
867  {
868  data.byte_count = tp.chunk_end - txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end;
869  tp.chunk_start = txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end + 1;
870  txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end = tp.chunk_end;
871  txq[local_node].incoming.progress[tx_id].total_bytes += data.byte_count;
872  updated = true;
873  break;
874  }
875  }
876  }
877  check = j + 1;
878  }
879 
880 
881  // If we are higher than everything currently in the list, then append
882  if (!duplicate && check == txq[local_node].incoming.progress[tx_id].file_info.size())
883  {
884  txq[local_node].incoming.progress[tx_id].file_info.push_back(tp);
885  txq[local_node].incoming.progress[tx_id].total_bytes += data.byte_count;
886  updated = true;
887  }
888 
889  }
890 
891  // Write to disk if this is new data
892  if (updated)
893  {
894  // Write incoming data to disk
895  if (txq[local_node].incoming.progress[tx_id].fp == nullptr)
896  {
897  partial_filepath = txq[local_node].incoming.progress[tx_id].temppath + ".file";
898  if (data_exists(partial_filepath))
899  {
900  txq[local_node].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(), "r+");
901  }
902  else
903  {
904  txq[local_node].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(), "w");
905  }
906  }
907 
908  if (txq[local_node].incoming.progress[tx_id].fp == nullptr)
909  {
910  if (agent->debug_level)
911  {
912  debug_fd_lock.lock();
913  fprintf(agent->get_debug_fd(), "%16.10f %.4f Incoming: File Error: %s %s on ID: %u Chunk: %u\n", currentmjd(), dt.lap(), partial_filepath.c_str(), cosmos_error_string(-errno).c_str(), tx_id, tp.chunk_start);
914  fflush(agent->get_debug_fd());
915  debug_fd_lock.unlock();
916  }
917  }
918  else
919  {
920  fseek(txq[local_node].incoming.progress[tx_id].fp, tp.chunk_start, SEEK_SET);
921  fwrite(data.chunk, data.byte_count, 1, txq[local_node].incoming.progress[tx_id].fp);
922  fflush(txq[local_node].incoming.progress[tx_id].fp);
923  // Write latest meta data to disk
924  write_meta(txq[local_node].incoming.progress[tx_id]);
925  if (agent->debug_level)
926  {
927  uint32_t total = 0;
928  for (uint16_t i=0; i<data.byte_count; ++i)
929  {
930  total += data.chunk[i];
931  }
932  }
933  }
934 
935  }
936 
937  // Check if file has been completely received
938  if(txq[local_node].incoming.progress[tx_id].file_size == txq[local_node].incoming.progress[tx_id].total_bytes && txq[local_node].incoming.progress[tx_id].havemeta)
939  {
940  // See if we know what the remote node_id is for this
941  if (txq[local_node].remote_id > 0)
942  {
943  tx_progress tx_in = txq[local_node].incoming.progress[tx_id];
944  if (agent->debug_level)
945  {
946  debug_fd_lock.lock();
947  fprintf(agent->get_debug_fd(), "%16.10f %.4f Incoming: Complete: %u %s %u %u\n", currentmjd(), dt.lap(), tx_in.tx_id, tx_in.node_name.c_str(), tx_in.file_size, tx_in.total_bytes);
948  fflush(agent->get_debug_fd());
949  debug_fd_lock.unlock();
950  }
951 
952  // Move file to its final location
953  if (!txq[local_node].incoming.progress[tx_id].complete)
954  {
955  if (txq[local_node].incoming.progress[tx_id].fp != nullptr)
956  {
957  fclose(txq[local_node].incoming.progress[tx_id].fp);
958  txq[local_node].incoming.progress[tx_id].fp = nullptr;
959  }
960  string final_filepath = tx_in.temppath + ".file";
961  int iret = rename(final_filepath.c_str(), tx_in.filepath.c_str());
962  // Make sure metadata is recorded
963  write_meta(txq[local_node].incoming.progress[tx_id], 0.);
964  if (agent->debug_level)
965  {
966  debug_fd_lock.lock();
967  fprintf(agent->get_debug_fd(), "%16.10f %.4f Incoming: Renamed/Data: %d %s\n", currentmjd(), dt.lap(), iret, tx_in.filepath.c_str());
968  fflush(agent->get_debug_fd());
969  debug_fd_lock.unlock();
970  }
971 
972  // Mark complete
973  txq[local_node].incoming.progress[tx_id].complete = true;
974  txq[local_node].incoming.progress[tx_id].senddata = false;
975  txq[local_node].incoming.progress[tx_id].sentdata = true;
976  out_comm_channel[use_channel].send_complete = true;
977  }
978  }
979  }
980  }
981  else
982  {
983  txq[local_node].incoming.progress[data.tx_id].sendmeta = true;
984  }
985 
986  incoming_tx_lock.unlock();
987 
988  break;
989  }
990  case PACKET_COMPLETE:
991  {
992  packet_struct_complete complete;
993  txq[local_node].outgoing.queueclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
994  txq[local_node].outgoing.metaclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
995  txq[local_node].outgoing.dataclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
996  txq[local_node].outgoing.completeclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
997 
998  extract_complete(recvbuf, complete);
999 
1000  outgoing_tx_lock.lock();
1001 
1002  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[local_node].outgoing, complete.tx_id);
1003 
1004  if (tx_id > 0)
1005  {
1006  txq[local_node].outgoing.progress[tx_id].complete = true;
1007  }
1008 
1009  outgoing_tx_lock.unlock();
1010  break;
1011  }
1012  case PACKET_CANCEL:
1013  {
1014  packet_struct_complete cancel;
1015  txq[local_node].incoming.queueclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1016  txq[local_node].incoming.metaclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1017  txq[local_node].incoming.dataclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1018  txq[local_node].incoming.completeclock = currentmjd() + 2. * out_comm_channel[use_channel].packet_size / (86400. * out_comm_channel[use_channel].throughput);
1019 
1020  extract_complete(recvbuf, cancel);
1021 
1022  incoming_tx_lock.lock();
1023 
1024  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[local_node].incoming, cancel.tx_id);
1025 
1026  if (tx_id > 0)
1027  {
1028  // Remove the transaction
1029  incoming_tx_del(local_node, tx_id);
1030  }
1031 
1032  incoming_tx_lock.unlock();
1033  break;
1034  }
1035  default:
1036  ++type_error_count;
1037  break;
1038  }
1039  }
1040  }
1041 
1042  // Flush any active metadata
1043  for (uint16_t node=0; node<txq.size(); ++node)
1044  {
1045  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1046  {
1047  if (txq[(node)].incoming.progress[tx_id].tx_id && txq[(node)].incoming.progress[tx_id].havemeta)
1048  {
1049  write_meta(txq[(node)].incoming.progress[tx_id], 0.);
1050  }
1051  }
1052  }
1053 
1054 }
string node
Definition: agent_file2.cpp:100
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
int32_t incoming_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2872
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
Definition: transferlib.h:205
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:209
static Agent * agent
Definition: agent_file3.cpp:94
void extract_reqdata(vector< PACKET_BYTE > &packet, packet_struct_reqdata &reqdata)
Definition: transferlib.cpp:150
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int32_t incoming_tx_update(packet_struct_metashort meta)
Definition: agent_file3.cpp:2812
int32_t myrecvfrom(string type, socket_channel &channel, vector< PACKET_BYTE > &buf, uint32_t length, double dtimeout=1.)
Definition: agent_file3.cpp:1521
Definition: transferlib.h:291
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file3.cpp:1823
ElapsedTime dt
Definition: agent_file3.cpp:183
double limjd
Definition: agent_file3.cpp:103
int i
Definition: rw_test.cpp:37
void extract_complete(vector< PACKET_BYTE > &packet, packet_struct_complete &complete)
Definition: transferlib.cpp:71
uint8_t lookup_local_node_id(PACKET_NODE_ID_TYPE remote_id)
Definition: agent_file3.cpp:3162
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:208
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file3.cpp:2693
Definition: transferlib.h:332
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
string chanip
Definition: agent_file2.cpp:102
PACKET_CHUNK_SIZE_TYPE byte_count
Definition: transferlib.h:279
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
Definition: transferlib.h:155
static std::mutex incoming_tx_lock
Definition: agent_file3.cpp:140
Definition: transferlib.h:245
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
char address[17]
Definition: socketlib.h:134
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
Definition: agent_file2.cpp:98
Definition: transferlib.h:217
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:264
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:294
void extract_data(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE &node_id, PACKET_TX_ID_TYPE &tx_id, PACKET_CHUNK_SIZE_TYPE &byte_count, PACKET_FILE_SIZE_TYPE &chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:248
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
int32_t add_node_name(string node_name)
Definition: agent_file3.cpp:3110
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
string node_name
Definition: transferlib.h:349
double lomjd
Definition: agent_file3.cpp:104
string filepath
Definition: transferlib.h:352
Definition: socketlib.h:115
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:278
struct sockaddr_in caddr
Definition: socketlib.h:122
Definition: transferlib.h:195
Definition: transferlib.h:261
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
static uint32_t type_error_count
Definition: agent_file3.cpp:152
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
void extract_heartbeat(vector< PACKET_BYTE > &packet, packet_struct_heartbeat &heartbeat)
Definition: transferlib.cpp:316
static std::mutex outgoing_tx_lock
Definition: agent_file3.cpp:141
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:277
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
#define PACKET_HEADER_OFFSET_NODE_NAME
Definition: transferlib.h:153
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
#define SEEK_SET
Definition: zconf.h:475
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
Definition: transferlib.h:275
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:221
#define PACKET_MAX_LENGTH
Definition: transferlib.h:75
socket_channel chansock
Definition: agent_file2.cpp:101
PACKET_FILE_SIZE_TYPE hole_start
Definition: transferlib.h:265
PACKET_FILE_SIZE_TYPE hole_end
Definition: transferlib.h:266
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:280
static double last_data_receive_time
Definition: agent_file3.cpp:144
PACKET_BYTE chunk[1500]
Definition: transferlib.h:281
double nmjd
Definition: agent_file2.cpp:105
static const unsigned char PACKET_HEARTBEAT
Definition: transferlib.h:111
static string node
Definition: agent_monitor.cpp:126
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
void extract_queue(vector< PACKET_BYTE > &packet, packet_struct_queue &queue)
Definition: transferlib.cpp:292
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file3.cpp:3086
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
double fmjd
Definition: agent_file3.cpp:106
void extract_reqmeta(vector< PACKET_BYTE > &packet, packet_struct_reqmeta &reqmeta)
Definition: transferlib.cpp:124
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
static const unsigned char PACKET_REQQUEUE
Definition: transferlib.h:110
#define PACKET_HEADER_OFFSET_NODE_ID
Definition: transferlib.h:152
bool data_exists(string &path)
Check existence of path.
Definition: datalib.cpp:1003
void extract_reqqueue(vector< PACKET_BYTE > &packet, packet_struct_reqqueue &reqqueue)
Definition: transferlib.cpp:271
void transmit_loop ( )
1398 {
1399  int32_t iretn;
1400  // std::mutex transmit_queue_lock;
1401  // std::unique_lock<std::mutex> locker(transmit_queue_lock);
1402 
1403  while (agent->running())
1404  {
1405  if (agent->running() == (uint16_t)Agent::State::IDLE)
1406  {
1407  COSMOS_SLEEP(1);
1408  continue;
1409  }
1410 
1411 
1412  // transmit_queue_check.wait(locker);
1413 
1414  while (!transmit_queue.empty())
1415  {
1416  // Get next packet from transceiver FIFO
1417  transmit_queue_entry entry = transmit_queue.front();
1418  iretn = mysendto(entry.type, entry.channel, entry.packet);
1419  if (iretn >= 0)
1420  {
1421  out_comm_channel[entry.channel].fmjd = transmit_queue.back().nmjd;
1422  transmit_queue.pop();
1423  }
1424  }
1425  COSMOS_SLEEP(.001);
1426  }
1427 }
std::vector< PACKET_BYTE > packet
Definition: agent_file.cpp:114
static Agent * agent
Definition: agent_file3.cpp:94
int iretn
Definition: rw_test.cpp:37
uint32_t channel
Definition: agent_file.cpp:113
std::string type
Definition: agent_file.cpp:112
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
int32_t mysendto(string type, int32_t use_channel, vector< PACKET_BYTE > &buf)
Definition: agent_file3.cpp:1490
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
Definition: agent_file.cpp:110
static std::queue< transmit_queue_entry > transmit_queue
Definition: agent_file3.cpp:130
int32_t request_debug ( string &  request,
string &  ,
Agent  
)
3320 {
3321 
3322  string requestString = string(request);
3323  StringParser sp(requestString, ' ');
3324 
3325  agent->debug_level = sp.getFieldNumberAsDouble(2); // should be getFieldNumberAsBoolean
3326 
3327  std::cout << "agent->debug_level: " << agent->debug_level << std::endl;
3328  return 0;
3329 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
static Agent * agent
Definition: agent_file3.cpp:94
Definition: stringlib.h:89
int32_t request_get_channels ( string &  request,
string &  response,
Agent agent 
)
2168 {
2169  for (uint16_t channel=0; channel<out_comm_channel.size(); ++channel)
2170  {
2171  response = "{";
2172  response += to_json("channel", channel);
2173  response += to_json("node", out_comm_channel[channel].node);
2174  response += to_json("ip", out_comm_channel[channel].chanip);
2175  response += to_json("size", out_comm_channel[channel].packet_size);
2176  response += to_json("throughput", out_comm_channel[channel].throughput);
2177  response += to_json("nmjd", out_comm_channel[channel].nmjd);
2178  response += to_json("limjd", out_comm_channel[channel].limjd);
2179  response += to_json("lomjd", out_comm_channel[channel].lomjd);
2180  response += to_json("fmjd", out_comm_channel[channel].fmjd);
2181  response += "}";
2182  }
2183  return 0;
2184 }
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
static string node
Definition: agent_monitor.cpp:126
string to_json(string key, string value)
Definition: stringlib.cpp:334
int32_t request_set_throughput ( string &  request,
string &  response,
Agent agent 
)
2187 {
2188  uint16_t channel=0;
2189  uint32_t throughput=0;
2190 
2191  sscanf(request.c_str(), "%*s %hu %u\n", &channel, &throughput);
2192  if (channel < out_comm_channel.size())
2193  {
2194  if (throughput)
2195  {
2196  out_comm_channel[channel].throughput = throughput;
2197  }
2198  }
2199  else
2200  {
2201  response = "Channel " + to_unsigned(channel) + " too large";
2202  }
2203  return 0;
2204 
2205 }
string to_unsigned(uint64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:265
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
int32_t request_remove_file ( string &  request,
string &  response,
Agent agent 
)
2208 {
2209  char type;
2210  uint32_t tx_id;
2211 
2212  sscanf(request.c_str(), "%*s %c %u\n", &type, &tx_id);
2213  switch (type)
2214  {
2215  case 'i':
2216  {
2217  break;
2218  }
2219  case 'o':
2220  {
2221  break;
2222  }
2223  }
2224 
2225  return 0;
2226 }
int32_t request_ls ( string &  request,
string &  response,
Agent agent 
)
2088 {
2089 
2090  //the request string == "ls directoryname"
2091  //get the directory name
2092  // char directoryname[COSMOS_MAX_NAME+1];
2093  // memmove(directoryname, request.substr(3), COSMOS_MAX_NAME);
2094  string directoryname = request.substr(3);
2095 
2096  DIR* dir;
2097  struct dirent* ent;
2098 
2099  string all_file_names;
2100 
2101  if((dir = opendir(directoryname.c_str())) != nullptr)
2102  {
2103  while (( ent = readdir(dir)) != nullptr)
2104  {
2105  all_file_names += ent->d_name;
2106  all_file_names += "\n";
2107  }
2108  closedir(dir);
2109 
2110  (response = all_file_names.c_str());
2111  }
2112  else
2113  response = "unable to open directory " + directoryname;
2114  return 0;
2115 }
int closedir(DIR *dir)
Definition: dirent.c:74
Definition: dirent.c:24
Definition: dirent.h:21
DIR * opendir(const char *name)
Definition: dirent.c:32
struct dirent * readdir(DIR *dir)
Definition: dirent.c:97
char * d_name
Definition: dirent.h:23
int32_t request_list_incoming ( string &  request,
string &  response,
Agent agent 
)
2118 {
2119  response.clear();
2120  for (uint16_t node = 0; node<txq.size(); ++node)
2121  {
2122  response += to_unsigned(node) + ' ' + txq[(node)].node_name + ' ' + to_signed(txq[(node)].incoming.size) + "\n";
2123  for(tx_progress tx : txq[(node)].incoming.progress)
2124  {
2125  if (tx.tx_id)
2126  {
2127  response += to_label("tx_id", tx.tx_id);
2128  response += to_label("node", tx.node_name);
2129  response += to_label("agent", tx.agent_name);
2130  response += to_label("name", tx.file_name);
2131  response += to_label("bytes", tx.total_bytes);
2132  response += "/" + to_unsigned(tx.file_size);
2133  response += to_label("havemeta", tx.havemeta);
2134  response += to_label("sendmeta", tx.sendmeta);
2135  response += to_label("sentmeta", tx.sentmeta);
2136  response += to_label("senddata", tx.senddata);
2137  response += to_label("sentdata", tx.sentdata);
2138  response += to_label("complete", tx.complete);
2139  response += "\n";
2140  }
2141  }
2142  }
2143 
2144  return 0;
2145 }
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
string to_unsigned(uint64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:265
string node_name
Definition: agent_001.cpp:46
string to_signed(int64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:245
static string node
Definition: agent_monitor.cpp:126
string to_label(string label, double value, uint16_t precision, bool mjd)
Definition: stringlib.cpp:376
int32_t request_list_outgoing ( string &  request,
string &  response,
Agent agent 
)
2148 {
2149  response.clear();
2150  for (uint16_t node=0; node<txq.size(); ++node)
2151  {
2152  response += std::to_string(node)+ ' ' + txq[(node)].node_name + ' ' + std::to_string(txq[(node)].outgoing.size) + "\n";
2153  for(tx_progress tx : txq[(node)].outgoing.progress)
2154  {
2155  if (tx.tx_id)
2156  {
2157  response += "tx_id: " + std::to_string(tx.tx_id) + " node: " + tx.node_name + " agent: " + tx.agent_name + " name: " + tx.file_name + " bytes: " + std::to_string(tx.total_bytes) + "/" + std::to_string(tx.file_size);
2158  response += " havemeta: " + to_bool(tx.havemeta) + " sendmeta: " + to_bool(tx.sendmeta) + " sentmeta: " + to_bool(tx.sentmeta);
2159  response += " senddata: " + to_bool(tx.senddata) + " sentdata: " + to_bool(tx.sentdata) + " complete: " + to_bool(tx.complete) + "\n";
2160  }
2161  }
2162  }
2163 
2164  return 0;
2165 }
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
string to_string(char *value)
Definition: stringlib.cpp:220
string node_name
Definition: agent_001.cpp:46
string to_bool(bool value)
Definition: stringlib.cpp:326
static string node
Definition: agent_monitor.cpp:126
int32_t request_list_incoming_json ( string &  request,
string &  response,
Agent agent 
)
3359 {
3360  (response = json_list_incoming().c_str());
3361  return 0;
3362 }
string json_list_incoming()
Definition: agent_file3.cpp:3370
int32_t request_list_outgoing_json ( string &  request,
string &  response,
Agent agent 
)
3365 {
3366  (response = json_list_outgoing().c_str());
3367  return 0;
3368 }
string json_list_outgoing()
Definition: agent_file3.cpp:3405
int32_t request_set_logstride ( string &  request,
string &  response,
Agent agent 
)
3332 {
3333  double new_logstride;
3334  sscanf(request.c_str(),"set_logstride %lf",&new_logstride);
3335  if(new_logstride > 0. )
3336  {
3337  logstride_sec = new_logstride;
3338  }
3339  return 0;
3340 }
double logstride_sec
Definition: agent_file3.cpp:182
int32_t request_get_logstride ( string &  ,
string &  response,
Agent  
)
3343 {
3344  response = "{" + to_json("logstride", logstride_sec) + "}";
3345  return 0;
3346 }
double logstride_sec
Definition: agent_file3.cpp:182
string to_json(string key, string value)
Definition: stringlib.cpp:334
int32_t outgoing_tx_add ( tx_progress tx_out)
2229 {
2230  if (agent->debug_level)
2231  {
2232  debug_fd_lock.lock();
2233  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main: outgoing_tx_add: ", currentmjd(), dt.lap());
2234  fflush(agent->get_debug_fd());
2235  debug_fd_lock.unlock();
2236  }
2237 
2238  int32_t local_node = lookup_local_node_id(tx_out.node_name);
2239  if (local_node == 0)
2240  {
2241  if (agent->debug_level)
2242  {
2243  debug_fd_lock.lock();
2244  fprintf(agent->get_debug_fd(), "TRANSFER_ERROR_NODE\n");
2245  fflush(agent->get_debug_fd());
2246  debug_fd_lock.unlock();
2247  }
2248  return TRANSFER_ERROR_NODE;
2249  }
2250 
2251  // Only add if we have room
2252  if (txq[(local_node)].outgoing.size == TRANSFER_QUEUE_LIMIT)
2253  {
2254  if (agent->debug_level)
2255  {
2256  debug_fd_lock.lock();
2257  fprintf(agent->get_debug_fd(), "TRANSFER_ERROR_QUEUEFULL\n");
2258  fflush(agent->get_debug_fd());
2259  debug_fd_lock.unlock();
2260  }
2261  return TRANSFER_ERROR_QUEUEFULL;
2262  }
2263 
2264  // tx_out.state = STATE_QUEUE;
2265  if (tx_out.file_name.size())
2266  {
2267  tx_out.filepath = data_base_path(tx_out.node_name, "outgoing", tx_out.agent_name, tx_out.file_name);
2268  }
2269  else
2270  {
2271  if (agent->debug_level)
2272  {
2273  debug_fd_lock.lock();
2274  fprintf(agent->get_debug_fd(), "TRANSFER_ERROR_FILENAME\n");
2275  fflush(agent->get_debug_fd());
2276  debug_fd_lock.unlock();
2277  }
2278  tx_out.filepath = "";
2279  return TRANSFER_ERROR_FILENAME;
2280  }
2281 
2282  tx_out.temppath = data_base_path(tx_out.node_name, "temp", "file", "out_"+std::to_string(tx_out.tx_id));
2283 
2284  // Check for a duplicate file name of something already in queue
2285  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2286  {
2287  if (!txq[(local_node)].outgoing.progress[i].filepath.empty() && tx_out.filepath == txq[(local_node)].outgoing.progress[i].filepath)
2288  {
2289  // Remove the META file
2290  if (agent->debug_level)
2291  {
2292  debug_fd_lock.lock();
2293  fprintf(agent->get_debug_fd(), "%u %s %s %s TRANSFER_ERROR_DUPLICATE\n", tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.filepath.c_str());
2294  fflush(agent->get_debug_fd());
2295  debug_fd_lock.unlock();
2296  }
2297  string filepath = tx_out.temppath + ".meta";
2298  remove(filepath.c_str());
2299  return TRANSFER_ERROR_DUPLICATE;
2300  }
2301  }
2302 
2303  tx_out.fp = nullptr;
2304  //get the file size
2305  tx_out.file_size = get_file_size(tx_out.filepath);
2306  tx_out.savetime = 0.;
2307 
2308  // save and queue metadata packet
2309  tx_out.havemeta = true;
2310  tx_out.sendmeta = true;
2311  tx_out.sentmeta = false;
2312  tx_out.senddata = false;
2313  tx_out.sentdata = false;
2314  tx_out.complete = false;
2315 
2316  if (agent->debug_level)
2317  {
2318  debug_fd_lock.lock();
2319  fprintf(agent->get_debug_fd(), "%u %s %s %s %lu ", tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.filepath.c_str(), PROGRESS_QUEUE_SIZE);
2320  fflush(agent->get_debug_fd());
2321  debug_fd_lock.unlock();
2322  }
2323 
2324  // Good to go. Add it to queue.
2325  outgoing_tx_lock.lock();
2326  // txq[(local_node)].outgoing.progress[tx_out.tx_id] = tx_out;
2327  txq[(local_node)].outgoing.progress[tx_out.tx_id].tx_id = tx_out.tx_id;
2328  txq[(local_node)].outgoing.progress[tx_out.tx_id].havemeta = tx_out.havemeta;
2329  txq[(local_node)].outgoing.progress[tx_out.tx_id].sendmeta = tx_out.sendmeta;
2330  txq[(local_node)].outgoing.progress[tx_out.tx_id].sentmeta = tx_out.sentmeta;
2331  txq[(local_node)].outgoing.progress[tx_out.tx_id].senddata = tx_out.senddata;
2332  txq[(local_node)].outgoing.progress[tx_out.tx_id].sentdata = tx_out.sentdata;
2333  txq[(local_node)].outgoing.progress[tx_out.tx_id].complete = tx_out.complete;
2334  txq[(local_node)].outgoing.progress[tx_out.tx_id].node_name = tx_out.node_name;
2335  txq[(local_node)].outgoing.progress[tx_out.tx_id].agent_name = tx_out.agent_name;
2336  txq[(local_node)].outgoing.progress[tx_out.tx_id].file_name = tx_out.file_name;
2337  txq[(local_node)].outgoing.progress[tx_out.tx_id].filepath = tx_out.filepath;
2338  txq[(local_node)].outgoing.progress[tx_out.tx_id].temppath = tx_out.temppath;
2339  txq[(local_node)].outgoing.progress[tx_out.tx_id].savetime = tx_out.savetime;
2340  txq[(local_node)].outgoing.progress[tx_out.tx_id].file_size = tx_out.file_size;
2341  txq[(local_node)].outgoing.progress[tx_out.tx_id].total_bytes = tx_out.total_bytes;
2342  txq[(local_node)].outgoing.progress[tx_out.tx_id].file_info.clear();
2343  for (file_progress filep : tx_out.file_info)
2344  {
2345  txq[(local_node)].outgoing.progress[tx_out.tx_id].file_info.push_back(filep);
2346  }
2347  txq[(local_node)].outgoing.progress[tx_out.tx_id].fp = tx_out.fp;
2348  ++txq[(local_node)].outgoing.size;
2349  outgoing_tx_lock.unlock();
2350 
2351  if (agent->debug_level)
2352  {
2353  debug_fd_lock.lock();
2354  fprintf(agent->get_debug_fd(), " %u\n", txq[(local_node)].outgoing.size);
2355  fflush(agent->get_debug_fd());
2356  debug_fd_lock.unlock();
2357  }
2358 
2359  return outgoing_tx_recount(local_node);
2360 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static Agent * agent
Definition: agent_file3.cpp:94
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
bool sentdata
Definition: transferlib.h:347
ElapsedTime dt
Definition: agent_file3.cpp:183
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int i
Definition: rw_test.cpp:37
uint8_t lookup_local_node_id(PACKET_NODE_ID_TYPE remote_id)
Definition: agent_file3.cpp:3162
Definition: transferlib.h:332
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
string to_string(char *value)
Definition: stringlib.cpp:220
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
deque< file_progress > file_info
Definition: transferlib.h:358
bool sentmeta
Definition: transferlib.h:345
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
#define TRANSFER_ERROR_QUEUEFULL
Definition: cosmos-errno.h:221
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
#define TRANSFER_ERROR_DUPLICATE
Definition: cosmos-errno.h:225
string agent_name
Definition: transferlib.h:350
static std::mutex outgoing_tx_lock
Definition: agent_file3.cpp:141
int32_t outgoing_tx_recount(uint8_t node)
Definition: agent_file3.cpp:2598
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
string file_name
Definition: transferlib.h:351
#define TRANSFER_ERROR_FILENAME
Definition: cosmos-errno.h:224
bool senddata
Definition: transferlib.h:346
bool complete
Definition: transferlib.h:348
int32_t get_file_size(string filename)
Get size of file.
Definition: transferlib.cpp:402
bool sendmeta
Definition: transferlib.h:344
int32_t outgoing_tx_add ( string  node_name,
string  agent_name,
string  file_name 
)
2363 {
2364  if (node_name.empty() || agent_name.empty() || file_name.empty())
2365  {
2366  if (agent->debug_level)
2367  {
2368  debug_fd_lock.lock();
2369  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main: outgoing_tx_add: TRANSFER_ERROR_FILENAME\n", currentmjd(), dt.lap());
2370  fflush(agent->get_debug_fd());
2371  debug_fd_lock.unlock();
2372  }
2373  return TRANSFER_ERROR_FILENAME;
2374  }
2375 
2376  // BEGIN GATHERING THE METADATA
2377  tx_progress tx_out;
2378 
2379  uint8_t local_node = lookup_local_node_id(node_name);
2380  if (local_node == 0)
2381  {
2382  return TRANSFER_ERROR_NODE;
2383  }
2384 
2385  // Only add if we have room
2386  if (txq[(local_node)].outgoing.size == TRANSFER_QUEUE_LIMIT)
2387  {
2388  return TRANSFER_ERROR_QUEUEFULL;
2389  }
2390 
2391  // Locate next empty space
2392  //get the file size
2393  outgoing_tx_lock.lock();
2394  tx_out.tx_id = 0;
2395  PACKET_TX_ID_TYPE id = txq[(local_node)].outgoing.next_id;
2396  do
2397  {
2398  // 0 is special case
2399  if (id == 0)
2400  {
2401  ++id;
2402  }
2403 
2404  if (txq[(local_node)].outgoing.progress[id].tx_id == 0)
2405  {
2406  tx_out.tx_id = id;
2407  txq[(local_node)].outgoing.next_id = id + 1;
2408  break;
2409  }
2410  // If no empty found, increment, allowing to wrap if necessary
2411  } while (++id != txq[(local_node)].outgoing.next_id);
2412  outgoing_tx_lock.unlock();
2413 
2414  if (tx_out.tx_id > 0)
2415  {
2416  tx_out.havemeta = true;
2417  tx_out.sendmeta = true;
2418  tx_out.sentmeta = false;
2419  tx_out.senddata = false;
2420  tx_out.sentdata = false;
2421  tx_out.complete = false;
2422  tx_out.total_bytes = 0;
2423  tx_out.node_name = node_name;
2424  tx_out.agent_name = agent_name;
2425  tx_out.file_name = file_name;
2426  tx_out.temppath = data_base_path(tx_out.node_name, "temp", "file", "out_"+std::to_string(tx_out.tx_id));
2427  tx_out.filepath = data_base_path(tx_out.node_name, "outgoing", tx_out.agent_name, tx_out.file_name);
2428  tx_out.savetime = 0.;
2429 
2430  std::ifstream filename;
2431 
2432  //get the file size
2433  tx_out.file_size = get_file_size(tx_out.filepath);
2434 
2435  if(tx_out.file_size < 0)
2436  {
2437  if (agent->debug_level)
2438  {
2439  debug_fd_lock.lock();
2440  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main: outgoing_tx_add: DATA_ERROR_SIZE_MISMATCH\n", currentmjd(), dt.lap());
2441  fflush(agent->get_debug_fd());
2442  debug_fd_lock.unlock();
2443  }
2444  return DATA_ERROR_SIZE_MISMATCH;
2445  }
2446 
2447  // see if file can be opened
2448  filename.open(tx_out.filepath, std::ios::in|std::ios::binary);
2449  if(!filename.is_open())
2450  {
2451  if (agent->debug_level)
2452  {
2453  debug_fd_lock.lock();
2454  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main: outgoing_tx_add: %s\n", currentmjd(), dt.lap(), cosmos_error_string(-errno).c_str());
2455  fflush(agent->get_debug_fd());
2456  debug_fd_lock.unlock();
2457  }
2458  return -errno;
2459  }
2460  filename.close();
2461 
2462  file_progress tp;
2463  tp.chunk_start = 0;
2464  tp.chunk_end = tx_out.file_size - 1;
2465  tx_out.file_info.push_back(tp);
2466 
2467  write_meta(tx_out);
2468 
2469  int32_t iretn = outgoing_tx_add(tx_out);
2470  return iretn;
2471  }
2472  else
2473  {
2474  return TRANSFER_ERROR_MATCH;
2475  }
2476 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static Agent * agent
Definition: agent_file3.cpp:94
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
bool sentdata
Definition: transferlib.h:347
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file3.cpp:1823
ElapsedTime dt
Definition: agent_file3.cpp:183
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
uint8_t lookup_local_node_id(PACKET_NODE_ID_TYPE remote_id)
Definition: agent_file3.cpp:3162
Definition: transferlib.h:332
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
bool havemeta
Definition: transferlib.h:342
string to_string(char *value)
Definition: stringlib.cpp:220
int iretn
Definition: rw_test.cpp:37
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
string node_name
Definition: agent_001.cpp:46
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
deque< file_progress > file_info
Definition: transferlib.h:358
bool sentmeta
Definition: transferlib.h:345
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
#define TRANSFER_ERROR_QUEUEFULL
Definition: cosmos-errno.h:221
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
int32_t outgoing_tx_add(tx_progress &tx_out)
Definition: agent_file3.cpp:2228
#define DATA_ERROR_SIZE_MISMATCH
Definition: cosmos-errno.h:150
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string agent_name
Definition: transferlib.h:350
static std::mutex outgoing_tx_lock
Definition: agent_file3.cpp:141
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
string agent_name
Definition: agent_001.cpp:47
string file_name
Definition: transferlib.h:351
#define TRANSFER_ERROR_FILENAME
Definition: cosmos-errno.h:224
bool senddata
Definition: transferlib.h:346
bool complete
Definition: transferlib.h:348
int32_t get_file_size(string filename)
Get size of file.
Definition: transferlib.cpp:402
bool sendmeta
Definition: transferlib.h:344
int32_t outgoing_tx_del ( uint8_t  node,
uint16_t  tx_id = 256 
)
2479 {
2480  if (local_node == 0 || local_node >= txq.size())
2481  {
2482  return TRANSFER_ERROR_INDEX;
2483  }
2484 
2485  if (txq[(local_node)].outgoing.progress[tx_id].tx_id == 0)
2486  {
2487  return TRANSFER_ERROR_MATCH;
2488  }
2489 
2490  if (tx_id >= PROGRESS_QUEUE_SIZE)
2491  {
2492  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2493  {
2494  outgoing_tx_del(local_node, i);
2495  }
2496  }
2497  else
2498  {
2499  tx_progress tx_out = txq[(local_node)].outgoing.progress[tx_id];
2500 
2501  // erase the transaction
2502 
2503  txq[(local_node)].outgoing.progress[tx_id].fp = nullptr;
2504  txq[(local_node)].outgoing.progress[tx_id].tx_id = 0;
2505  txq[(local_node)].outgoing.progress[tx_id].complete = false;
2506  txq[(local_node)].outgoing.progress[tx_id].filepath = "";
2507  txq[(local_node)].outgoing.progress[tx_id].havemeta = false;
2508  txq[(local_node)].outgoing.progress[tx_id].savetime = 0;
2509  txq[(local_node)].outgoing.progress[tx_id].temppath = "";
2510  txq[(local_node)].outgoing.progress[tx_id].file_name = "";
2511  txq[(local_node)].outgoing.progress[tx_id].file_size = 0;
2512  txq[(local_node)].outgoing.progress[tx_id].node_name = "";
2513  txq[(local_node)].outgoing.progress[tx_id].agent_name = "";
2514  txq[(local_node)].outgoing.progress[tx_id].total_bytes = 0;
2515  txq[(local_node)].outgoing.progress[tx_id].file_info.clear();
2516 
2517  if (txq[(local_node)].outgoing.size)
2518  {
2519  --txq[(local_node)].outgoing.size;
2520  }
2521 
2522  // Remove the file
2523  if(remove(tx_out.filepath.c_str()))
2524  {
2525  if (agent->debug_level)
2526  {
2527  debug_fd_lock.lock();
2528  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main/Outgoing: Del outgoing: %u %s %s %s - Unable to remove file\n", currentmjd(), dt.lap(), tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.file_name.c_str());
2529  fflush(agent->get_debug_fd());
2530  debug_fd_lock.unlock();
2531  }
2532  }
2533 
2534  // Remove the META file
2535  string meta_filepath = tx_out.temppath + ".meta";
2536  remove(meta_filepath.c_str());
2537 
2538  if (agent->debug_level)
2539  {
2540  debug_fd_lock.lock();
2541  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main/Outgoing: Del outgoing: %u %s %s %s\n", currentmjd(), dt.lap(), tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.file_name.c_str());
2542  fflush(agent->get_debug_fd());
2543  debug_fd_lock.unlock();
2544  }
2545  }
2546 
2547  return outgoing_tx_recount(local_node);
2548 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static Agent * agent
Definition: agent_file3.cpp:94
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
ElapsedTime dt
Definition: agent_file3.cpp:183
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
int32_t outgoing_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2478
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
string agent_name
Definition: transferlib.h:350
int32_t outgoing_tx_recount(uint8_t node)
Definition: agent_file3.cpp:2598
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
string file_name
Definition: transferlib.h:351
int32_t outgoing_tx_purge ( uint8_t  node,
uint16_t  tx_id = 256 
)
2551 {
2552  if (local_node == 0 || local_node >= txq.size())
2553  {
2554  return TRANSFER_ERROR_INDEX;
2555  }
2556 
2557  if (tx_id >= PROGRESS_QUEUE_SIZE)
2558  {
2559  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2560  {
2561  txq[(local_node)].outgoing.progress[i].fp = nullptr;
2562  txq[(local_node)].outgoing.progress[i].tx_id = 0;
2563  txq[(local_node)].outgoing.progress[i].complete = false;
2564  txq[(local_node)].outgoing.progress[i].filepath = "";
2565  txq[(local_node)].outgoing.progress[i].havemeta = false;
2566  txq[(local_node)].outgoing.progress[i].savetime = 0;
2567  txq[(local_node)].outgoing.progress[i].temppath = "";
2568  txq[(local_node)].outgoing.progress[i].file_name = "";
2569  txq[(local_node)].outgoing.progress[i].file_size = 0;
2570  txq[(local_node)].outgoing.progress[i].node_name = "";
2571  txq[(local_node)].outgoing.progress[i].agent_name = "";
2572  txq[(local_node)].outgoing.progress[i].total_bytes = 0;
2573  txq[(local_node)].outgoing.progress[i].file_info.clear();
2574  }
2575  }
2576  else {
2577  {
2578  txq[(local_node)].outgoing.progress[tx_id].fp = nullptr;
2579  txq[(local_node)].outgoing.progress[tx_id].tx_id = 0;
2580  txq[(local_node)].outgoing.progress[tx_id].complete = false;
2581  txq[(local_node)].outgoing.progress[tx_id].filepath = "";
2582  txq[(local_node)].outgoing.progress[tx_id].havemeta = false;
2583  txq[(local_node)].outgoing.progress[tx_id].savetime = 0;
2584  txq[(local_node)].outgoing.progress[tx_id].temppath = "";
2585  txq[(local_node)].outgoing.progress[tx_id].file_name = "";
2586  txq[(local_node)].outgoing.progress[tx_id].file_size = 0;
2587  txq[(local_node)].outgoing.progress[tx_id].node_name = "";
2588  txq[(local_node)].outgoing.progress[tx_id].agent_name = "";
2589  txq[(local_node)].outgoing.progress[tx_id].total_bytes = 0;
2590  txq[(local_node)].outgoing.progress[tx_id].file_info.clear();
2591  }
2592  }
2593  txq[(local_node)].outgoing.size = 0;
2594 
2595  return 0;
2596 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
int32_t outgoing_tx_recount ( uint8_t  node)
2599 {
2600  if (local_node == 0 || local_node >= txq.size())
2601  {
2602  return TRANSFER_ERROR_INDEX;
2603  }
2604 
2605  txq[(local_node)].outgoing.size = 0;
2606  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2607  {
2608  if (txq[(local_node)].outgoing.progress[i].tx_id)
2609  {
2610  ++txq[(local_node)].outgoing.size;
2611  }
2612  }
2613  return txq[(local_node)].outgoing.size;
2614 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
int32_t outgoing_tx_load ( uint8_t  node)
2617 {
2618  int32_t iretn=0;
2619 
2620  // Go through outgoing queue, removing files that no longer exist
2621  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2622  {
2623  if (txq[(local_node)].outgoing.progress[i].tx_id != 0 && !data_isfile(txq[(local_node)].outgoing.progress[i].filepath))
2624  {
2625  outgoing_tx_del(local_node, txq[(local_node)].outgoing.progress[i].tx_id);
2626  }
2627  }
2628 
2629  // Go through outgoing directories, adding files not already in queue
2630  if (txq[(local_node)].outgoing.size < TRANSFER_QUEUE_LIMIT)
2631  {
2632  vector<filestruc> file_names;
2633  for (filestruc file : data_list_files(txq[(local_node)].node_name, "outgoing", ""))
2634  {
2635  if (file.type == "directory")
2636  {
2637  iretn = data_list_files(txq[(local_node)].node_name, "outgoing", file.name, file_names);
2638  }
2639  }
2640 
2641  // Sort list by size, then go through list of files found, adding to queue.
2642  sort(file_names.begin(), file_names.end(), filestruc_compare_by_size);
2643  for(uint16_t i=0; i<file_names.size(); ++i)
2644  {
2645  filestruc file = file_names[i];
2646  if (txq[(local_node)].outgoing.size >= TRANSFER_QUEUE_LIMIT)
2647  {
2648  break;
2649  }
2650 
2651  //Ignore sub-directories
2652  if (file.type == "directory")
2653  {
2654  continue;
2655  }
2656 
2657  // Ignore zero length files (may still be being formed)
2658  if (file.size == 0)
2659  {
2660  continue;
2661  }
2662 
2663  bool addtoqueue = true;
2664  outgoing_tx_lock.lock();
2665  for(tx_progress progress : txq[(local_node)].outgoing.progress)
2666  {
2667  if (progress.tx_id && file.path == progress.filepath)
2668  {
2669  addtoqueue = false;
2670  break;
2671  }
2672  }
2673 
2674  outgoing_tx_lock.unlock();
2675 
2676  if (addtoqueue)
2677  {
2678  iretn = outgoing_tx_add(file.node, file.agent, file.name);
2679  if (agent->debug_level)
2680  {
2681  debug_fd_lock.lock();
2682  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main/Load: outgoing_tx_add: %s [%d]\n", currentmjd(), dt.lap(), file.path.c_str(), iretn);
2683  fflush(agent->get_debug_fd());
2684  debug_fd_lock.unlock();
2685  }
2686  }
2687  }
2688  }
2689 
2690  return iretn;
2691 }
string name
Definition: datalib.h:117
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
off_t size
Definition: datalib.h:120
static Agent * agent
Definition: agent_file3.cpp:94
ElapsedTime dt
Definition: agent_file3.cpp:183
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
string node
Definition: datalib.h:115
int iretn
Definition: rw_test.cpp:37
vector< filestruc > data_list_files(string directory)
Get list of files in a directory, directly.
Definition: datalib.cpp:461
string node_name
Definition: agent_001.cpp:46
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
Definition: datalib.h:113
bool data_isfile(string path, off_t size)
Definition: datalib.cpp:1895
string path
Definition: datalib.h:119
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
int32_t outgoing_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2478
int32_t outgoing_tx_add(tx_progress &tx_out)
Definition: agent_file3.cpp:2228
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
static std::mutex outgoing_tx_lock
Definition: agent_file3.cpp:141
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
string agent
Definition: datalib.h:116
bool filestruc_compare_by_size(const filestruc &a, const filestruc &b)
Definition: agent_file3.cpp:3001
string type
Definition: datalib.h:118
int32_t incoming_tx_add ( tx_progress tx_in)
2694 {
2695  uint8_t local_node = lookup_local_node_id(tx_in.node_name);
2696  if (local_node == 0)
2697  {
2698  if (agent->debug_level)
2699  {
2700  debug_fd_lock.lock();
2701  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main: incoming_tx_add: TRANSFER_ERROR_NODE\n", currentmjd(), dt.lap());
2702  fflush(agent->get_debug_fd());
2703  debug_fd_lock.unlock();
2704  }
2705  return TRANSFER_ERROR_NODE;
2706  }
2707 
2708  // Check for an actual file name
2709  if (tx_in.file_name.size())
2710  {
2711  tx_in.filepath = data_base_path(tx_in.node_name, "incoming", tx_in.agent_name, tx_in.file_name);
2712  }
2713  else
2714  {
2715  tx_in.filepath = "";
2716  }
2717 
2718  string tx_name = "in_"+std::to_string(tx_in.tx_id);
2719  tx_in.temppath = data_base_path(tx_in.node_name, "temp", "file", tx_name);
2720 
2721  // Check for a duplicate file name of something already in queue
2722  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2723  {
2724  if (!txq[(local_node)].incoming.progress[i].filepath.empty() && tx_in.filepath == txq[(local_node)].incoming.progress[i].filepath)
2725  {
2726  if (agent->debug_level)
2727  {
2728  debug_fd_lock.lock();
2729  fprintf(agent->get_debug_fd(), "%u %s %s %s TRANSFER_ERROR_DUPLICATE\n", tx_in.tx_id, tx_in.node_name.c_str(), tx_in.agent_name.c_str(), tx_in.filepath.c_str());
2730  fflush(agent->get_debug_fd());
2731  debug_fd_lock.unlock();
2732  }
2733  // Remove the META file
2734  string filepath = tx_in.temppath + ".meta";
2735  remove(filepath.c_str());
2736  return TRANSFER_ERROR_DUPLICATE;
2737  }
2738  }
2739 
2740  tx_in.savetime = 0.;
2741  tx_in.fp = nullptr;
2742  // tx_in.state = STATE_REQMETA;
2743  // if (tx_in.havemeta)
2744  // {
2745  // tx_in.state = STATE_REQDATA;
2746  // }
2747  // if (tx_in.complete)
2748  // {
2749  // tx_in.state = STATE_COMPLETE;
2750  // }
2751 
2752  // Put it in list
2753  // txq[(local_node)].incoming.progress[tx_in.tx_id] = tx_in;
2754  txq[(local_node)].incoming.progress[tx_in.tx_id].tx_id = tx_in.tx_id;
2755  txq[(local_node)].incoming.progress[tx_in.tx_id].havemeta = tx_in.havemeta;
2756  txq[(local_node)].incoming.progress[tx_in.tx_id].sendmeta = tx_in.sendmeta;
2757  txq[(local_node)].incoming.progress[tx_in.tx_id].sentmeta = tx_in.sentmeta;
2758  txq[(local_node)].incoming.progress[tx_in.tx_id].senddata = tx_in.senddata;
2759  txq[(local_node)].incoming.progress[tx_in.tx_id].sentdata = tx_in.sentdata;
2760  txq[(local_node)].incoming.progress[tx_in.tx_id].complete = tx_in.complete;
2761  txq[(local_node)].incoming.progress[tx_in.tx_id].node_name = tx_in.node_name;
2762  txq[(local_node)].incoming.progress[tx_in.tx_id].agent_name = tx_in.agent_name;
2763  txq[(local_node)].incoming.progress[tx_in.tx_id].file_name = tx_in.file_name;
2764  txq[(local_node)].incoming.progress[tx_in.tx_id].filepath = tx_in.filepath;
2765  txq[(local_node)].incoming.progress[tx_in.tx_id].temppath = tx_in.temppath;
2766  txq[(local_node)].incoming.progress[tx_in.tx_id].savetime = tx_in.savetime;
2767  txq[(local_node)].incoming.progress[tx_in.tx_id].file_size = tx_in.file_size;
2768  txq[(local_node)].incoming.progress[tx_in.tx_id].total_bytes = tx_in.total_bytes;
2769  txq[(local_node)].incoming.progress[tx_in.tx_id].file_info.clear();
2770  for (file_progress filep : tx_in.file_info)
2771  {
2772  txq[(local_node)].incoming.progress[tx_in.tx_id].file_info.push_back(filep);
2773  }
2774  txq[(local_node)].incoming.progress[tx_in.tx_id].fp = tx_in.fp;
2775  ++txq[(local_node)].incoming.size;
2776 
2777  if (agent->debug_level)
2778  {
2779  debug_fd_lock.lock();
2780  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main/Incoming: Add incoming: %u %s %s %s %lu\n", currentmjd(), dt.lap(), tx_in.tx_id, tx_in.node_name.c_str(), tx_in.agent_name.c_str(), tx_in.filepath.c_str(), PROGRESS_QUEUE_SIZE);
2781  fflush(agent->get_debug_fd());
2782  debug_fd_lock.unlock();
2783  }
2784 
2785  return incoming_tx_recount(local_node);
2786 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static Agent * agent
Definition: agent_file3.cpp:94
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
bool sentdata
Definition: transferlib.h:347
ElapsedTime dt
Definition: agent_file3.cpp:183
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int32_t incoming_tx_recount(uint8_t node)
Definition: agent_file3.cpp:2983
int i
Definition: rw_test.cpp:37
uint8_t lookup_local_node_id(PACKET_NODE_ID_TYPE remote_id)
Definition: agent_file3.cpp:3162
Definition: transferlib.h:332
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
string to_string(char *value)
Definition: stringlib.cpp:220
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
deque< file_progress > file_info
Definition: transferlib.h:358
bool sentmeta
Definition: transferlib.h:345
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_ERROR_DUPLICATE
Definition: cosmos-errno.h:225
string agent_name
Definition: transferlib.h:350
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
string file_name
Definition: transferlib.h:351
bool senddata
Definition: transferlib.h:346
bool complete
Definition: transferlib.h:348
bool sendmeta
Definition: transferlib.h:344
int32_t incoming_tx_add ( string  node_name,
PACKET_TX_ID_TYPE  tx_id 
)
2789 {
2790  tx_progress tx_in;
2791 
2792  tx_in.tx_id = tx_id;
2793  tx_in.sendmeta = false;
2794  tx_in.havemeta = false;
2795  tx_in.sentmeta = false;
2796  tx_in.senddata = false;
2797  tx_in.sentdata = false;
2798  tx_in.complete = false;
2799  tx_in.node_name = node_name;
2800  tx_in.agent_name = "";
2801  tx_in.file_name = "";
2802  tx_in.savetime = 0.;
2803  tx_in.file_size = 0;
2804  tx_in.total_bytes = 0;
2805  tx_in.file_info.clear();
2806 
2807  int32_t iretn = incoming_tx_add(tx_in);
2808 
2809  return iretn;
2810 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
bool sentdata
Definition: transferlib.h:347
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file3.cpp:2693
Definition: transferlib.h:338
bool havemeta
Definition: transferlib.h:342
int iretn
Definition: rw_test.cpp:37
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
double savetime
Definition: transferlib.h:354
string node_name
Definition: agent_001.cpp:46
deque< file_progress > file_info
Definition: transferlib.h:358
bool sentmeta
Definition: transferlib.h:345
string node_name
Definition: transferlib.h:349
string agent_name
Definition: transferlib.h:350
string file_name
Definition: transferlib.h:351
bool senddata
Definition: transferlib.h:346
bool complete
Definition: transferlib.h:348
bool sendmeta
Definition: transferlib.h:344
int32_t incoming_tx_update ( packet_struct_metashort  meta)
2813 {
2814  uint8_t local_node = lookup_local_node_id(meta.node_id);
2815  if (local_node == 0)
2816  {
2817  return TRANSFER_ERROR_NODE;
2818  }
2819 
2820  if (meta.tx_id)
2821  {
2822  if (txq[(local_node)].incoming.progress[meta.tx_id].tx_id != meta.tx_id)
2823  {
2824  txq[(local_node)].incoming.progress[meta.tx_id].tx_id = meta.tx_id;
2825  txq[(local_node)].incoming.progress[meta.tx_id].havemeta = false;
2826  }
2827 
2828  if (!txq[(local_node)].incoming.progress[meta.tx_id].havemeta)
2829  {
2830  // Core META information
2831  txq[(local_node)].incoming.progress[meta.tx_id].node_name = txq[(local_node)].node_name;
2832  txq[(local_node)].incoming.progress[meta.tx_id].agent_name = meta.agent_name;
2833  txq[(local_node)].incoming.progress[meta.tx_id].file_name = meta.file_name;
2834  txq[(local_node)].incoming.progress[meta.tx_id].file_size = meta.file_size;
2835  txq[(local_node)].incoming.progress[meta.tx_id].filepath = data_base_path(txq[(local_node)].incoming.progress[meta.tx_id].node_name, "incoming", txq[(local_node)].incoming.progress[meta.tx_id].agent_name, txq[(local_node)].incoming.progress[meta.tx_id].file_name);
2836  string tx_name = "in_"+std::to_string(txq[(local_node)].incoming.progress[meta.tx_id].tx_id);
2837  txq[(local_node)].incoming.progress[meta.tx_id].temppath = data_base_path(txq[(local_node)].incoming.progress[meta.tx_id].node_name, "temp", "file", tx_name);
2838 
2839  // Derivative META information
2840  // txq[(local_node)].incoming.progress[meta.tx_id].state = STATE_REQDATA;
2841  txq[(local_node)].incoming.progress[meta.tx_id].savetime = 0.;
2842  txq[(local_node)].incoming.progress[meta.tx_id].havemeta = true;
2843  txq[(local_node)].incoming.progress[meta.tx_id].sendmeta = false;
2844  txq[(local_node)].incoming.progress[meta.tx_id].sentmeta = false;
2845  txq[(local_node)].incoming.progress[meta.tx_id].senddata = false;
2846  txq[(local_node)].incoming.progress[meta.tx_id].sentdata = false;
2847  txq[(local_node)].incoming.progress[meta.tx_id].complete = false;
2848  txq[(local_node)].incoming.progress[meta.tx_id].total_bytes = 0;
2849  txq[(local_node)].incoming.progress[meta.tx_id].fp = nullptr;
2850  txq[(local_node)].incoming.progress[meta.tx_id].file_info.clear();
2851 
2852  // Save it to disk
2853  write_meta(txq[(local_node)].incoming.progress[meta.tx_id]);
2854  }
2855 
2856  if (agent->debug_level)
2857  {
2858  debug_fd_lock.lock();
2859  fprintf(agent->get_debug_fd(), "%16.10f %.4f Incoming: Update incoming: %u %s %s %s\n", currentmjd(), dt.lap(), txq[(local_node)].incoming.progress[meta.tx_id].tx_id, txq[(local_node)].incoming.progress[meta.tx_id].node_name.c_str(), txq[(local_node)].incoming.progress[meta.tx_id].agent_name.c_str(), txq[(local_node)].incoming.progress[meta.tx_id].file_name.c_str());
2860  fflush(agent->get_debug_fd());
2861  debug_fd_lock.unlock();
2862  }
2863 
2864  return meta.tx_id;
2865  }
2866  else
2867  {
2868  return TRANSFER_ERROR_INDEX;
2869  }
2870 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static Agent * agent
Definition: agent_file3.cpp:94
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file3.cpp:1823
ElapsedTime dt
Definition: agent_file3.cpp:183
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
uint8_t lookup_local_node_id(PACKET_NODE_ID_TYPE remote_id)
Definition: agent_file3.cpp:3162
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:248
string to_string(char *value)
Definition: stringlib.cpp:220
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:247
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
char file_name[128]
Definition: transferlib.h:250
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
char agent_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:249
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:251
int32_t incoming_tx_del ( uint8_t  node,
uint16_t  tx_id = 256 
)
2873 {
2874  local_node = check_local_node_id(local_node);
2875  if (local_node == 0)
2876  {
2877  return TRANSFER_ERROR_NODE;
2878  }
2879 
2880  if (tx_id >= PROGRESS_QUEUE_SIZE)
2881  {
2882  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2883  {
2884  incoming_tx_del(local_node, i);
2885  }
2886  }
2887  else
2888  {
2889  if (txq[(local_node)].incoming.progress[tx_id].tx_id == 0)
2890  {
2891  return TRANSFER_ERROR_MATCH;
2892  }
2893 
2894  tx_progress tx_in = txq[(local_node)].incoming.progress[tx_id];
2895 
2896  txq[(local_node)].incoming.progress[tx_id].tx_id = 0;
2897  // txq[(local_node)].incoming.progress[tx_id].state = STATE_NONE;
2898  txq[(local_node)].incoming.progress[tx_id].havemeta = false;
2899  if (txq[(local_node)].incoming.size)
2900  {
2901  --txq[(local_node)].incoming.size;
2902  }
2903 
2904  // Close the DATA file
2905  if (tx_in.fp != nullptr)
2906  {
2907  fclose(tx_in.fp);
2908  tx_in.fp = nullptr;
2909  }
2910 
2911  string filepath;
2912  //Remove the DATA file
2913  filepath = tx_in.temppath + ".file";
2914  remove(filepath.c_str());
2915 
2916  // Remove the META file
2917  filepath = tx_in.temppath + ".meta";
2918  remove(filepath.c_str());
2919 
2920  if (agent->debug_level)
2921  {
2922  debug_fd_lock.lock();
2923  fprintf(agent->get_debug_fd(), "%16.10f %.4f Incoming: Del incoming: %u %s\n", currentmjd(), dt.lap(), tx_in.tx_id, tx_in.node_name.c_str());
2924  fflush(agent->get_debug_fd());
2925  debug_fd_lock.unlock();
2926  }
2927  }
2928 
2929  return incoming_tx_recount(local_node);
2930 
2931 }
int32_t incoming_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2872
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static Agent * agent
Definition: agent_file3.cpp:94
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
ElapsedTime dt
Definition: agent_file3.cpp:183
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int32_t incoming_tx_recount(uint8_t node)
Definition: agent_file3.cpp:2983
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
FILE * fp
Definition: transferlib.h:359
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
string node_name
Definition: transferlib.h:349
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
uint8_t check_local_node_id(PACKET_NODE_ID_TYPE local_id)
Definition: agent_file3.cpp:3152
int32_t incoming_tx_purge ( uint8_t  node,
uint16_t  tx_id = 256 
)
2934 {
2935  if (local_node == 0 || local_node >= txq.size())
2936  {
2937  return TRANSFER_ERROR_INDEX;
2938  }
2939 
2940  if (tx_id >= PROGRESS_QUEUE_SIZE)
2941  {
2942  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2943  {
2944  txq[(local_node)].incoming.progress[i].fp = nullptr;
2945  txq[(local_node)].incoming.progress[i].tx_id = 0;
2946  // txq[(local_node)].incoming.progress[i].state = STATE_NONE;
2947  txq[(local_node)].incoming.progress[i].complete = false;
2948  txq[(local_node)].incoming.progress[i].filepath = "";
2949  txq[(local_node)].incoming.progress[i].havemeta = false;
2950  txq[(local_node)].incoming.progress[i].savetime = 0;
2951  txq[(local_node)].incoming.progress[i].temppath = "";
2952  txq[(local_node)].incoming.progress[i].file_name = "";
2953  txq[(local_node)].incoming.progress[i].file_size = 0;
2954  txq[(local_node)].incoming.progress[i].node_name = "";
2955  txq[(local_node)].incoming.progress[i].agent_name = "";
2956  txq[(local_node)].incoming.progress[i].total_bytes = 0;
2957  txq[(local_node)].incoming.progress[i].file_info.clear();
2958  }
2959  }
2960  else {
2961  {
2962  txq[(local_node)].incoming.progress[tx_id].fp = nullptr;
2963  txq[(local_node)].incoming.progress[tx_id].tx_id = 0;
2964  // txq[(local_node)].incoming.progress[tx_id].state = STATE_NONE;
2965  txq[(local_node)].incoming.progress[tx_id].complete = false;
2966  txq[(local_node)].incoming.progress[tx_id].filepath = "";
2967  txq[(local_node)].incoming.progress[tx_id].havemeta = false;
2968  txq[(local_node)].incoming.progress[tx_id].savetime = 0;
2969  txq[(local_node)].incoming.progress[tx_id].temppath = "";
2970  txq[(local_node)].incoming.progress[tx_id].file_name = "";
2971  txq[(local_node)].incoming.progress[tx_id].file_size = 0;
2972  txq[(local_node)].incoming.progress[tx_id].node_name = "";
2973  txq[(local_node)].incoming.progress[tx_id].agent_name = "";
2974  txq[(local_node)].incoming.progress[tx_id].total_bytes = 0;
2975  txq[(local_node)].incoming.progress[tx_id].file_info.clear();
2976  }
2977  }
2978  txq[(local_node)].incoming.size = 0;
2979 
2980  return 0;
2981 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
int32_t incoming_tx_recount ( uint8_t  node)
2984 {
2985  if (local_node == 0 || local_node >= txq.size())
2986  {
2987  return TRANSFER_ERROR_INDEX;
2988  }
2989 
2990  txq[(local_node)].incoming.size = 0;
2991  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2992  {
2993  if (txq[(local_node)].incoming.progress[i].tx_id)
2994  {
2995  ++txq[(local_node)].incoming.size;
2996  }
2997  }
2998  return txq[(local_node)].incoming.size;
2999 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
// Collect the byte ranges of a transfer that are not covered by any chunk
// recorded in tx.file_info.
//
// - With no recorded chunks a single range 0 .. file_size-1 is reported.
// - Otherwise file_info is sorted with lower_chunk and the gaps before the
//   first chunk, between neighbouring chunks and after the last chunk are
//   collected.
// Afterwards tx.total_bytes is set to the summed length of all chunks in
// tx.file_info. Ranges are inclusive on both ends (length = end - start + 1).
//
// Returns the list of missing ranges.
//
// NOTE(review): this listing shows tx taken by value; the sort and the
// total_bytes update only reach the caller if the real parameter is a
// reference - confirm against the source file.
// NOTE(review): the listing jumps from source line 2002 to 2004; whatever
// statement sits on line 2003 is not visible here.
vector<file_progress> find_chunks_missing ( tx_progress tx)
1991 {
1992  vector<file_progress> missing;
1993  file_progress tp;
1994 
1995  if (tx.file_info.size() == 0)
1996  {
// Nothing received yet: the whole file is one hole.
// NOTE(review): file_size - 1 presumably wraps when file_size is 0 and the
// size type is unsigned - confirm.
1997  tp.chunk_start = 0;
1998  tp.chunk_end = tx.file_size - 1;
1999  missing.push_back(tp);
2000  }
2001  else
2002  {
2004  sort(tx.file_info.begin(), tx.file_info.end(), lower_chunk);
2005 
2006  // Check missing before first chunk
2007  if (tx.file_info[0].chunk_start)
2008  {
2009  tp.chunk_start = 0;
2010  tp.chunk_end = tx.file_info[0].chunk_start - 1;
2011  missing.push_back(tp);
2012  }
2013 
2014  // Check missing between chunks
// A hole is recorded whenever chunk i does not start directly after chunk i-1.
2015  for (uint32_t i=1; i<tx.file_info.size(); ++i)
2016  {
2017  if (tx.file_info[i-1].chunk_end+1 != tx.file_info[i].chunk_start)
2018  {
2019  tp.chunk_start = tx.file_info[i-1].chunk_end + 1;
2020  tp.chunk_end = tx.file_info[i].chunk_start - 1;
2021  missing.push_back(tp);
2022  }
2023  }
2024 
2025  // Check missing after last chunk
2026  if (tx.file_info[tx.file_info.size()-1].chunk_end + 1 != tx.file_size)
2027  {
2028  tp.chunk_start = tx.file_info[tx.file_info.size()-1].chunk_end + 1;
2029  tp.chunk_end = tx.file_size - 1;
2030  missing.push_back(tp);
2031  }
2032  }
2033 
2034  // calculate bytes so far
2035  tx.total_bytes = 0;
2036  for (file_progress prog : tx.file_info)
2037  {
2038  tx.total_bytes += (prog.chunk_end - prog.chunk_start) + 1;
2039  }
2040 
2041  return (missing);
2042 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
deque< file_progress > file_info
Definition: transferlib.h:358
bool lower_chunk(file_progress i, file_progress j)
Definition: agent_file3.cpp:1946
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file3.cpp:1951
// Condense the chunks recorded in tx.file_info into a list of contiguous
// byte ranges.
//
// file_info is sorted with lower_chunk; a chunk that starts directly after
// the previous one (previous chunk_end + 1 == chunk_start) extends the
// current range, any other chunk closes the current range and opens a new
// one. Afterwards tx.total_bytes is set to the summed length of the
// returned ranges (inclusive ends, length = end - start + 1).
//
// Returns the list of ranges; it is empty when file_info is empty.
//
// NOTE(review): this listing shows tx taken by value; the sort and the
// total_bytes update only reach the caller if the real parameter is a
// reference - confirm against the source file.
// NOTE(review): the listing jumps from source line 2050 to 2052; whatever
// statement sits on line 2051 is not visible here.
vector< file_progress > find_chunks_togo ( tx_progress tx)
2045 {
2046  vector<file_progress> togo;
2047  file_progress tp;
2048 
2049  if (tx.file_info.size())
2050  {
2052  sort(tx.file_info.begin(), tx.file_info.end(), lower_chunk);
2053 
2054  // Set first chunk
2055  tp.chunk_start = tx.file_info[0].chunk_start;
2056  tp.chunk_end = tx.file_info[0].chunk_end;
2057 
2058  // Add middle chunks to go
2059  for (uint32_t i=1; i<tx.file_info.size(); ++i)
2060  {
2061  if (tx.file_info[i-1].chunk_end+1 == tx.file_info[i].chunk_start)
2062  {
// Directly adjacent: grow the range being built.
2063  tp.chunk_end = tx.file_info[i].chunk_end;
2064  }
2065  else
2066  {
// Not adjacent: emit the finished range and start a new one at chunk i.
2067  togo.push_back(tp);
2068  tp.chunk_start = tx.file_info[i].chunk_start;
2069  tp.chunk_end = tx.file_info[i].chunk_end;
2070  }
2071  }
2072 
2073  // Add last chunk
2074  togo.push_back(tp);
2075  }
2076 
2077  // calculate bytes left
2078  tx.total_bytes = 0;
2079  for (file_progress prog : togo)
2080  {
2081  tx.total_bytes += (prog.chunk_end - prog.chunk_start) + 1;
2082  }
2083 
2084  return (togo);
2085 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
deque< file_progress > file_info
Definition: transferlib.h:358
bool lower_chunk(file_progress i, file_progress j)
Definition: agent_file3.cpp:1946
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file3.cpp:1951
/**
 * Merge overlapping or directly adjacent chunks of a transfer and total
 * the bytes they cover.
 *
 * tx.file_info is sorted with lower_chunk. Then, for every surviving chunk,
 * each later chunk whose start is at most one past the survivor's end is
 * folded into it (extending the survivor's end if needed) and erased.
 * tx.total_bytes receives the summed length of the surviving chunks
 * (inclusive ends, length = end - start + 1).
 *
 * @param tx Transfer whose file_info and total_bytes are updated in place.
 * @return The new tx.total_bytes.
 */
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
{
    auto &chunks = tx.file_info;

    tx.total_bytes = 0;

    // With zero or one chunk there is nothing to order or merge; the loop
    // below then simply yields 0 or the length of the single chunk.
    if (chunks.size() > 1)
    {
        sort(chunks.begin(), chunks.end(), lower_chunk);
    }

    for (uint32_t keep = 0; keep < chunks.size(); ++keep)
    {
        uint32_t probe = keep + 1;
        while (probe < chunks.size())
        {
            const bool touches = chunks[probe].chunk_start <= chunks[keep].chunk_end + 1;
            if (!touches)
            {
                // Leave this chunk alone and look at the next one.
                ++probe;
                continue;
            }

            // Absorb the probed chunk; erasing shifts the next candidate
            // into the same position, so probe is not advanced.
            if (chunks[probe].chunk_end > chunks[keep].chunk_end)
            {
                chunks[keep].chunk_end = chunks[probe].chunk_end;
            }
            chunks.erase(chunks.begin() + probe);
        }

        tx.total_bytes += (chunks[keep].chunk_end - chunks[keep].chunk_start) + 1;
    }

    return tx.total_bytes;
}
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
int i
Definition: rw_test.cpp:37
deque< file_progress > file_info
Definition: transferlib.h:358
bool lower_chunk(file_progress i, file_progress j)
Definition: agent_file3.cpp:1946
double queuecheck ( PACKET_NODE_ID_TYPE  node_id)
1430 {
1431  int32_t use_channel;
1432 
1433  use_channel = -1;
1434  for (uint16_t i=0; i<out_comm_channel.size(); ++i)
1435  {
1436  if (txq[node_id].node_name == out_comm_channel[i].node)
1437  {
1438  use_channel = i;
1439  break;
1440  }
1441  }
1442 
1443  if (use_channel == -1)
1444  {
1445  return -1.;
1446  }
1447 
1448  return (1. * transmit_queue.size() * out_comm_channel[use_channel].packet_size) / out_comm_channel[use_channel].throughput;
1449 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
static string node
Definition: agent_monitor.cpp:126
static std::queue< transmit_queue_entry > transmit_queue
Definition: agent_file3.cpp:130
int32_t queuesendto ( PACKET_NODE_ID_TYPE  node_id,
string  type,
vector< PACKET_BYTE packet 
)
1452 {
1453  transmit_queue_entry tentry;
1454  std::vector<channelstruc>::size_type use_channel;
1455 
1456  use_channel = -1;
1457  for (uint16_t i=0; i<out_comm_channel.size(); ++i)
1458  {
1459  if (txq[node_id].node_name == out_comm_channel[i].node)
1460  {
1461  use_channel = i;
1462  break;
1463  }
1464  }
1465 
1466  if (use_channel > out_comm_channel.size())
1467  {
1468  return -1.;
1469  }
1470 
1471  tentry.type = type;
1472  tentry.channel = use_channel;
1473  tentry.packet = packet;
1474  tentry.time_step = (28 + packet.size()) / (86400. * out_comm_channel[use_channel].throughput);
1475  if (transmit_queue.empty())
1476  {
1477  tentry.nmjd = out_comm_channel[use_channel].nmjd + tentry.time_step;
1478  }
1479  else
1480  {
1481  tentry.nmjd = transmit_queue.back().nmjd + tentry.time_step;
1482  }
1483  transmit_queue.push(tentry);
1484  // transmit_queue_check.notify_one();
1485  // Sleep just a bit to give other threads a chance
1486  COSMOS_SLEEP(.001);
1487  return use_channel;
1488 }
std::vector< PACKET_BYTE > packet
Definition: agent_file.cpp:114
double time_step
Definition: agent_file3.cpp:126
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
uint32_t channel
Definition: agent_file.cpp:113
std::string type
Definition: agent_file.cpp:112
string node_name
Definition: agent_001.cpp:46
double nmjd
Definition: agent_file3.cpp:125
static uint16_t use_channel
Definition: agent_file.cpp:97
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
Definition: agent_file.cpp:110
static string node
Definition: agent_monitor.cpp:126
static std::queue< transmit_queue_entry > transmit_queue
Definition: agent_file3.cpp:130
// Send one packet on the UDP socket of an outgoing channel.
//
// If the current time is earlier than the channel's nmjd, the call first
// sleeps for the difference (converted from days to seconds). On success
// packet_out_count is incremented, the channel's nmjd is recomputed from its
// lomjd plus (28 + bytes sent) / throughput seconds expressed in days, and
// the packet is passed to debug_packet() when agent->debug_level is set.
// On failure send_error_count is incremented.
//
// type        - label forwarded to debug_packet()
// use_channel - index into out_comm_channel
// buf         - bytes to send
// Returns the sendto() result on success, -errno on failure.
//
// NOTE(review): use_channel is used as an index without a range check and
// &buf[0] is taken without checking that buf is non-empty - confirm callers
// guarantee both.
// NOTE(review): the listing jumps from source line 1504 to 1506; whatever
// statement sits on line 1505 is not visible here.
int32_t mysendto ( string  type,
int32_t  use_channel,
vector< PACKET_BYTE > &  buf 
)
1491 {
1492  int32_t iretn;
1493  double cmjd;
1494 
// Pace the channel: wait until its next allowed send time.
1495  if ((cmjd = currentmjd(0.)) < out_comm_channel[use_channel].nmjd)
1496  {
1497  COSMOS_SLEEP((86400. * (out_comm_channel[use_channel].nmjd - cmjd)));
1498  }
1499 
1500  iretn = sendto(out_comm_channel[use_channel].chansock.cudp, reinterpret_cast<const char*>(&buf[0]), buf.size(), 0, reinterpret_cast<sockaddr*>(&out_comm_channel[use_channel].chansock.caddr), sizeof(struct sockaddr_in));
1501 
1502  if (iretn >= 0)
1503  {
1504  ++packet_out_count;
// Next allowed send time: lomjd plus the time (28 + iretn) bytes take at the
// channel throughput, converted from seconds to days.
1506  out_comm_channel[use_channel].nmjd = out_comm_channel[use_channel].lomjd + ((28+iretn) / (float)out_comm_channel[use_channel].throughput)/86400.;
1507  if (agent->debug_level)
1508  {
1509  debug_packet(buf, PACKET_OUT, type, use_channel);
1510  }
1511  }
1512  else
1513  {
1514  iretn = -errno;
1515  ++send_error_count;
1516  }
1517 
1518  return iretn;
1519 }
static double cmjd
Definition: agent_monitor.cpp:121
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
static Agent * agent
Definition: agent_file3.cpp:94
static uint32_t packet_out_count
Definition: agent_file3.cpp:149
int iretn
Definition: rw_test.cpp:37
#define PACKET_OUT
Definition: agent_file3.cpp:71
static uint16_t use_channel
Definition: agent_file.cpp:97
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
void debug_packet(vector< PACKET_BYTE > buf, uint8_t direction, string type, int32_t use_channel)
Definition: agent_file3.cpp:1651
static uint32_t send_error_count
Definition: agent_file3.cpp:153
char buf[128]
Definition: rw_test.cpp:40
// Receive one packet from any of the sockets in out_comm_channel.
//
// buf is resized to length, then the function loops until a packet has been
// read or dtimeout seconds have elapsed.
//  - When COSMOS_WIN_OS is not defined, select() waits on all channel sockets
//    for the remaining time and recvfrom() is called on each ready socket.
//  - Otherwise recvfrom() is called on each channel socket in turn; on
//    EAGAIN/EWOULDBLOCK it sleeps 0.1 s and leaves the channel loop.
// A received packet is checked by comparing the CRC stored at
// PACKET_HEADER_OFFSET_CRC with calc_crc16ccitt() over the bytes from offset 3
// to the end. On a match packet_in_count is incremented, buf is shrunk to the
// received size and the packet is handed to debug_packet().
//
// type     - label forwarded to debug_packet()
// channel  - assigned the chansock of the channel that was read
// buf      - receives the packet bytes
// length   - maximum number of bytes to read
// dtimeout - overall timeout in seconds
// Returns the number of bytes received, GENERAL_ERROR_CRC on a CRC mismatch,
// or the last recorded status (GENERAL_ERROR_TIMEOUT, GENERAL_ERROR_INPUT,
// -errno, or 0) when the loop ends without a packet.
//
// NOTE(review): this listing shows channel taken by value; the assignment
// below only reaches the caller if the real parameter is a reference - confirm.
// NOTE(review): nbytes-3 is negative for packets shorter than 3 bytes -
// confirm calc_crc16ccitt tolerates that.
// NOTE(review): the listing jumps from source line 1554 to 1556; whatever
// statement sits on line 1555 is not visible here.
int32_t myrecvfrom ( string  type,
socket_channel channel,
vector< PACKET_BYTE > &  buf,
uint32_t  length,
double  dtimeout = 1. 
)
1522 {
1523  int32_t nbytes = 0;
1524 
1525  buf.resize(length);
1526  ElapsedTime et;
1527  do
1528  {
// Build the descriptor set from every channel socket and remember the highest
// descriptor for select().
1529  fd_set set;
1530  FD_ZERO(&set);
1531  int fdmax = -1;
1532  for (uint16_t i=0; i<out_comm_channel.size(); ++i)
1533  {
1534  FD_SET(out_comm_channel[i].chansock.cudp, &set);
1535  if (out_comm_channel[i].chansock.cudp > fdmax)
1536  {
1537  fdmax = out_comm_channel[i].chansock.cudp;
1538  }
1539  }
// Time still available out of the overall timeout.
1540  double rtimeout = dtimeout - et.split();
1541  if (rtimeout >= 0.)
1542  {
1543 #if !defined(COSMOS_WIN_OS)
1544  timeval timeout;
1545  timeout.tv_sec = static_cast<int32_t>(rtimeout);
1546  timeout.tv_usec = static_cast<int32_t>(1000000. * (rtimeout - timeout.tv_sec));
1547  int rv = select(fdmax+1, &set, nullptr, nullptr, &timeout);
1548  if (rv == -1)
1549  {
1550  nbytes = -errno;
1551  }
1552  else if (rv == 0)
1553  {
1554  nbytes = GENERAL_ERROR_TIMEOUT;
1556  }
1557  else
1558  {
1559  for (uint16_t i=0; i<out_comm_channel.size(); ++i)
1560  {
1561  if (FD_ISSET(out_comm_channel[i].chansock.cudp, &set))
1562  {
1563  channel = out_comm_channel[i].chansock;
1564  nbytes = recvfrom(channel.cudp, reinterpret_cast<char *>(&buf[0]), length, 0, reinterpret_cast<sockaddr*>(&channel.caddr), reinterpret_cast<socklen_t *>(&channel.addrlen));
1565  if (nbytes > 0)
1566  {
// Validate the packet: stored CRC versus CRC computed from offset 3 onwards.
1567  uint16_t crccalc = calc_crc16ccitt(&buf[3], nbytes-3);
1568  uint16_t crc;
1569  memmove(&crc, &buf[0]+PACKET_HEADER_OFFSET_CRC, sizeof(PACKET_CRC));
1570  if (crc != crccalc)
1571  {
1572  nbytes = GENERAL_ERROR_CRC;
1573  ++crc_error_count;
1574  }
1575  else
1576  {
1577  ++packet_in_count;
1578  buf.resize(nbytes);
1579  if (agent->debug_level)
1580  {
1581  debug_packet(buf, PACKET_IN, type, i);
1582  }
1583  }
// Returned for both a good packet and a CRC failure.
1584  return nbytes;
1585  }
1586  else
1587  {
1588  if (nbytes < 0)
1589  {
1590  nbytes = -errno;
1591  ++recv_error_count;
1592  }
1593  else
1594  {
// recvfrom() returned 0 bytes.
1595  nbytes = GENERAL_ERROR_INPUT;
1596  ++recv_error_count;
1597  }
1598  }
1599  }
1600  }
1601  }
1602 #else
// Branch compiled when COSMOS_WIN_OS is defined: no select(), each channel
// socket is read directly.
1603  for (uint16_t i=0; i<out_comm_channel.size(); ++i)
1604  {
1605  channel = out_comm_channel[i].chansock;
1606  nbytes = recvfrom(channel.cudp, reinterpret_cast<char *>(&buf[0]), length, 0, reinterpret_cast<sockaddr*>(&channel.caddr), reinterpret_cast<socklen_t *>(&channel.addrlen));
1607  if (nbytes > 0)
1608  {
1609  uint16_t crccalc = calc_crc16ccitt(&buf[3], nbytes-3);
1610  uint16_t crc;
1611  memmove(&crc, &buf[0]+PACKET_HEADER_OFFSET_CRC, sizeof(PACKET_CRC));
1612  if (crc != crccalc)
1613  {
1614  nbytes = GENERAL_ERROR_CRC;
1615  ++crc_error_count;
1616  }
1617  else
1618  {
1619  ++packet_in_count;
1620  buf.resize(nbytes);
// Unlike the select() branch, debug_packet() is called without testing
// agent->debug_level first.
1621  debug_packet(buf, PACKET_IN, type, i);
1622  }
1623  return nbytes;
1624  }
1625  else
1626  {
1627  if (nbytes < 0)
1628  {
1629  if (errno == EAGAIN || errno == EWOULDBLOCK)
1630  {
// Nothing available: pause briefly and leave the channel loop (the outer
// do/while still retries until the timeout).
1631  COSMOS_SLEEP(.1);
1632  break;
1633  }
1634  nbytes = -errno;
1635  ++recv_error_count;
1636  }
1637  else
1638  {
1639  nbytes = GENERAL_ERROR_INPUT;
1640  ++recv_error_count;
1641  }
1642  }
1643  }
1644 #endif // Not windows
1645  }
1646  } while (et.split() < dtimeout);
1647 
1648  return nbytes;
1649 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
static Agent * agent
Definition: agent_file3.cpp:94
int i
Definition: rw_test.cpp:37
#define GENERAL_ERROR_TIMEOUT
Definition: cosmos-errno.h:292
png_uint_32 crc
Definition: png.c:2173
uint16_t PACKET_CRC
Definition: transferlib.h:142
ElapsedTime et
Definition: agent_cpu_device_test.cpp:51
static uint32_t packet_in_count
Definition: agent_file3.cpp:148
static uint32_t recv_error_count
Definition: agent_file3.cpp:154
int32_t cudp
Definition: socketlib.h:120
static uint32_t crc_error_count
Definition: agent_file3.cpp:150
#define PACKET_HEADER_OFFSET_CRC
Definition: transferlib.h:150
struct sockaddr_in caddr
Definition: socketlib.h:122
#define GENERAL_ERROR_CRC
Definition: cosmos-errno.h:284
int addrlen
Definition: socketlib.h:128
static uint32_t timeout_error_count
Definition: agent_file3.cpp:151
Definition: elapsedtime.h:62
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
void debug_packet(vector< PACKET_BYTE > buf, uint8_t direction, string type, int32_t use_channel)
Definition: agent_file3.cpp:1651
png_uint_32 length
Definition: png.c:2173
char buf[128]
Definition: rw_test.cpp:40
#define GENERAL_ERROR_INPUT
Definition: cosmos-errno.h:293
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
uint16_t calc_crc16ccitt(uint8_t *buf, int size, bool lsb)
Calculate CRC-16-CCITT.
Definition: mathlib.cpp:2206
#define PACKET_IN
Definition: agent_file3.cpp:70
// Write a one-line description of a packet to the agent's debug stream.
//
// Nothing happens unless agent->debug_level is non-zero. The whole body runs
// with debug_fd_lock held. The line starts with a RECV or SEND summary
// (time, lap time, local/remote node ids, node name or "Unknown", channel IP,
// type label, packet counters, packet size) followed by a packet-type
// specific part selected by the low four bits of buf[0].
//
// buf         - packet bytes (taken by value)
// direction   - PACKET_IN or PACKET_OUT; any other value prints only the
//               packet-type specific part
// type        - label printed in square brackets
// use_channel - index into out_comm_channel
//
// Besides logging, this function also changes state: for HEARTBEAT, REQQUEUE,
// QUEUE and REQMETA packets it may call add_node_name() and it writes
// txq[local_node].remote_id.
//
// NOTE(review): buf.size() is printed with %u although vector::size()
// returns size_t - confirm this is acceptable on the target platforms.
// NOTE(review): this listing omits source lines 1765, 1770, 1777, 1802 and
// 1811-1812. The DATA and REQDATA cases therefore appear empty, the guards
// (if any) in front of the two TX_ID loops are not visible, and the
// HEARTBEAT fprintf call is cut off after its third argument.
void debug_packet ( vector< PACKET_BYTE buf,
uint8_t  direction,
string  type,
int32_t  use_channel 
)
1652 {
// Function-local timer; dt.lap() below refers to this object.
1653  static ElapsedTime dt;
1654 
1655  if (agent->debug_level)
1656  {
1657  debug_fd_lock.lock();
1658 
1659  string node_name = "";
1660  uint8_t local_node = 0;
1661  uint8_t remote_node = 0;
1662  if (direction == PACKET_IN)
1663  {
// Incoming: the node id carried in the header is the sender's (remote) id.
1664  remote_node = buf[PACKET_HEADER_OFFSET_NODE_ID];
1665  switch (buf[0] & 0x0f)
1666  {
// These packet types carry a node name; it is used to find (or create) the
// local node entry and to record the remote id for it.
1667  case PACKET_HEARTBEAT:
1668  case PACKET_REQQUEUE:
1669  case PACKET_QUEUE:
1670  case PACKET_REQMETA:
1671  node_name = reinterpret_cast<char *>(&buf[PACKET_HEADER_OFFSET_NODE_NAME]);
1672  local_node = lookup_local_node_id(node_name);
1673  if (local_node == 0)
1674  {
1675  local_node = add_node_name(node_name);
1676  }
1677  txq[local_node].remote_id = remote_node;
1678  break;
// These packet types carry only the id; the name is looked up from txq.
1679  case PACKET_METADATA:
1680  case PACKET_REQDATA:
1681  case PACKET_DATA:
1682  case PACKET_COMPLETE:
1683  case PACKET_CANCEL:
1684  local_node = lookup_local_node_id(remote_node);
1685  if (local_node > 0)
1686  {
1687  node_name = txq[local_node].node_name;
1688  }
1689  break;
1690  }
1691 
// Prefer the channel's own node name; fall back to the name derived from the
// packet, then to "Unknown".
1692  if (out_comm_channel[use_channel].node.empty())
1693  {
1694  if (!node_name.empty())
1695  {
1696  fprintf(agent->get_debug_fd(), "%16.10f %.4f RECV L %u R %u %s %s [%s] In: %u Out: %u Size: %u ", currentmjd(), dt.lap(), local_node, remote_node, node_name.c_str(), out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1697  }
1698  else
1699  {
1700  fprintf(agent->get_debug_fd(), "%16.10f %.4f RECV L %u R %u Unknown %s [%s] In: %u Out: %u Size: %u ", currentmjd(), dt.lap(), local_node, remote_node, out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1701  }
1702  }
1703  else
1704  {
1705  fprintf(agent->get_debug_fd(), "%16.10f %.4f RECV L %u R %u %s %s [%s] In: %u Out: %u Size: %u ", currentmjd(), dt.lap(), local_node, remote_node, out_comm_channel[use_channel].node.c_str(), out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1706  }
1707  }
1708  else if (direction == PACKET_OUT)
1709  {
// Outgoing: the node id carried in the header is treated as the local id.
1710  local_node = buf[PACKET_HEADER_OFFSET_NODE_ID];
1711  switch (buf[0] & 0x0f)
1712  {
1713  case PACKET_HEARTBEAT:
1714  case PACKET_REQQUEUE:
1715  case PACKET_QUEUE:
1716  case PACKET_REQMETA:
1717  node_name = reinterpret_cast<char *>(&buf[PACKET_HEADER_OFFSET_NODE_NAME]);
1718  remote_node = lookup_remote_node_id(node_name);
1719  if (local_node == 0)
1720  {
1721  local_node = add_node_name(node_name);
1722  }
1723  txq[local_node].remote_id = remote_node;
1724  break;
1725  case PACKET_METADATA:
1726  case PACKET_REQDATA:
1727  case PACKET_DATA:
1728  case PACKET_COMPLETE:
1729  case PACKET_CANCEL:
1730  if (local_node > 0)
1731  {
1732  remote_node = lookup_remote_node_id(local_node);
1733  node_name = txq[local_node].node_name;
1734  }
1735  break;
1736  }
1737 
1738  if (out_comm_channel[use_channel].node.empty())
1739  {
1740  if (!node_name.empty())
1741  {
1742  fprintf(agent->get_debug_fd(), "%16.10f %.4f SEND L %u R %u %s %s [%s] In: %u Out: %u Size: %u ", currentmjd(), dt.lap(), local_node, remote_node, node_name.c_str(), out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1743  }
1744  else
1745  {
1746  fprintf(agent->get_debug_fd(), "%16.10f %.4f SEND L %u R %u Unknown %s [%s] In: %u Out: %u Size: %u ", currentmjd(), dt.lap(), local_node, remote_node, out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1747  }
1748  }
1749  else
1750  {
1751  fprintf(agent->get_debug_fd(), "%16.10f %.4f SEND L %u R %u %s %s [%s] In: %u Out: %u Size: %u ", currentmjd(), dt.lap(), local_node, remote_node, out_comm_channel[use_channel].node.c_str(), out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1752  }
1753  }
1754 
// Packet-type specific tail of the log line.
1755  switch (buf[0] & 0x0f)
1756  {
1757  case PACKET_METADATA:
1758  {
// The file name is copied from a fixed window of TRANSFER_MAX_FILENAME bytes.
1759  string file_name(&buf[PACKET_METASHORT_OFFSET_FILE_NAME], &buf[PACKET_METASHORT_OFFSET_FILE_NAME+TRANSFER_MAX_FILENAME]);
1760  fprintf(agent->get_debug_fd(), "[METADATA] %u %u %s ", local_node, buf[PACKET_METASHORT_OFFSET_TX_ID], file_name.c_str());
1761  break;
1762  }
1763  case PACKET_DATA:
1764  {
1766  break;
1767  }
1768  case PACKET_REQDATA:
1769  {
1771  break;
1772  }
1773  case PACKET_REQMETA:
1774  {
1775  fprintf(agent->get_debug_fd(), "[REQMETA] %u %s ", local_node, &buf[PACKET_REQMETA_OFFSET_NODE_NAME]);
1776  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
1778  {
1779  fprintf(agent->get_debug_fd(), "%u ", buf[PACKET_REQMETA_OFFSET_TX_ID+i]);
1780  }
1781  break;
1782  }
1783  case PACKET_COMPLETE:
1784  {
1785  fprintf(agent->get_debug_fd(), "[COMPLETE] %u %u ", local_node, buf[PACKET_COMPLETE_OFFSET_TX_ID]);
1786  break;
1787  }
1788  case PACKET_CANCEL:
1789  {
1790  fprintf(agent->get_debug_fd(), "[CANCEL] %u %u ", local_node, buf[PACKET_CANCEL_OFFSET_TX_ID]);
1791  break;
1792  }
1793  case PACKET_REQQUEUE:
1794  {
1795  fprintf(agent->get_debug_fd(), "[REQQUEUE] %u %s ", local_node, &buf[PACKET_QUEUE_OFFSET_NODE_NAME]);
1796  }
1797  break;
1798  case PACKET_QUEUE:
1799  {
1800  fprintf(agent->get_debug_fd(), "[QUEUE] %u %s ", local_node, &buf[PACKET_QUEUE_OFFSET_NODE_NAME]);
1801  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
1803  {
1804  fprintf(agent->get_debug_fd(), "%u ", buf[PACKET_QUEUE_OFFSET_TX_ID+i]);
1805  }
1806  }
1807  break;
1808  case PACKET_HEARTBEAT:
1809  {
1810  fprintf(agent->get_debug_fd(), "[HEARTBEAT] %u %s %hu %u %u", local_node, &buf[PACKET_HEARTBEAT_OFFSET_NODE_NAME], buf[PACKET_HEARTBEAT_OFFSET_BEAT_PERIOD]
1813  break;
1814  }
1815 
1816  }
// Terminate the line, flush, and release the debug stream.
1817  fprintf(agent->get_debug_fd(), "\n");
1818  fflush(agent->get_debug_fd());
1819  debug_fd_lock.unlock();
1820  }
1821 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
#define PACKET_METASHORT_OFFSET_TX_ID
Definition: transferlib.h:255
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
#define TRANSFER_MAX_FILENAME
Definition: transferlib.h:78
static Agent * agent
Definition: agent_file3.cpp:94
#define PACKET_QUEUE_OFFSET_TX_ID
Definition: transferlib.h:214
static uint32_t packet_out_count
Definition: agent_file3.cpp:149
ElapsedTime dt
Definition: agent_file3.cpp:183
#define PACKET_HEARTBEAT_OFFSET_BEAT_PERIOD
Definition: transferlib.h:166
#define PACKET_HEARTBEAT_OFFSET_FUNIXTIME
Definition: transferlib.h:168
int i
Definition: rw_test.cpp:37
uint8_t lookup_local_node_id(PACKET_NODE_ID_TYPE remote_id)
Definition: agent_file3.cpp:3162
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
#define PACKET_REQDATA_OFFSET_HOLE_END
Definition: transferlib.h:272
#define PACKET_OUT
Definition: agent_file3.cpp:71
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
#define PACKET_DATA_OFFSET_TX_ID
Definition: transferlib.h:285
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
#define PACKET_REQMETA_OFFSET_NODE_NAME
Definition: transferlib.h:225
static uint32_t packet_in_count
Definition: agent_file3.cpp:148
#define PACKET_COMPLETE_OFFSET_TX_ID
Definition: transferlib.h:298
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
uint8_t lookup_remote_node_id(PACKET_NODE_ID_TYPE local_id)
Definition: agent_file3.cpp:3204
#define PACKET_CANCEL_OFFSET_TX_ID
Definition: transferlib.h:308
#define PACKET_DATA_OFFSET_BYTE_COUNT
Definition: transferlib.h:286
int32_t add_node_name(string node_name)
Definition: agent_file3.cpp:3110
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
#define PACKET_REQDATA_OFFSET_TX_ID
Definition: transferlib.h:270
#define PACKET_REQMETA_OFFSET_TX_ID
Definition: transferlib.h:226
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
#define PACKET_DATA_OFFSET_CHUNK_START
Definition: transferlib.h:287
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
#define PACKET_QUEUE_OFFSET_NODE_NAME
Definition: transferlib.h:213
Definition: elapsedtime.h:62
#define PACKET_HEADER_OFFSET_NODE_NAME
Definition: transferlib.h:153
#define PACKET_HEARTBEAT_OFFSET_NODE_NAME
Definition: transferlib.h:165
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
#define PACKET_REQDATA_OFFSET_HOLE_START
Definition: transferlib.h:271
#define PACKET_METASHORT_OFFSET_FILE_NAME
Definition: transferlib.h:257
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
char buf[128]
Definition: rw_test.cpp:40
static const unsigned char PACKET_HEARTBEAT
Definition: transferlib.h:111
static string node
Definition: agent_monitor.cpp:126
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
#define PACKET_HEARTBEAT_OFFSET_THROUGHPUT
Definition: transferlib.h:167
static const unsigned char PACKET_REQQUEUE
Definition: transferlib.h:110
#define PACKET_HEADER_OFFSET_NODE_ID
Definition: transferlib.h:152
#define PACKET_IN
Definition: agent_file3.cpp:70
int32_t write_meta ( tx_progress tx,
double  interval = 5. 
)
1824 {
1825  vector<PACKET_BYTE> packet;
1826  std::ofstream file_name;
1827 
1828  if (currentmjd(0.) - tx.savetime > interval/86400.)
1829  {
1830  tx.savetime = currentmjd(0.);
1831  make_metadata_packet(packet, tx.tx_id, (char *)tx.file_name.c_str(), tx.file_size, (char *)tx.node_name.c_str(), (char *)tx.agent_name.c_str());
1832  file_name.open(tx.temppath + ".meta", std::ios::out|std::ios::binary);
1833  if(!file_name.is_open())
1834  {
1835  return (-errno);
1836  }
1837 
1838  uint16_t crc;
1839  file_name.write((char *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1840  crc = slip_calc_crc((uint8_t *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1841  file_name.write((char *)&crc, 2);
1842  for (file_progress progress_info : tx.file_info)
1843  {
1844  file_name.write((const char *)&progress_info, sizeof(progress_info));
1845  crc = slip_calc_crc((uint8_t *)&progress_info, sizeof(progress_info));
1846  file_name.write((char *)&crc, 2);
1847  }
1848  file_name.close();
1849  }
1850 
1851  return 0;
1852 }
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
Definition: transferlib.h:332
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
png_uint_32 crc
Definition: png.c:2173
double savetime
Definition: transferlib.h:354
deque< file_progress > file_info
Definition: transferlib.h:358
string node_name
Definition: transferlib.h:349
string agent_name
Definition: transferlib.h:350
uint16_t slip_calc_crc(uint8_t *buf, uint16_t size)
Calculate CRC-16-CCITT.
Definition: sliplib.cpp:322
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PACKET_METALONG_OFFSET_TOTAL
Definition: transferlib.h:243
string file_name
Definition: transferlib.h:351
void make_metadata_packet(vector< PACKET_BYTE > &packet, packet_struct_metalong meta)
Definition: transferlib.cpp:163
// Load the metadata and chunk progress of a transfer from "<temppath>.meta".
//
// The file must exist and be at least COSMOS_SIZEOF(file_progress) bytes
// long. It is read as: PACKET_METALONG_OFFSET_TOTAL bytes of metadata packet
// plus a 2-byte CRC, followed by any number of file_progress records, each
// followed by its own 2-byte CRC. Every CRC is checked with slip_calc_crc().
//
// On success tx receives tx_id, node_name, agent_name, file_name, file_size
// and filepath (built with data_base_path(node, "outgoing", agent, file)),
// havemeta is set, and each progress record is appended to tx.file_info.
// fp, savetime and complete are reset before the metadata is parsed.
//
// Returns 0 on success, -errno if the file cannot be opened,
// DATA_ERROR_SIZE_MISMATCH if the file is missing, too small or ends early,
// DATA_ERROR_CRC on a CRC mismatch.
//
// NOTE(review): this listing shows tx taken by value; the updates only reach
// the caller if the real parameter is a reference - confirm.
// NOTE(review): the listing jumps from source line 1857 to 1859; `meta` is
// used below but its declaration is not visible here.
// NOTE(review): tx.file_info is appended to without being cleared first -
// confirm callers pass an empty list.
int32_t read_meta ( tx_progress tx)
1855 {
1856  vector<PACKET_BYTE> packet(PACKET_METALONG_OFFSET_TOTAL,0);
1857  std::ifstream file_name;
1859 
1860  struct stat statbuf;
1861  if (!stat((tx.temppath + ".meta").c_str(), &statbuf) && statbuf.st_size >= COSMOS_SIZEOF(file_progress))
1862  {
// NOTE(review): an input stream is opened with std::ios::out here - confirm
// this is intended.
1863  file_name.open(tx.temppath + ".meta", std::ios::out|std::ios::binary);
1864  if(!file_name.is_open())
1865  {
1866  return (-errno);
1867  }
1868  }
1869  else
1870  {
1871  // remove((tx.temppath + ".meta").c_str());
1872  return DATA_ERROR_SIZE_MISMATCH;
1873  }
1874 
1875  tx.fp = nullptr;
1876  tx.savetime = 0.;
1877  tx.complete = false;
1878 
1879 
1880  // load metadata
1881 
1882  file_name.read((char *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1883  if (file_name.eof())
1884  {
1885  return DATA_ERROR_SIZE_MISMATCH;
1886  }
1887  uint16_t crc;
1888  file_name.read((char *)&crc, 2);
1889  if (file_name.eof())
1890  {
1891  return DATA_ERROR_SIZE_MISMATCH;
1892  }
1893  if (crc != slip_calc_crc((uint8_t *)&packet[0], PACKET_METALONG_OFFSET_TOTAL))
1894  {
1895  file_name.close();
1896  return DATA_ERROR_CRC;
1897  }
1898  extract_metadata(packet, meta);
1899  tx.havemeta = true;
1900  tx.tx_id = meta.tx_id;
1901  tx.node_name = meta.node_name;
1902  tx.agent_name = meta.agent_name;
1903  tx.file_name = meta.file_name;
1904  tx.filepath = data_base_path(tx.node_name, "outgoing", tx.agent_name, tx.file_name);
1905  tx.file_size = meta.file_size;
1906 
1907  // load file progress
1908  file_progress progress_info;
1909  do
1910  {
1911  file_name.read((char *)&progress_info, sizeof(progress_info));
1912  if (file_name.eof())
1913  {
// End of file at a record boundary: normal end of the progress list.
1914  break;
1915  }
1916  uint16_t crc;
1917  file_name.read((char *)&crc, 2);
1918  if (file_name.eof())
1919  {
// A record without its CRC: the file was cut short.
1920  return DATA_ERROR_SIZE_MISMATCH;
1921  }
1922  if (crc != slip_calc_crc((uint8_t *)&progress_info, sizeof(progress_info)))
1923  {
1924  file_name.close();
1925  return DATA_ERROR_CRC;
1926  }
1927 
1928  tx.file_info.push_back(progress_info);
1929  } while(!file_name.eof());
1930  file_name.close();
1931  if (agent->debug_level)
1932  {
1933  debug_fd_lock.lock();
// NOTE(review): file_info.size() is printed with PRIu32 although size()
// returns size_t - confirm this is acceptable on the target platforms.
1934  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main: read_meta: %s tx_id: %u chunks: %" PRIu32 "\n", currentmjd(), dt.lap(), (tx.temppath + ".meta").c_str(), tx.tx_id, tx.file_info.size());
1935  fflush(agent->get_debug_fd());
1936  debug_fd_lock.unlock();
1937  }
1938 
1939  // fix any overlaps and count total bytes
1940  // merge_chunks_overlap(tx);
1941  // tx.state = STATE_REQDATA;
1942 
1943  return 0;
1944 }
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:231
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static Agent * agent
Definition: agent_file3.cpp:94
char file_name[128]
Definition: transferlib.h:234
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
ElapsedTime dt
Definition: agent_file3.cpp:183
Definition: transferlib.h:229
#define COSMOS_SIZEOF(element)
Definition: configCosmos.h:139
Definition: transferlib.h:332
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
png_uint_32 crc
Definition: png.c:2173
double savetime
Definition: transferlib.h:354
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
deque< file_progress > file_info
Definition: transferlib.h:358
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
#define DATA_ERROR_CRC
Definition: cosmos-errno.h:151
string node_name
Definition: transferlib.h:349
char agent_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:233
string filepath
Definition: transferlib.h:352
#define DATA_ERROR_SIZE_MISMATCH
Definition: cosmos-errno.h:150
string agent_name
Definition: transferlib.h:350
uint16_t slip_calc_crc(uint8_t *buf, uint16_t size)
Calculate CRC-16-CCITT.
Definition: sliplib.cpp:322
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:235
#define PACKET_METALONG_OFFSET_TOTAL
Definition: transferlib.h:243
string file_name
Definition: transferlib.h:351
bool complete
Definition: transferlib.h:348
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:232
bool tx_progress_compare_by_size ( const tx_progress a,
const tx_progress b 
)
3007 {
3008  return a.file_size < b.file_size;
3009 }
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
bool filestruc_compare_by_size ( const filestruc a,
const filestruc b 
)
3002 {
3003  return a.size < b.size;
3004 }
off_t size
Definition: datalib.h:120
//! Validate a transaction id against a queue.
/*! The queue is taken by reference (matching the declaration
 * check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)) so that the whole
 * progress table is not copied on every call.
 * \param txentry Queue whose progress table is indexed by transaction id.
 * \param tx_id Transaction id to validate; 0 is never valid.
 * \return tx_id when the slot at index tx_id records that same id, otherwise 0.
 */
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
{
    // Short-circuit keeps slot 0 from being inspected for the reserved id 0.
    const bool slot_matches = (tx_id != 0) && (txentry.progress[tx_id].tx_id == tx_id);
    return slot_matches ? tx_id : 0;
}
std::vector< tx_progress > progress
Definition: agent_file.cpp:145
int32_t check_channel ( PACKET_NODE_ID_TYPE  node_id)
3099 {
3100  for(uint16_t i=1; i<out_comm_channel.size(); ++i)
3101  {
3102  if (out_comm_channel[i].node == txq[node_id].node_name)
3103  {
3104  return i;
3105  }
3106  }
3107  return -1;
3108 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
string node_name
Definition: agent_001.cpp:46
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
static string node
Definition: agent_monitor.cpp:126
int32_t add_node_name ( string  node_name)
3111 {
3112  uint8_t local_node = 0;
3113  tx_queue tx;
3114 
3115  if (txq.size() == 256)
3116  {
3117  return TRANSFER_ERROR_NODE;
3118  }
3119 
3120  if (txq.empty())
3121  {
3122  tx.node_name = "";
3123  tx.remote_id = 0;
3124  tx.incoming.size = 0;
3125  tx.outgoing.next_id = 1;
3126  tx.outgoing.size = 0;
3127  txq.push_back(tx);
3128  incoming_tx_purge(local_node);
3129  outgoing_tx_purge(local_node);
3130  }
3131 
3132  for (uint16_t i=1; i<txq.size(); ++i)
3133  {
3134  if (txq[i].node_name == node_name)
3135  {
3136  return i;
3137  }
3138  }
3139 
3140  local_node = txq.size();
3141  tx.node_name = node_name;
3142  tx.remote_id = 0;
3143  tx.incoming.size = 0;
3144  tx.outgoing.next_id = 1;
3145  tx.outgoing.size = 0;
3146  txq.push_back(tx);
3147  incoming_tx_purge(local_node);
3148  outgoing_tx_purge(local_node);
3149  return local_node;
3150 }
PACKET_NODE_ID_TYPE remote_id
Definition: agent_file3.cpp:174
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
tx_entry outgoing
Definition: agent_file.cpp:156
int32_t incoming_tx_purge(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2933
string node_name
Definition: agent_001.cpp:46
int32_t outgoing_tx_purge(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2550
PACKET_TX_ID_TYPE size
Definition: agent_file.cpp:146
PACKET_TX_ID_TYPE next_id
Definition: agent_file.cpp:148
Definition: agent_file.cpp:151
std::string node_name
Definition: agent_file.cpp:153
tx_entry incoming
Definition: agent_file.cpp:155
uint8_t check_local_node_id ( PACKET_NODE_ID_TYPE  local_id)
3153 {
3154  uint8_t id = 0;
3155  if (local_id > 0 && local_id < txq.size())
3156  {
3157  id = local_id;
3158  }
3159  return id;
3160 }
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
uint8_t lookup_local_node_id ( PACKET_NODE_ID_TYPE  remote_id)
3163 {
3164  uint8_t id = 0;
3165  for (uint8_t i=1; i<txq.size(); ++i)
3166  {
3167  if (txq[i].remote_id == remote_id)
3168  {
3169  id = i;
3170  break;
3171  }
3172  }
3173  return id;
3174 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
uint8_t lookup_local_node_id ( string  node_name)
3177 {
3178  uint8_t local_id = 0;
3179  for (uint8_t i=1; i<txq.size(); ++i)
3180  {
3181  if (txq[i].node_name == node_name)
3182  {
3183  local_id = i;
3184  break;
3185  }
3186  }
3187  return local_id;
3188 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
string node_name
Definition: agent_001.cpp:46
uint8_t check_remote_node_id ( PACKET_NODE_ID_TYPE  remote_id)
3191 {
3192  uint8_t id = 0;
3193  for (uint8_t i=1; i<txq.size(); ++i)
3194  {
3195  if (txq[i].remote_id == remote_id)
3196  {
3197  id = remote_id;
3198  break;
3199  }
3200  }
3201  return id;
3202 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
uint8_t lookup_remote_node_id ( PACKET_NODE_ID_TYPE  local_id)
3205 {
3206  int32_t remote_id = 0;
3207  if (local_id > 0 && local_id < txq.size())
3208  {
3209  if (txq[local_id].remote_id > 0)
3210  {
3211  remote_id = txq[local_id].remote_id;
3212  }
3213  }
3214  return remote_id;
3215 }
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
uint8_t lookup_remote_node_id ( string  node_name)
3218 {
3219  uint8_t remote_id = 0;
3220  for (uint8_t i=1; i<txq.size(); ++i)
3221  {
3222  if (txq[i].node_name == node_name)
3223  {
3224  remote_id = txq[i].remote_id;
3225  break;
3226  }
3227  }
3228  return remote_id;
3229 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
string node_name
Definition: agent_001.cpp:46
uint8_t set_remote_node_id ( PACKET_NODE_ID_TYPE  node_id,
string  node_name 
)
3232 {
3233  int32_t id = 0;
3234  for (uint16_t i=1; i<txq.size(); ++i)
3235  {
3236  if (txq[i].node_name == node_name)
3237  {
3238  txq[i].remote_id = node_id;
3239  id = i;
3240  break;
3241  }
3242  }
3243  return id;
3244 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
string node_name
Definition: agent_001.cpp:46
//! Get the node name recorded for a remote node id.
/*! \param node_id Node id as used by the remote side.
 * \return node_name of the first txq entry (index >= 1) whose remote_id
 * matches, or an empty string when there is none.
 */
string get_remote_node_name(PACKET_NODE_ID_TYPE node_id)
{
    for (uint16_t idx = 1; idx < txq.size(); ++idx)
    {
        if (txq[idx].remote_id == node_id)
        {
            return txq[idx].node_name;
        }
    }
    return string();
}
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
string name
Definition: cubesat2obj.cpp:6
// Picks, among the incoming transfers of one node, the transaction with the
// fewest bytes still missing (file_size - total_bytes), considering only
// entries that have a non-zero tx_id and for which metadata has been received.
// Entries with metadata whose file_size is 0 or smaller than total_bytes are
// deleted along the way. Returns the chosen tx_id, or 0 when nothing qualifies
// or the node index is not a valid index above 0 into txq.
// NOTE(review): this listing names the parameter `node` while the body uses
// `local_node`, and the loop header (source line 3019) is missing here;
// presumably `i` runs over the progress slots - confirm against the real file.
PACKET_TX_ID_TYPE choose_incoming_tx_id ( uint8_t  node)
3012 {
3013  PACKET_TX_ID_TYPE tx_id = 0;
3014 
3015  if (local_node > 0 && local_node < txq.size())
3016  {
3017  // Choose file with least data left to send
3018  PACKET_FILE_SIZE_TYPE nsize = INT32_MAX;
3020  {
3021  // calculate bytes so far
3022  merge_chunks_overlap(txq[(local_node)].incoming.progress[i]);
3023 
3024  // Remove anything suspicious: file_size == 0, file_size < total_bytes
3025  if (txq[(local_node)].incoming.progress[i].havemeta && (txq[(local_node)].incoming.progress[i].file_size == 0 || txq[(local_node)].incoming.progress[i].file_size < txq[(local_node)].incoming.progress[i].total_bytes))
3026  {
3027  incoming_tx_del(local_node, txq[(local_node)].incoming.progress[i].tx_id);
3028  }
3029  // Choose transactions for which we: have meta and bytes remaining is minimized
3030  else if (txq[(local_node)].incoming.progress[i].tx_id && txq[(local_node)].incoming.progress[i].havemeta && (txq[(local_node)].incoming.progress[i].file_size - txq[(local_node)].incoming.progress[i].total_bytes) < nsize)
3031  {
3032  nsize = txq[(local_node)].incoming.progress[i].file_size - txq[(local_node)].incoming.progress[i].total_bytes;
3033  tx_id = txq[(local_node)].incoming.progress[i].tx_id;
3034  }
3035  }
3036  }
3037 
3038  return tx_id;
3039 }
int32_t incoming_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2872
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file3.cpp:1951
// Picks the next outgoing transfer of one node. Only entries with a non-zero
// tx_id whose metadata has been sent and whose data has not yet been sent are
// considered; among those, entries with file_size 0 or file_size below
// total_bytes are deleted, and the entry with the smallest total_bytes wins.
// The winner is re-merged and flagged with senddata = true. Returns its tx_id,
// or 0 when nothing qualifies.
// NOTE(review): the comments speak of "bytes remaining", but the comparison
// below uses total_bytes itself - confirm which quantity is intended.
// NOTE(review): this listing names the parameter `node` while the body uses
// `local_node`, and the loop header (source line 3049) is missing here;
// presumably `i` runs over the progress slots - confirm against the real file.
PACKET_TX_ID_TYPE choose_outgoing_tx_id ( uint8_t  node)
3042 {
3043  PACKET_TX_ID_TYPE tx_id = 0;
3044 
3045  if (local_node > 0 && local_node < txq.size())
3046  {
3047  // Choose file with least data left to send
3048  PACKET_FILE_SIZE_TYPE nsize = INT32_MAX;
3050  {
3051  if (txq[(local_node)].outgoing.progress[i].tx_id && txq[(local_node)].outgoing.progress[i].sentmeta && !txq[(local_node)].outgoing.progress[i].sentdata)
3052  {
3053  // calculate bytes so far
3054  merge_chunks_overlap(txq[(local_node)].outgoing.progress[i]);
3055 
3056  // Remove anything suspicious: file_size == 0, file_size < total_bytes
3057  if (txq[(local_node)].outgoing.progress[i].file_size == 0 || txq[(local_node)].outgoing.progress[i].file_size < txq[(local_node)].outgoing.progress[i].total_bytes)
3058  {
3059  outgoing_tx_del(local_node, txq[(local_node)].outgoing.progress[i].tx_id);
3060  }
3061  // Choose unfinished transactions for which bytes remaining is minimized
3062  else if (txq[(local_node)].outgoing.progress[i].total_bytes < nsize)
3063  {
3064  nsize = txq[(local_node)].outgoing.progress[i].total_bytes;
3065  tx_id = txq[(local_node)].outgoing.progress[i].tx_id;
3066  }
3067  }
3068  }
3069  }
3070 
3071  if (tx_id)
3072  {
3073 // vector<file_progress> togo;
3074 // togo = find_chunks_togo(txq[(local_node)].outgoing.progress[tx_id]);
3075 // for (file_progress missed : togo)
3076 // {
3077 // txq[(local_node)].outgoing.progress[tx_id].file_info.push_back(missed);
3078 // }
3079 
3080  merge_chunks_overlap(txq[(local_node)].outgoing.progress[tx_id]);
3081  txq[(local_node)].outgoing.progress[tx_id].senddata = true;
3082  }
3083  return tx_id;
3084 }
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
int32_t outgoing_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2478
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file3.cpp:1951
// Advances the incoming transfer identified by tx_id for the given node.
// When the remote id of the node is known (remote_id > 0):
//  - if metadata is present and total_bytes equals file_size, any open file
//    handle is closed, "<temppath>.file" is renamed to filepath, the metadata
//    is written with interval 0, and the entry is marked complete (all of this
//    only once, guarded by the complete flag);
//  - otherwise send_reqdata is set on out_comm_channel[use_channel].
// Returns tx_id.
// NOTE(review): the declaration of tx_id (source line 3262) is missing from
// this listing; presumably it comes from check_tx_id/choose_incoming_tx_id -
// confirm against the real file.
// NOTE(review): the result of rename() is only logged; the entry is marked
// complete even when the rename fails. use_channel is used as an index without
// a visible range check.
int32_t next_incoming_tx ( PACKET_NODE_ID_TYPE  node,
int32_t  use_channel 
)
3261 {
3263 
3264  if (tx_id < PROGRESS_QUEUE_SIZE && tx_id > 0)
3265  {
3266  // See if we know what the remote node_id is for this
3267  if (txq[node].remote_id > 0)
3268  {
3269  // Check if file has been completely received
3270  if(txq[(node)].incoming.progress[tx_id].file_size == txq[(node)].incoming.progress[tx_id].total_bytes && txq[(node)].incoming.progress[tx_id].havemeta)
3271  {
3272  // tx_progress tx_in = txq[(node)].incoming.progress[tx_id];
3273  if (agent->debug_level)
3274  {
3275  debug_fd_lock.lock();
3276  fprintf(agent->get_debug_fd(), "%16.10f %.4f Incoming(next_incoming_tx): Complete: %u %s %u %u\n", currentmjd(), dt.lap(), txq[(node)].incoming.progress[tx_id].tx_id, txq[(node)].incoming.progress[tx_id].node_name.c_str(), txq[(node)].incoming.progress[tx_id].file_size, txq[(node)].incoming.progress[tx_id].total_bytes);
3277  fflush(agent->get_debug_fd());
3278  debug_fd_lock.unlock();
3279  }
3280 
3281  // Move file to its final location
3282  if (!txq[(node)].incoming.progress[tx_id].complete)
3283  {
3284  if (txq[(node)].incoming.progress[tx_id].fp != nullptr)
3285  {
3286  fclose(txq[(node)].incoming.progress[tx_id].fp);
3287  txq[(node)].incoming.progress[tx_id].fp = nullptr;
3288  }
3289  string final_filepath = txq[(node)].incoming.progress[tx_id].temppath + ".file";
3290  int32_t iret = rename(final_filepath.c_str(), txq[(node)].incoming.progress[tx_id].filepath.c_str());
3291  // Make sure metadata is recorded
3292  write_meta(txq[(node)].incoming.progress[tx_id], 0.);
3293  if (agent->debug_level)
3294  {
3295  debug_fd_lock.lock();
3296  fprintf(agent->get_debug_fd(), "%16.10f %.4f Incoming(next_incoming_tx): Renamed: %d %s\n", currentmjd(), dt.lap(), iret, txq[(node)].incoming.progress[tx_id].filepath.c_str());
3297  fflush(agent->get_debug_fd());
3298  debug_fd_lock.unlock();
3299  }
3300  txq[(node)].incoming.progress[tx_id].complete = true;
3301  }
3302  }
3303  else
3304  {
3305  if (agent->debug_level)
3306  {
3307  debug_fd_lock.lock();
3308  fprintf(agent->get_debug_fd(), "%16.10f %.4f Incoming(next_incoming_tx): More: %u %s %u %u\n", currentmjd(), dt.lap(), txq[(node)].incoming.progress[tx_id].tx_id, txq[(node)].incoming.progress[tx_id].node_name.c_str(), txq[(node)].incoming.progress[tx_id].file_size, txq[(node)].incoming.progress[tx_id].total_bytes);
3309  fflush(agent->get_debug_fd());
3310  debug_fd_lock.unlock();
3311  }
3312  out_comm_channel[use_channel].send_reqdata = true;
3313  }
3314  }
3315  }
3316  return tx_id;
3317 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static Agent * agent
Definition: agent_file3.cpp:94
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file3.cpp:1823
ElapsedTime dt
Definition: agent_file3.cpp:183
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
static uint16_t use_channel
Definition: agent_file.cpp:97
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
static string node
Definition: agent_monitor.cpp:126
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file3.cpp:3086
PACKET_TX_ID_TYPE choose_incoming_tx_id(uint8_t node)
Definition: agent_file3.cpp:3011
//! Describe the incoming queues of all nodes as JSON.
/*! The result is an object with one key, "incoming", holding one element per
 * txq entry. Each element carries "node" (node name), "count" (stored queue
 * size) and "files": one object per transfer with a non-zero tx_id, with the
 * keys "tx_id", "agent", "name", "bytes" and "size".
 * \return The JSON text.
 */
string json_list_incoming()
{
    JSONObject jobj;
    JSONArray incoming;

    incoming.resize(txq.size());
    for (uint16_t node = 0; node < txq.size(); ++node)
    {
        JSONObject node_obj("node", txq[node].node_name);
        node_obj.addElement("count", txq[node].incoming.size);

        JSONArray files;
        files.resize(txq[node].incoming.size);
        size_t used = 0;
        // Iterate by reference: a tx_progress holds strings and a chunk deque,
        // so copying every slot on each listing is needless work.
        for (const tx_progress &tx : txq[node].incoming.progress)
        {
            if (!tx.tx_id)
            {
                continue;
            }
            JSONObject f("tx_id", tx.tx_id);
            f.addElement("agent", tx.agent_name);
            f.addElement("name", tx.file_name);
            f.addElement("bytes", tx.total_bytes);
            f.addElement("size", tx.file_size);
            // Fill the pre-sized slots first; if more live transfers exist than
            // the stored count says, grow the array instead of throwing from at().
            if (used < files.size())
            {
                files[used] = JSONValue(f);
            }
            else
            {
                files.push_back(JSONValue(f));
            }
            ++used;
        }
        node_obj.addElement("files", files);
        incoming.at(node) = JSONValue(node_obj);
    }
    jobj.addElement("incoming", incoming);
    return jobj.to_json_string();
}
Definition: jsonvalue.h:13
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
//! Describe the outgoing queues of all nodes as JSON.
/*! The result is an object with one key, "outgoing", holding one element per
 * txq entry. Each element carries "node" (node name), "count" (stored queue
 * size) and "files": one object per transfer with a non-zero tx_id, with the
 * keys "tx_id", "agent", "name", "bytes" and "size".
 * \return The JSON text.
 */
string json_list_outgoing()
{
    JSONObject jobj;
    JSONArray outgoing;

    outgoing.resize(txq.size());
    for (uint16_t node = 0; node < txq.size(); ++node)
    {
        JSONObject node_obj("node", txq[node].node_name);
        node_obj.addElement("count", txq[node].outgoing.size);

        JSONArray files;
        files.resize(txq[node].outgoing.size);
        size_t used = 0;
        // Iterate by reference: a tx_progress holds strings and a chunk deque,
        // so copying every slot on each listing is needless work.
        for (const tx_progress &tx : txq[node].outgoing.progress)
        {
            if (!tx.tx_id)
            {
                continue;
            }
            JSONObject f("tx_id", tx.tx_id);
            f.addElement("agent", tx.agent_name);
            f.addElement("name", tx.file_name);
            f.addElement("bytes", tx.total_bytes);
            f.addElement("size", tx.file_size);
            // Fill the pre-sized slots first; if more live transfers exist than
            // the stored count says, grow the array instead of throwing from at().
            if (used < files.size())
            {
                files[used] = JSONValue(f);
            }
            else
            {
                files.push_back(JSONValue(f));
            }
            ++used;
        }
        node_obj.addElement("files", files);
        outgoing.at(node) = JSONValue(node_obj);
    }
    jobj.addElement("outgoing", outgoing);
    return jobj.to_json_string();
}
Definition: jsonvalue.h:13
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
//! Describe the incoming and outgoing queues of all nodes as JSON.
/*! The result is an object with the keys "outgoing" and "incoming", each an
 * array with one element per txq entry. Every element carries "node" (node
 * name), "count" (stored size of that direction's queue) and "files": one
 * object per transfer with a non-zero tx_id, with the keys "tx_id", "agent",
 * "name", "bytes" and "size".
 * \return The JSON text.
 */
string json_list_queue()
{
    // Builds the "files" array of one queue. Entries are only appended; the
    // first element is never overwritten by later transfers.
    auto list_files = [](const tx_entry &queue) -> JSONArray
    {
        JSONArray files;
        // By reference: a tx_progress holds strings and a chunk deque.
        for (const tx_progress &tx : queue.progress)
        {
            if (!tx.tx_id)
            {
                continue;
            }
            JSONObject f("tx_id", tx.tx_id);
            f.addElement("agent", tx.agent_name);
            f.addElement("name", tx.file_name);
            f.addElement("bytes", tx.total_bytes);
            f.addElement("size", tx.file_size);
            files.push_back(JSONValue(f));
        }
        return files;
    };

    JSONObject jobj;
    JSONArray incoming;
    JSONArray outgoing;

    outgoing.resize(txq.size());
    incoming.resize(txq.size());
    for (uint16_t node = 0; node < txq.size(); ++node)
    {
        JSONObject node_in("node", txq[node].node_name);
        node_in.addElement("count", txq[node].incoming.size);
        node_in.addElement("files", list_files(txq[node].incoming));
        incoming.at(node) = JSONValue(node_in);

        JSONObject node_out("node", txq[node].node_name);
        // Report the outgoing queue's own size, as json_list_outgoing does.
        node_out.addElement("count", txq[node].outgoing.size);
        node_out.addElement("files", list_files(txq[node].outgoing));
        outgoing.at(node) = JSONValue(node_out);
    }
    jobj.addElement("outgoing", outgoing);
    jobj.addElement("incoming", incoming);
    return jobj.to_json_string();
}
Definition: jsonvalue.h:13
int i
Definition: rw_test.cpp:37
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
void write_queue_log ( double  logdate)
3349 {
3350  string record = json_list_queue(); // to append to file
3351 
3352  log_write(agent->cinfo->node.name, "file", logdate, "", "log", record, log_directory);
3353  log_move(data_type_path(agent->cinfo->node.name, log_directory, "file", logdate, "log"), data_type_path(agent->cinfo->node.name, "outgoing", "file", logdate, "log"), true);
3354 
3355 
3356 }
static Agent * agent
Definition: agent_file3.cpp:94
string json_list_queue()
Definition: agent_file3.cpp:3440
void log_move(string oldpath, string newpath, bool compress)
Move log file - path version.
Definition: datalib.cpp:200
nodestruc node
Structure for summary information in node.
Definition: jsondef.h:4220
string data_type_path(string node, string location, string agent, double mjd, string type)
Create data file path.
Definition: datalib.cpp:910
char name[40+1]
Node Name.
Definition: jsondef.h:3556
void log_write(string node, string agent, double utc, string extra, string type, string record, string location)
Write log entry - full.
Definition: datalib.cpp:75
cosmosstruc * cinfo
Definition: agentclass.h:346
static string log_directory
Definition: agent_file3.cpp:181
// Entry point of the file transfer agent.
// Visible flow: start the Agent named "file"; open the UDP listening socket as
// channel 0; optionally configure channel 1 from the last command-line
// argument ("node" or "node:ip"); take the debug level from a leading digit of
// argv[1]; restore transfers recorded in the *.meta files of every node's
// temp/file directory; register the agent requests; start the send, receive
// and transmit threads; then loop until the agent stops, calling
// outgoing_tx_load() for every node every 10 seconds and advancing nextlog by
// logstride_sec. On exit the threads are joined and the agent is shut down.
// NOTE(review): source lines 250-251, 298 and 434 are absent from this listing
// (the body of the "slow" branch, the condition guarding the "Sending socket
// failure" block, and the statement executed before nextlog is advanced -
// presumably the queue log write). Consult the real file before editing.
int main ( int  argc,
char *  argv[] 
)
242 {
243  int32_t iretn;
244  thread send_loop_thread;
245  thread recv_loop_thread;
246  thread transmit_loop_thread;
247 
// A program name containing "slow" selects an alternative configuration
// (the body of this branch is not shown in this listing).
248  if (static_cast<string>(argv[0]).find("slow") != string::npos)
249  {
252  }
253 
254  agent = new Agent("", "file", 0.);
255  if ((iretn = agent->wait()) < 0)
256  {
257  fprintf(agent->get_debug_fd(), "%16.10f %s Failed to start Agent %s on Node %s Dated %s : %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str(), cosmos_error_string(iretn).c_str());
258  exit(iretn);
259  }
260  else
261  {
262  fprintf(agent->get_debug_fd(), "%16.10f %s Started Agent %s on Node %s Dated %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str());
263  }
264 
265  fprintf(agent->get_debug_fd(), "%16.10f Node: %s Agent: %s - Established\n", currentmjd(), agent->nodeName.c_str(), agent->agentName.c_str());
266  fflush(agent->get_debug_fd()); // Ensure this gets printed before blocking call
267 
// Channel 0 is the listening socket on AGENTRECVPORT; it has no node name.
268  out_comm_channel.resize(1);
269  if((iretn = socket_open(&out_comm_channel[0].chansock, NetworkType::UDP, "", AGENTRECVPORT, SOCKET_LISTEN, SOCKET_BLOCKING, 5000000)) < 0)
270  {
271  fprintf(agent->get_debug_fd(), "%16.10f Main: Node: %s Agent: %s - Listening socket failure\n", currentmjd(), agent->nodeName.c_str(), agent->agentName.c_str());
272  agent->shutdown();
273  exit (-errno);
274  }
275 
276  inet_ntop(out_comm_channel[0].chansock.caddr.sin_family, &out_comm_channel[0].chansock.caddr.sin_addr, out_comm_channel[0].chansock.address, sizeof(out_comm_channel[0].chansock.address));
277  out_comm_channel[0].chanip = out_comm_channel[0].chansock.address;
278  out_comm_channel[0].nmjd = currentmjd();
279  out_comm_channel[0].limjd = out_comm_channel[0].nmjd;
280  out_comm_channel[0].lomjd = out_comm_channel[0].nmjd;
281  out_comm_channel[0].fmjd = out_comm_channel[0].nmjd;
282  out_comm_channel[0].node = "";
283  out_comm_channel[0].throughput = default_throughput;
284  out_comm_channel[0].packet_size = default_packet_size;
285  fprintf(agent->get_debug_fd(), "%16.10f Node: %s Agent: %s - Listening socket open\n", currentmjd(), agent->nodeName.c_str(), agent->agentName.c_str());
286  fflush(agent->get_debug_fd()); // Ensure this gets printed before blocking call
287 
// A first argument that does not start with a digit, or exactly two arguments,
// means the last argument names the remote node for channel 1, optionally
// followed by ":" and the address to use.
288  if (argc > 1 && ((argv[1][0] < '0' || argv[1][0] > '9') || argc == 3))
289  {
290  out_comm_channel.resize(2);
291  out_comm_channel[1].node = argv[argc-1];
292  size_t tloc = out_comm_channel[1].node.find(":");
293  if (tloc != string::npos)
294  {
295  out_comm_channel[1].chanip = out_comm_channel[1].node.substr(tloc+1, out_comm_channel[1].node.size()-tloc+1);
296  out_comm_channel[1].node = out_comm_channel[1].node.substr(0, tloc);
297  }
299  {
300  fprintf(agent->get_debug_fd(), "%16.10f Node: %s IP: %s - Sending socket failure\n", currentmjd(), out_comm_channel[1].node.c_str(), out_comm_channel[1].chanip.c_str());
301  agent->shutdown();
302  exit (-errno);
303  }
304  out_comm_channel[1].nmjd = currentmjd();
305  out_comm_channel[1].limjd = out_comm_channel[1].nmjd;
306  out_comm_channel[1].lomjd = out_comm_channel[1].nmjd;
307  out_comm_channel[1].fmjd = out_comm_channel[1].nmjd;
308  out_comm_channel[1].throughput = default_throughput;
309  out_comm_channel[1].packet_size = default_packet_size;
310  fprintf(agent->get_debug_fd(), "%16.10f Network: Old: %u %s %s %u\n", currentmjd(), 1, out_comm_channel[1].node.c_str(), out_comm_channel[1].chanip.c_str(), ntohs(out_comm_channel[1].chansock.caddr.sin_port));
311  fflush(agent->get_debug_fd());
312 
313  log_directory = "temp"; // put log files in node/outgoing/file
314  logstride_sec = 600.; // longer logstride
315  }
316 
// A leading digit in argv[1] is the debug level; otherwise debugging is off.
317  if (argc > 1 && (argv[1][0] >= '0' && argv[1][0] <= '9'))
318  {
319  agent->debug_level = argv[1][0] - '0';
320  }
321  else
322  {
323  agent->debug_level = 0;
324  }
325 
326  // Restore in progress transfers from previous run
327  for (string node_name : data_list_nodes())
328  {
329  int32_t local_node = lookup_local_node_id(node_name);
330 
331  if (local_node == 0)
332  {
333  local_node = add_node_name(node_name);
334  }
335 
336  for(filestruc file : data_list_files(node_name, "temp", "file"))
337  {
338  // Add entry for each meta file
339  if (file.type == "meta")
340  {
341  // Incoming
342  if (!file.name.compare(0,3,"in_"))
343  {
344  tx_progress tx_in;
345  tx_in.temppath = file.path.substr(0,file.path.find(".meta"));
346  if (read_meta(tx_in) >= 0)
347  {
348  merge_chunks_overlap(tx_in);
349  iretn = incoming_tx_add(tx_in);
350  }
351  }
352 
353  // Outgoing
354  if (!file.name.compare(0,4,"out_"))
355  {
356  tx_progress tx_out;
357  tx_out.temppath = file.path.substr(0,file.path.find(".meta"));
358  if (read_meta(tx_out) >= 0)
359  {
360  find_chunks_togo(tx_out);
361  iretn = outgoing_tx_add(tx_out);
362  }
363  }
364  }
365  }
366  }
367 
368  // add agent_file requests
369  if ((iretn=agent->add_request("get_channels",request_get_channels,"", "get channel information")))
370  exit (iretn);
371  if ((iretn=agent->add_request("set_throughput",request_set_throughput,"{n} [throughput]", "set channel throughput")))
372  exit (iretn);
373  if ((iretn=agent->add_request("remove_file",request_remove_file,"in|out tx_id", "removes file from indicated queue")))
374  exit (iretn);
375  // if ((iretn=agent->add_request("send_file",request_send_file,"", "creates and sends metadata/data packets")))
376  // exit (iretn);
377  if ((iretn=agent->add_request("ls",request_ls,"", "lists contents of directory")))
378  exit (iretn);
379  if ((iretn=agent->add_request("list_incoming",request_list_incoming,"", "lists contents incoming queue")))
380  exit (iretn);
381  if ((iretn=agent->add_request("list_outgoing",request_list_outgoing,"", "lists contents outgoing queue")))
382  exit (iretn);
383  if ((iretn=agent->add_request("list_incoming_json",request_list_incoming_json,"", "lists contents incoming queue")))
384  exit (iretn);
385  if ((iretn=agent->add_request("list_outgoing_json",request_list_outgoing_json,"", "lists contents outgoing queue")))
386  exit (iretn);
387  if ((iretn=agent->add_request("set_logstride",request_set_logstride,"sec","set time interval of log files")))
388  exit (iretn);
389  if ((iretn=agent->add_request("get_logstride",request_get_logstride,"","get time interval of log files")))
390  exit (iretn);
391  if ((iretn=agent->add_request("debug",request_debug,"{0|1}","Toggle Debug information")))
392  exit (iretn);
393 
// Initial load of the outgoing queues of every known node.
394  double nextdiskcheck = currentmjd(0.);
395  for (std::vector<tx_queue>::size_type node=0; node<txq.size(); ++node)
396  {
397  iretn = outgoing_tx_load(node);
398  if (iretn >= 0)
399  {
400  nextdiskcheck = currentmjd();
401  }
402  }
403 
404  send_loop_thread = thread([=] { send_loop(); });
405  recv_loop_thread = thread([=] { recv_loop(); });
406  transmit_loop_thread = thread([=] { transmit_loop(); });
407 
408  double nextlog = currentmjd();
409  double sleepsec;
410  ElapsedTime etloop;
411  etloop.start();
412 
413  // start the agent
414  while(agent->running())
415  {
416  if (agent->running() == (uint16_t)Agent::State::IDLE)
417  {
418  COSMOS_SLEEP(1);
419  continue;
420  }
421 
422 
// Sleep until the earlier of the next disk check and the next log time
// (times are in days, hence the factor 86400 to get seconds).
423  if(nextdiskcheck < nextlog ) {
424  sleepsec = 86400. * (nextdiskcheck - currentmjd());
425  } else {
426  sleepsec = 86400. * (nextlog - currentmjd());
427  }
428  if (sleepsec > 0.)
429  {
430  COSMOS_SLEEP((sleepsec));
431  }
432 
433  if(currentmjd() > nextlog) {
435  nextlog = currentmjd(0.) + logstride_sec/86400.;
436  }
437 
438  // Check for new files to transmit if queue is not full and check is not delayed
439 
440 
441  if (currentmjd() > nextdiskcheck)
442  {
443 
// Schedule the next disk check 10 seconds from now.
444  nextdiskcheck = currentmjd(0.) + 10./86400.;
445  for (uint16_t node=0; node<txq.size(); ++node)
446  {
447  iretn = outgoing_tx_load(node);
448 // if (iretn >= 0)
449 // {
450 // nextdiskcheck = currentmjd();
451 // }
452  }
453  }
454  } // End WHILE Loop
455 
456  if (agent->debug_level)
457  {
458  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main: Node: %s Agent: %s - Exiting\n", currentmjd(), dt.lap(), agent->nodeName.c_str(), agent->agentName.c_str());
459  fflush(agent->get_debug_fd());
460  }
461 
// Wait for the worker threads before tearing down the shared queue table.
462  send_loop_thread.join();
463  recv_loop_thread.join();
464  // transmit_queue_check.notify_one();
465  transmit_loop_thread.join();
466  txq.clear();
467 
468  if (agent->debug_level)
469  {
470  fprintf(agent->get_debug_fd(), "%16.10f %.4f Main: Node: %s Agent: %s - Shutting down\n", currentmjd(), dt.lap(), agent->nodeName.c_str(), agent->agentName.c_str());
471  fflush(agent->get_debug_fd());
472  }
473 
474  agent->shutdown();
475 
476  exit (0);
477 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
int32_t request_ls(string &request, string &response, Agent *agent)
Definition: agent_file3.cpp:2087
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static Agent * agent
Definition: agent_file3.cpp:94
Agent socket using Unicast UDP.
int32_t outgoing_tx_load(uint8_t node)
Definition: agent_file3.cpp:2616
static uint32_t default_throughput
Definition: agent_file3.cpp:117
void transmit_loop()
Definition: agent_file3.cpp:1397
ElapsedTime dt
Definition: agent_file3.cpp:183
string nodeName
Definition: agentclass.h:367
int32_t request_list_outgoing_json(string &request, string &response, Agent *agent)
Definition: agent_file3.cpp:3364
uint8_t lookup_local_node_id(PACKET_NODE_ID_TYPE remote_id)
Definition: agent_file3.cpp:3162
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file3.cpp:2693
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
int32_t request_set_logstride(string &request, string &response, Agent *agent)
Definition: agent_file3.cpp:3331
int iretn
Definition: rw_test.cpp:37
int32_t wait(State state=State::RUN, double waitsec=10.)
Definition: agentclass.cpp:398
vector< filestruc > data_list_files(string directory)
Get list of files in a directory, directly.
Definition: datalib.cpp:461
double logstride_sec
Definition: agent_file3.cpp:182
#define THROUGHPUT_LO
Definition: agent_file3.cpp:66
static PACKET_CHUNK_SIZE_TYPE default_packet_size
Definition: agent_file3.cpp:116
vector< string > data_list_nodes()
Get list of Nodes, directly.
Definition: datalib.cpp:583
string node_name
Definition: agent_001.cpp:46
#define SOCKET_TALK
Talk followed by optional listen (sendto address)
Definition: socketlib.h:82
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
int32_t request_debug(string &request, string &, Agent *)
Definition: agent_file3.cpp:3319
Definition: datalib.h:113
void start()
ElapsedTime::start.
Definition: elapsedtime.cpp:203
#define AGENTRECVPORT
Default RECV port.
Definition: agentclass.h:200
string getAgent()
Definition: agentclass.cpp:2609
int32_t add_node_name(string node_name)
Definition: agent_file3.cpp:3110
int32_t request_list_outgoing(string &request, string &response, Agent *agent)
Definition: agent_file3.cpp:2147
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
int32_t add_request(string token, external_request_function function, string synopsis="", string description="")
Add internal request to Agent request list with description and synopsis.
Definition: agentclass.cpp:312
int32_t request_get_logstride(string &, string &response, Agent *)
Definition: agent_file3.cpp:3342
Definition: agentclass.h:139
int32_t outgoing_tx_add(tx_progress &tx_out)
Definition: agent_file3.cpp:2228
#define SOCKET_BLOCKING
Blocking Agent.
Definition: socketlib.h:78
int32_t shutdown()
Shutdown agent gracefully.
Definition: agentclass.cpp:366
#define AGENTRCVTIMEO
Default AGENT socket RCVTIMEO (100 msec)
Definition: agentclass.h:208
int32_t request_list_incoming(string &request, string &response, Agent *agent)
Definition: agent_file3.cpp:2117
double data_ctime(string path)
Definition: datalib.cpp:1910
Definition: elapsedtime.h:62
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
vector< file_progress > find_chunks_togo(tx_progress &tx)
Definition: agent_file3.cpp:2044
string utc2iso8601(double utc)
ISO 8601 version of time.
Definition: timelib.cpp:1286
#define SOCKET_LISTEN
Listen followed by optional talk (recvfrom INADDRANY)
Definition: socketlib.h:84
int32_t read_meta(tx_progress &tx)
Definition: agent_file3.cpp:1854
#define PACKET_SIZE_LO
Definition: agent_file3.cpp:64
void recv_loop()
Definition: agent_file3.cpp:479
int32_t request_set_throughput(string &request, string &response, Agent *agent)
Definition: agent_file3.cpp:2186
int32_t socket_open(socket_channel *channel, NetworkType ntype, const char *address, uint16_t port, uint16_t role, bool blocking, uint32_t usectimeo, uint32_t rcvbuf, uint32_t sndbuf)
Open UDP socket.
Definition: socketlib.cpp:51
void write_queue_log(double logdate)
Definition: agent_file3.cpp:3348
static string node
Definition: agent_monitor.cpp:126
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
int32_t request_get_channels(string &request, string &response, Agent *agent)
Definition: agent_file3.cpp:2167
void send_loop()
Definition: agent_file3.cpp:1056
static string log_directory
Definition: agent_file3.cpp:181
int32_t request_remove_file(string &request, string &response, Agent *agent)
Definition: agent_file3.cpp:2207
string agentName
Definition: agentclass.h:368
int32_t request_list_incoming_json(string &request, string &response, Agent *agent)
Definition: agent_file3.cpp:3358
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file3.cpp:1951
bool lower_chunk ( file_progress  i,
file_progress  j 
)
1947 {
1948  return (i.chunk_start<j.chunk_start);
1949 }
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334

Variable Documentation

beatstruc cbeat
static

The (global) name of the heartbeat structure.

Agent* agent
static

The (global) name of the cosmos data structure.

PACKET_CHUNK_SIZE_TYPE default_packet_size = (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
static
uint32_t default_throughput = 700
static
vector<channelstruc> out_comm_channel
static
std::queue<transmit_queue_entry> transmit_queue
static
std::mutex incoming_tx_lock
static
std::mutex outgoing_tx_lock
static
std::mutex debug_fd_lock
static
double last_data_receive_time = 0.
static
double next_reqmeta_time = 0.
static
uint32_t packet_in_count = 0
static
uint32_t packet_out_count
static
uint32_t crc_error_count = 0
static
uint32_t timeout_error_count = 0
static
uint32_t type_error_count = 0
static
uint32_t send_error_count = 0
static
uint32_t recv_error_count = 0
static
vector<tx_queue> txq
static
string log_directory = "temp"
static
double logstride_sec = 10.