COSMOS core  1.0.2 (beta)
Comprehensive Open-architecture Solution for Mission Operations Systems
agent_file4.cpp File Reference
#include "support/configCosmos.h"
#include "agent/agentclass.h"
#include "support/jsonlib.h"
#include "support/transferlib.h"
#include "support/sliplib.h"
#include "support/jsonobject.h"
#include <algorithm>
#include <cstring>
#include <time.h>
#include <iostream>
#include <fstream>
#include <string>
#include <sys/stat.h>
#include <sys/select.h>
Include dependency graph for agent_file4.cpp:

Classes

struct  channelstruc
 
struct  transmit_queue_entry
 
struct  tx_entry
 
struct  tx_queue
 

Macros

#define PROGRESS_QUEUE_SIZE   256
 
#define PACKET_SIZE_LO   (200-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
 
#define PACKET_SIZE_PAYLOAD   (PACKET_SIZE_LO-PACKET_DATA_OFFSET_HEADER_TOTAL)
 
#define THROUGHPUT_LO   130
 
#define PACKET_SIZE_HI   (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
 
#define THROUGHPUT_HI   1000
 
#define PACKET_IN   1
 
#define PACKET_OUT   2
 

Functions

void send_loop () noexcept
 
void recv_loop () noexcept
 
void transmit_loop () noexcept
 
void profile_check (int32_t line_number, uint16_t thread=0)
 
int32_t request_send_command (string &request, string &response, Agent *)
 
int32_t request_send_message (string &request, string &response, Agent *)
 
int32_t request_debug (string &request, string &, Agent *)
 
int32_t request_get_channels (string &request, string &response, Agent *agent)
 
int32_t request_set_throughput (string &request, string &response, Agent *agent)
 
int32_t request_remove_file (string &request, string &response, Agent *agent)
 
int32_t request_ls (string &request, string &response, Agent *agent)
 
int32_t request_list_incoming (string &request, string &response, Agent *agent)
 
int32_t request_list_outgoing (string &request, string &response, Agent *agent)
 
int32_t request_list_incoming_json (string &request, string &response, Agent *agent)
 
int32_t request_list_outgoing_json (string &request, string &response, Agent *agent)
 
int32_t request_set_logstride (string &request, string &response, Agent *agent)
 
int32_t request_get_logstride (string &, string &response, Agent *)
 
int32_t outgoing_tx_add (tx_progress &tx_out)
 
int32_t outgoing_tx_add (string node_name, string agent_name, string file_name)
 
int32_t outgoing_tx_del (uint8_t node, uint16_t tx_id=256, bool remove_file=true)
 
int32_t outgoing_tx_purge (uint8_t node, uint16_t tx_id=256)
 
int32_t outgoing_tx_recount (uint8_t node)
 
int32_t outgoing_tx_load (uint8_t node)
 
int32_t incoming_tx_add (tx_progress &tx_in)
 
int32_t incoming_tx_add (string node_name, PACKET_TX_ID_TYPE tx_id)
 
int32_t incoming_tx_update (packet_struct_metashort meta)
 
int32_t incoming_tx_del (uint8_t node, uint16_t tx_id=256)
 
int32_t incoming_tx_purge (uint8_t node, uint16_t tx_id=256)
 
int32_t incoming_tx_recount (uint8_t node)
 
vector< file_progress > find_chunks_missing (tx_progress &tx)
 
vector< file_progress > find_chunks_togo (tx_progress &tx)
 
PACKET_FILE_SIZE_TYPE merge_chunks_overlap (tx_progress &tx)
 
double queuecheck (PACKET_NODE_ID_TYPE node_id)
 
int32_t queuesendto (PACKET_NODE_ID_TYPE node_id, string type, vector< PACKET_BYTE > packet)
 
int32_t mysendto (string type, int32_t use_channel, vector< PACKET_BYTE > &buf)
 
int32_t myrecvfrom (string type, socket_channel &channel, vector< PACKET_BYTE > &buf, uint32_t length, double dtimeout=1.)
 
void debug_packet (vector< PACKET_BYTE > buf, uint8_t direction, string type, int32_t use_channel)
 
int32_t write_meta (tx_progress &tx, double interval=5.)
 
int32_t read_meta (tx_progress &tx)
 
bool tx_progress_compare_by_size (const tx_progress &a, const tx_progress &b)
 
bool filestruc_smaller_by_size (const filestruc &a, const filestruc &b)
 
bool filestruc_larger_by_size (const filestruc &a, const filestruc &b)
 
PACKET_TX_ID_TYPE check_tx_id (tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
 
int32_t check_channel (PACKET_NODE_ID_TYPE node_id)
 
int32_t add_node_name (string node_name)
 
PACKET_TX_ID_TYPE choose_incoming_tx_id (uint8_t node_id)
 
PACKET_TX_ID_TYPE choose_outgoing_tx_id (uint8_t node_id)
 
int32_t next_incoming_tx (PACKET_NODE_ID_TYPE node, int32_t use_channel)
 
string json_list_incoming ()
 
string json_list_outgoing ()
 
string json_list_queue ()
 
void write_queue_log (double logdate)
 
int main (int argc, char *argv[])
 
bool lower_chunk (file_progress i, file_progress j)
 

Variables

static beatstruc cbeat
 
static Agent * agent
 
static PACKET_CHUNK_SIZE_TYPE default_packet_size = (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
 
static uint32_t default_throughput = 1000
 
static vector< channelstruc > out_comm_channel
 
static std::queue< transmit_queue_entry > transmit_queue
 
static ElapsedTime tet
 
static std::mutex txqueue_lock
 
static std::mutex incoming_tx_lock
 
static std::mutex outgoing_tx_lock
 
static std::mutex debug_fd_lock
 
static double last_data_receive_time = 0.
 
static uint32_t packet_in_count = 0
 
static uint32_t packet_out_count
 
static uint32_t crc_error_count = 0
 
static uint32_t timeout_error_count = 0
 
static uint32_t type_error_count = 0
 
static uint32_t send_error_count = 0
 
static uint32_t recv_error_count = 0
 
static vector< tx_queue > txq
 
static string log_directory = "temp"
 
double logstride_sec = 10.
 
ElapsedTime dt
 

Macro Definition Documentation

#define PROGRESS_QUEUE_SIZE   256
#define PACKET_SIZE_LO   (200-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
#define PACKET_SIZE_PAYLOAD   (PACKET_SIZE_LO-PACKET_DATA_OFFSET_HEADER_TOTAL)
#define THROUGHPUT_LO   130
#define PACKET_SIZE_HI   (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
#define THROUGHPUT_HI   1000
#define PACKET_IN   1
#define PACKET_OUT   2

Function Documentation

void send_loop ( )
noexcept
1108 {
1109  vector<PACKET_BYTE> packet;
1110  int32_t use_channel=-1;
1111 
1112  while (agent->running())
1113  {
1114 
1115  if (agent->running() == (uint16_t)Agent::State::IDLE)
1116  {
1117  COSMOS_SLEEP(1);
1118  continue;
1119  }
1120 
1121  for (uint8_t node_id=1; node_id<txq.size(); ++node_id)
1122  {
1123  // See if we have an active channel serving this Node
1124  if ((use_channel=check_channel(node_id)) <= 0)
1125  {
1126  continue;
1127  }
1128 
1129 // printf("use_channel: %d/%d\n", use_channel, out_comm_channel.size());
1130 
1131  // Sleep just a bit to let other threads act
1132 // COSMOS_SLEEP(out_comm_channel[use_channel].packet_size / out_comm_channel[use_channel].throughput);
1133  COSMOS_SLEEP(.1);
1134 
1135  // Set up to do the most important things first
1136  txq[(node_id)].outgoing.activity = false;
1137  txq[(node_id)].incoming.activity = false;
1138 
1139  // Let queue drain if necessary
1140  if (queuecheck(static_cast <PACKET_NODE_ID_TYPE>(node_id)) >= 2.)
1141  {
1142 // COSMOS_SLEEP(queuecheck(static_cast <PACKET_NODE_ID_TYPE>(node_id)) - 5.);
1143  continue;
1144  }
1145 
1146  // Send Heartbeat every 10 seconds, regardless
1147  if (txq[(node_id)].outgoing.heartbeatclock < currentmjd())
1148  {
1149  uint32_t funixtime = utc2unixseconds(out_comm_channel[use_channel].fmjd);
1150  make_heartbeat_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(node_id), txq[(node_id)].node_name, 4, out_comm_channel[use_channel].throughput, funixtime);
1151  queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node_id), "Outgoing", packet);
1152  txq[(node_id)].outgoing.heartbeatclock = currentmjd() + 10. / 86400.;
1153  }
1154 
1155  // Send Queue packet, if anything needs to be queued
1156  outgoing_tx_lock.lock();
1157  if (txq[(node_id)].outgoing.sendqueue && !txq[(node_id)].outgoing.sentqueue)
1158  {
1159  vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
1160  PACKET_TX_ID_TYPE iq = 0;
1161  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1162  {
1163  if (txq[(node_id)].outgoing.progress[tx_id].tx_id == tx_id)
1164  {
1165  tqueue[iq++] = txq[(node_id)].outgoing.progress[tx_id].tx_id;
1166 // txq[(node_id)].outgoing.progress[tx_id].sendmeta = true;
1167  }
1168  if (iq == TRANSFER_QUEUE_LIMIT)
1169  {
1170  break;
1171  }
1172  }
1173  if (iq)
1174  {
1175  make_queue_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(node_id), txq[(node_id)].node_name, tqueue);
1176  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node_id), "Outgoing", packet);
1177  if (use_channel >= 0)
1178  {
1179  txq[(node_id)].outgoing.sendqueue = false;
1180  txq[(node_id)].outgoing.sentqueue = true;
1181  txq[(node_id)].outgoing.activity = true;
1182  }
1183  }
1184  }
1185  outgoing_tx_lock.unlock();
1186 
1187  profile_check(__LINE__, 2);
1188 
1189  // Send any pending Metadata packets
1190  outgoing_tx_lock.lock();
1191  if (txq[(node_id)].outgoing.sentqueue)
1192  {
1193  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1194  {
1195  if (txq[(node_id)].outgoing.progress[tx_id].tx_id == tx_id && txq[(node_id)].outgoing.progress[tx_id].sendmeta && !txq[(node_id)].outgoing.progress[tx_id].sentmeta)
1196  {
1197  tx_progress tx = txq[(node_id)].outgoing.progress[tx_id];
1198  vector<PACKET_BYTE> packet;
1199  make_metadata_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(node_id), tx.tx_id, (char *)tx.file_name.c_str(), tx.file_size, (char *)tx.agent_name.c_str());
1200  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node_id), "Outgoing", packet);
1201  if (use_channel >= 0)
1202  {
1203  txq[(node_id)].outgoing.activity = true;
1204  txq[(node_id)].outgoing.progress[tx_id].sendmeta = false;
1205  txq[(node_id)].outgoing.progress[tx_id].havemeta = true;
1206  txq[(node_id)].outgoing.progress[tx_id].sentmeta = true;
1207  break;
1208  }
1209  }
1210  }
1211  }
1212  outgoing_tx_lock.unlock();
1213 
1214  profile_check(__LINE__, 2);
1215 
1216  // Send Data packets if we have data to send and we didn't do anything above
1217  outgoing_tx_lock.lock();
1218  if (txq[(node_id)].outgoing.sentqueue)
1219  {
1220  uint16_t tx_id = choose_outgoing_tx_id((node_id));
1221  if (txq[(node_id)].outgoing.progress[tx_id].tx_id == tx_id && txq[(node_id)].outgoing.progress[tx_id].sentmeta && txq[(node_id)].outgoing.progress[tx_id].senddata)
1222  {
1223  if (txq[(node_id)].outgoing.progress[tx_id].file_size)
1224  {
1225  // Check if there is any more data to send
1226  if (txq[(node_id)].outgoing.progress[tx_id].total_bytes == 0)
1227  {
1228  txq[(node_id)].outgoing.progress[tx_id].sentdata = true;
1229  txq[(node_id)].outgoing.progress[tx_id].senddata = false;
1230  }
1231  else
1232  {
1233  // Attempt to open the outgoing progress file
1234  if (txq[(node_id)].outgoing.progress[tx_id].fp == nullptr)
1235  {
1236  txq[(node_id)].outgoing.progress[tx_id].fp = fopen(txq[(node_id)].outgoing.progress[tx_id].filepath.c_str(), "r");
1237  }
1238 
1239  // If we're good, continue with the process
1240  if(txq[(node_id)].outgoing.progress[tx_id].fp != nullptr)
1241  {
1242  file_progress tp;
1243  tp = txq[(node_id)].outgoing.progress[tx_id].file_info[0];
1244 
1245  PACKET_FILE_SIZE_TYPE byte_count = (tp.chunk_end - tp.chunk_start) + 1;
1246  if (byte_count > out_comm_channel[use_channel].packet_size)
1247  {
1248  byte_count = out_comm_channel[use_channel].packet_size;
1249  }
1250 
1251  tp.chunk_end = tp.chunk_start + byte_count - 1;
1252 
1253  // Read the packet and send it
1254  int32_t nbytes;
1255  PACKET_BYTE* chunk = new PACKET_BYTE[byte_count]();
1256  if (!(nbytes = fseek(txq[(node_id)].outgoing.progress[tx_id].fp, tp.chunk_start, SEEK_SET)))
1257  {
1258  nbytes = fread(chunk, 1, byte_count, txq[(node_id)].outgoing.progress[tx_id].fp);
1259  }
1260  if (nbytes == byte_count)
1261  {
1262  make_data_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(node_id), txq[(node_id)].outgoing.progress[tx_id].tx_id, byte_count, tp.chunk_start, chunk);
1263  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node_id), "Outgoing", packet);
1264  if (use_channel >= 0)
1265  {
1266  txq[(node_id)].outgoing.progress[tx_id].file_info[0].chunk_start = tp.chunk_end + 1;
1267  txq[(node_id)].outgoing.activity = true;
1268  }
1269  }
1270  else
1271  {
1272  // Some problem with this transmission, ask other end to dequeue it
1273  // Remove transaction
1274  txq[(node_id)].outgoing.progress[tx_id].senddata = false;
1275  txq[(node_id)].outgoing.progress[tx_id].sentdata = true;
1276  txq[(node_id)].outgoing.progress[tx_id].complete = true;
1277  }
1278  delete[] chunk;
1279 
1280  if (txq[(node_id)].outgoing.progress[tx_id].file_info[0].chunk_start > txq[(node_id)].outgoing.progress[tx_id].file_info[0].chunk_end)
1281  {
1282  // All done with this file_info entry. Close file and remove entry.
1283  fclose(txq[(node_id)].outgoing.progress[tx_id].fp);
1284  txq[(node_id)].outgoing.progress[tx_id].fp = nullptr;
1285  txq[(node_id)].outgoing.progress[tx_id].file_info.pop_front();
1286  }
1287 
1288  write_meta(txq[(node_id)].outgoing.progress[tx_id]);
1289  }
1290  else
1291  {
1292  // Some problem with this transmission, ask other end to dequeue it
1293 
1294  txq[(node_id)].outgoing.progress[tx_id].senddata = false;
1295  txq[(node_id)].outgoing.progress[tx_id].sentdata = true;
1296  txq[(node_id)].outgoing.progress[tx_id].complete = true;
1297  }
1298  }
1299  }
1300  // Zero length file, ask other end to dequeue it
1301  else
1302  {
1303  txq[(node_id)].outgoing.progress[tx_id].senddata = false;
1304  txq[(node_id)].outgoing.progress[tx_id].sentdata = true;
1305  txq[(node_id)].outgoing.progress[tx_id].complete = true;
1306  }
1307  }
1308  }
1309  outgoing_tx_lock.unlock();
1310 
1311  // Send Cancel packets if required
1312  outgoing_tx_lock.lock();
1313  if (txq[(node_id)].outgoing.sentqueue)
1314  {
1315  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1316  {
1317  if (txq[(node_id)].outgoing.progress[tx_id].tx_id == tx_id && txq[(node_id)].outgoing.progress[tx_id].complete)
1318  {
1319  // Remove from queue
1320  outgoing_tx_del(node_id, tx_id);
1321 
1322  // Send a CANCEL packet
1323  vector<PACKET_BYTE> packet;
1324  make_cancel_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(node_id), tx_id);
1325  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node_id), "Outgoing", packet);
1326  if (use_channel >= 0)
1327  {
1328  txq[(node_id)].outgoing.activity = true;
1329  }
1330  }
1331  }
1332  }
1333  outgoing_tx_lock.unlock();
1334 
1335  // Send Reqmeta packet if needed and it has been otherwise quiet
1336  incoming_tx_lock.lock();
1337  if (!txq[(node_id)].incoming.rcvdqueue && !txq[(node_id)].incoming.rcvdmeta && txq[(node_id)].incoming.reqmetaclock < currentmjd())
1338  {
1339  vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
1340  PACKET_TX_ID_TYPE iq = 0;
1341  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1342  {
1343  if (txq[(node_id)].incoming.progress[tx_id].tx_id && !txq[(node_id)].incoming.progress[tx_id].sentmeta)
1344  {
1345  tqueue[iq++] = tx_id;
1346  txq[(node_id)].incoming.progress[tx_id].sendmeta = true;
1347  }
1348  if (iq == TRANSFER_QUEUE_LIMIT)
1349  {
1350  break;
1351  }
1352  }
1353  if (iq)
1354  {
1355  vector<PACKET_BYTE> packet;
1356  make_reqmeta_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(node_id), txq[(node_id)].node_name, tqueue);
1357  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node_id), "Incoming", packet);
1358  }
1359  txq[(node_id)].incoming.reqmetaclock = currentmjd() + 5. / 86400.;
1360  }
1361  incoming_tx_lock.unlock();
1362 
1363  // Send Reqdata packet if there is still data to be gotten and it has been otherwise quiet
1364  incoming_tx_lock.lock();
1365  if (!txq[(node_id)].incoming.rcvdqueue && !txq[(node_id)].incoming.rcvdmeta && !txq[(node_id)].incoming.rcvddata && txq[(node_id)].incoming.reqdataclock < currentmjd())
1366  {
1367  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1368  {
1369  if (txq[(node_id)].incoming.progress[tx_id].tx_id)
1370  {
1371  // Ask for missing data
1372  vector<file_progress> missing;
1373  missing = find_chunks_missing(txq[(node_id)].incoming.progress[tx_id]);
1374  for (uint32_t j=0; j<missing.size(); ++j)
1375  {
1376  vector<PACKET_BYTE> packet;
1377  make_reqdata_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(node_id), txq[(node_id)].incoming.progress[tx_id].tx_id, missing[j].chunk_start, missing[j].chunk_end);
1378  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node_id), "Incoming", packet);
1379  }
1380  }
1381  }
1382  txq[(node_id)].incoming.reqdataclock = currentmjd() + 5. / 86400.;
1383  }
1384  incoming_tx_lock.unlock();
1385 
1386  // Send Complete packets if required
1387  incoming_tx_lock.lock();
1388  if (!txq[(node_id)].incoming.rcvdqueue && !txq[(node_id)].incoming.rcvdmeta && !txq[(node_id)].incoming.rcvddata)
1389  {
1390  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1391  {
1392  if (txq[(node_id)].incoming.progress[tx_id].tx_id == tx_id && txq[node_id].incoming.progress[tx_id].file_size == txq[node_id].incoming.progress[tx_id].total_bytes)
1393  {
1394  // Remove from queue
1395  txq[(node_id)].incoming.progress[tx_id].complete = true;
1396  incoming_tx_del(node_id, tx_id);
1397 
1398  // Send a COMPLETE packet
1399  vector<PACKET_BYTE> packet;
1400  make_complete_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(node_id), tx_id);
1401  use_channel = queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node_id), "Incoming", packet);
1402  if (use_channel >= 0)
1403  {
1404  txq[(node_id)].incoming.activity = true;
1405  }
1406  }
1407  }
1408  }
1409  incoming_tx_lock.unlock();
1410 
1411  }
1412  }
1413 }
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file4.cpp:1843
double queuecheck(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file4.cpp:1449
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
void make_data_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, PACKET_TX_ID_TYPE tx_id, PACKET_CHUNK_SIZE_TYPE byte_count, PACKET_FILE_SIZE_TYPE chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:232
int32_t incoming_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file4.cpp:3000
Definition: transferlib.h:332
Definition: transferlib.h:338
int32_t outgoing_tx_del(uint8_t node, uint16_t tx_id=256, bool remove_file=true)
Definition: agent_file4.cpp:2564
static Agent * agent
Definition: agent_file4.cpp:94
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
static std::mutex incoming_tx_lock
Definition: agent_file4.cpp:141
double utc2unixseconds(double utc)
UTC to Unix time.
Definition: timelib.cpp:167
void make_complete_packet(vector< PACKET_BYTE > &packet, packet_struct_complete complete)
Definition: transferlib.cpp:54
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
int32_t queuesendto(PACKET_NODE_ID_TYPE node_id, string type, vector< PACKET_BYTE > packet)
Definition: agent_file4.cpp:1473
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
void make_cancel_packet(vector< PACKET_BYTE > &packet, packet_struct_cancel cancel)
Definition: transferlib.cpp:82
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
void make_heartbeat_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, PACKET_BYTE beat_period, PACKET_UNIXTIME_TYPE throughput, PACKET_UNIXTIME_TYPE funixtime)
Definition: transferlib.cpp:299
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string agent_name
Definition: transferlib.h:350
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define SEEK_SET
Definition: zconf.h:475
void make_reqdata_packet(vector< PACKET_BYTE > &packet, packet_struct_reqdata reqdata)
Definition: transferlib.cpp:131
static std::mutex outgoing_tx_lock
Definition: agent_file4.cpp:142
string file_name
Definition: transferlib.h:351
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
uint8_t PACKET_BYTE
Definition: transferlib.h:140
PACKET_TX_ID_TYPE choose_outgoing_tx_id(uint8_t node_id)
Definition: agent_file4.cpp:3193
void make_queue_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > queue)
Definition: transferlib.cpp:277
int32_t check_channel(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file4.cpp:3258
void make_metadata_packet(vector< PACKET_BYTE > &packet, packet_struct_metalong meta)
Definition: transferlib.cpp:163
void make_reqmeta_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > reqmeta)
Definition: transferlib.cpp:110
vector< file_progress > find_chunks_missing(tx_progress &tx)
Definition: agent_file4.cpp:2017
void profile_check(int32_t line_number, uint16_t thread=0)
Definition: agent_file4.cpp:3629
void recv_loop ( )
noexcept
503 {
504  vector<PACKET_BYTE> recvbuf;
505  string partial_filepath;
506  int32_t nbytes = 0;
507  socket_channel rchannel;
508  std::vector<channelstruc>::size_type use_channel = 0;
509 
510  while (agent->running())
511  {
512  if (agent->running() == (uint16_t)Agent::State::IDLE)
513  {
514  COSMOS_SLEEP(1);
515  continue;
516  }
517  else
518  {
519  COSMOS_SLEEP(.001);
520  }
521 
522  while (( nbytes = myrecvfrom("Incoming", rchannel, recvbuf, PACKET_MAX_LENGTH)) > 0)
523  {
524  // Generate extra network info
525  inet_ntop(rchannel.caddr.sin_family, &rchannel.caddr.sin_addr, rchannel.address, sizeof(rchannel.address));
526 
527  // Check channels, update information if we are already handling it, otherwise add channel
528  string packet_node_name;
530  int32_t node_id = check_node_id(recvbuf[PACKET_HEADER_OFFSET_NODE_ID]);
531  switch (recvbuf[0] & 0x0f)
532  {
533  case PACKET_HEARTBEAT:
534  case PACKET_REQQUEUE:
535  case PACKET_QUEUE:
536  case PACKET_REQMETA:
537  packet_node_name = reinterpret_cast<char *>(&recvbuf[PACKET_HEADER_OFFSET_NODE_NAME]);
538  break;
539  case PACKET_METADATA:
540  case PACKET_REQDATA:
541  case PACKET_DATA:
542  case PACKET_COMPLETE:
543  case PACKET_CANCEL:
544  break;
545  }
546 
547  if (node_id <= 0 || node_name.empty())
548  {
549  continue;
550  }
551 
552  txq[node_id].incoming.rcvdqueue = false;
553  txq[node_id].incoming.rcvdmeta = false;
554  txq[node_id].incoming.rcvddata = false;
555  incoming_tx_lock.unlock();
556 
557  use_channel = out_comm_channel.size();
558  for (std::vector<channelstruc>::size_type i=0; i<out_comm_channel.size(); ++i)
559  {
560  // Are we handling this Node?
561  if (out_comm_channel[i].node == node_name)
562  {
563  out_comm_channel[i].chansock = rchannel;
564  out_comm_channel[i].chanip = out_comm_channel[i].chansock.address;
565  use_channel = i;
566  break;
567  }
568  }
569 
570  if (use_channel == out_comm_channel.size())
571  {
572  channelstruc tchannel;
573  tchannel.node = node_name;
574  tchannel.nmjd = currentmjd(0.);
575  tchannel.limjd = tchannel.nmjd;
576  tchannel.lomjd = tchannel.nmjd;
577  tchannel.fmjd = tchannel.nmjd;
578  tchannel.chansock = rchannel;
579  inet_ntop(tchannel.chansock.caddr.sin_family, &tchannel.chansock.caddr.sin_addr, tchannel.chansock.address, sizeof(tchannel.chansock.address));
580  tchannel.chanip = tchannel.chansock.address;
581  use_channel = out_comm_channel.size();
582  out_comm_channel.push_back(tchannel);
583  }
584 
586 
587  // Respond appropriately according to type of packet
588  switch (recvbuf[0] & 0x0f)
589  {
590  case PACKET_COMMAND:
591  {
593 
594  extract_command(recvbuf, command);
595 
596  FILE *fp = data_open(("/cosmos/nodes/" + agent->nodeName + "/incoming/exec/file.command").c_str(), "w");
597  if (fp)
598  {
599  fwrite(&command.bytes[0], 1, command.length, fp);
600  fclose(fp);
601  }
602  }
603  break;
604  case PACKET_MESSAGE:
605  {
607 
608  extract_message(recvbuf, message);
609 
610  string imessage;
611  imessage.resize(message.length);
612  memcpy(&imessage[0], message.bytes, message.length);
613  log_write(lookup_node_id_name(message.node_id), "file", tet.split(), "", "message", imessage, "incoming");
614  }
615  break;
616  case PACKET_HEARTBEAT:
617  {
618  packet_struct_heartbeat heartbeat;
619 
620  extract_heartbeat(recvbuf, heartbeat);
621 
622  out_comm_channel[use_channel].heartbeat = heartbeat;
623  txq[node_id].outgoing.sendqueue = true;
624  txq[node_id].outgoing.sentqueue = false;
625  }
626  break;
627  case PACKET_REQQUEUE:
628  {
629  packet_struct_reqqueue reqqueue;
630 
631  extract_reqqueue(recvbuf, reqqueue);
632 
633  txq[node_id].outgoing.sendqueue = true;
634  txq[node_id].outgoing.sentqueue = false;
635  }
636  break;
637  case PACKET_QUEUE:
638  {
639  packet_struct_queue queue;
640 
641  extract_queue(recvbuf, queue);
642 
643  incoming_tx_lock.lock();
644 
645  txq[node_id].incoming.sentqueue = true;
646  txq[node_id].incoming.sendqueue = false;
647  txq[(node_id)].incoming.rcvdqueue = true;
648 
649  // Sort through incoming queue and remove anything not in sent queue
650  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
651  {
652  bool valid = false;
653  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
654  {
655  if (queue.tx_id[i] && txq[node_id].incoming.progress[tx_id].tx_id == queue.tx_id[i])
656  {
657  // This is an incoming file we should handle
658  valid = true;
659  break;
660  }
661  }
662 
663  if (txq[node_id].incoming.progress[tx_id].tx_id && !valid)
664  {
665  // This is not an incoming file we should handle
666  incoming_tx_del(node_id, tx_id);
667  }
668  }
669 
670  // Sort through remotely sent queue and add anything not in our local incoming queue
671  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
672  {
673  if (queue.tx_id[i])
674  {
675  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node_id].incoming, queue.tx_id[i]);
676 
677  if (tx_id == 0)
678  {
679  incoming_tx_add(queue.node_name, queue.tx_id[i]);
680  }
681  }
682  }
683 
684  incoming_tx_lock.unlock();
685  }
686  break;
687  //Request missing metadata
688  case PACKET_REQMETA:
689  {
690  packet_struct_reqmeta reqmeta;
691 
692  extract_reqmeta(recvbuf, reqmeta);
693 
694  outgoing_tx_lock.lock();
695  txq[node_id].outgoing.sentqueue = false;
696 
697  // Send requested META packets
698  if (txq[node_id].node_id > 0)
699  {
700  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
701  {
702  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node_id].outgoing, reqmeta.tx_id[i]);
703  if (tx_id > 0)
704  {
705  txq[node_id].outgoing.progress[tx_id].sendmeta = true;
706  txq[node_id].outgoing.progress[tx_id].sentmeta = false;
707  }
708  }
709  }
710  outgoing_tx_lock.unlock();
711  break;
712  }
713  case PACKET_METADATA:
714  {
716 
717  extract_metadata(recvbuf, meta);
718 
719  incoming_tx_lock.lock();
720 
721  incoming_tx_update(meta);
722 
723  incoming_tx_lock.unlock();
724 
725  break;
726  }
727  case PACKET_REQDATA:
728  {
729  packet_struct_reqdata reqdata;
730 
731  extract_reqdata(recvbuf, reqdata);
732 
733  // Simple validity check
734  if (reqdata.hole_end < reqdata.hole_start)
735  {
736  break;
737  }
738 
739  outgoing_tx_lock.lock();
740 
741  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node_id].outgoing, reqdata.tx_id);
742  // tx_id now points to the valid entry to which we should add the data
743 
744  if (tx_id > 0)
745  {
746  // Perform sanity check
747 // if (reqdata.hole_start >= txq[node_id].outgoing.progress[tx_id].file_size || reqdata.hole_end > txq[node_id].outgoing.progress[tx_id].file_size)
748 // {
749 // outgoing_tx_del(node_id, tx_id, false);
750 // continue;
751 // }
752  txq[node_id].outgoing.progress[tx_id].sendmeta = false;
753  txq[node_id].outgoing.progress[tx_id].sentmeta = true;
754 
755  // Add this chunk to the queue
756  file_progress tp;
757  tp.chunk_start = reqdata.hole_start;
758  tp.chunk_end = reqdata.hole_end;
759  PACKET_FILE_SIZE_TYPE byte_count = (reqdata.hole_end - reqdata.hole_start) + 1;
760 
761  uint32_t check=0;
762  // Anything in the queue yet?
763  if (!txq[node_id].outgoing.progress[tx_id].file_info.size())
764  {
765  // Add first entry
766  txq[node_id].outgoing.progress[tx_id].file_info.push_back(tp);
767  txq[node_id].outgoing.progress[tx_id].total_bytes += byte_count;
768  }
769  else
770  {
771  // Check against existing data
772  for (uint32_t j=0; j<txq[node_id].outgoing.progress[tx_id].file_info.size(); ++j)
773  {
774  // If we match this entry
775  if (tp.chunk_start == txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_start && tp.chunk_end == txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end)
776  {
777  break;
778  }
779  // If we start before this entry
780  if (tp.chunk_start < txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_start)
781  {
782  // If we end before this entry (at least one byte between), insert
783  if (tp.chunk_end + 1 < txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_start)
784  {
785  txq[node_id].outgoing.progress[tx_id].file_info.insert(txq[node_id].outgoing.progress[tx_id].file_info.begin()+j, tp);
786  txq[node_id].outgoing.progress[tx_id].total_bytes += byte_count;
787  break;
788  }
789  // Otherwise, extend the near end
790  else
791  {
792  tp.chunk_end = txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_start - 1;
793  txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_start = tp.chunk_start;
794  byte_count = (tp.chunk_end - tp.chunk_start) + 1;
795  txq[node_id].outgoing.progress[tx_id].total_bytes += byte_count;
796  break;
797  }
798  }
799  else
800  {
801  // If we overlap on the end, extend the far end
802  if (tp.chunk_start <= txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end + 1)
803  {
804  if (tp.chunk_end > txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end)
805  {
806  byte_count = tp.chunk_end - txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end;
807  tp.chunk_start = txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end + 1;
808  txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end = tp.chunk_end;
809  txq[node_id].outgoing.progress[tx_id].total_bytes += byte_count;
810  break;
811  }
812  }
813  }
814  check = j + 1;
815  }
816 
817 
818  // If we are higher than everything currently in the list, then append
819  if (check == txq[node_id].outgoing.progress[tx_id].file_info.size())
820  {
821  txq[node_id].outgoing.progress[tx_id].file_info.push_back(tp);
822  txq[node_id].outgoing.progress[tx_id].total_bytes += byte_count;
823  }
824 
825  }
826 
827  // Save meta to disk
828  write_meta(txq[node_id].outgoing.progress[tx_id]);
829  txq[node_id].outgoing.sendqueue = false;
830  txq[node_id].outgoing.sentqueue = true;
831  txq[node_id].outgoing.progress[tx_id].senddata = true;
832  txq[node_id].outgoing.progress[tx_id].sentdata = false;
833  txq[node_id].outgoing.progress[tx_id].sendmeta = false;
834  txq[node_id].outgoing.progress[tx_id].sentmeta = true;
835  }
836 
837  outgoing_tx_lock.unlock();
838  break;
839  }
840  case PACKET_DATA:
841  {
842  packet_struct_data data;
843 
844  extract_data(recvbuf, data.node_id, data.tx_id, data.byte_count, data.chunk_start, data.chunk);
845 
847 
848  // create transaction entry if new, and then add data
849 
850  incoming_tx_lock.lock();
851 
852  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node_id].incoming, data.tx_id);
853 
854  // Update corresponding incoming queue entry if it exists
855  if (tx_id > 0)
856  {
857  txq[node_id].incoming.rcvddata = true;
858  txq[node_id].incoming.progress[tx_id].datatime = currentmjd();
859 
860  // tx_id now points to the valid entry to which we should add the data
861  file_progress tp;
862  tp.chunk_start = data.chunk_start;
863  tp.chunk_end = data.chunk_start + data.byte_count - 1;
864 
865  packet_struct_data odata;
866  odata = data;
867 
868  uint32_t check=0;
869  bool duplicate = false;
870  bool updated = false;
871 
872  // Do we have any data yet?
873  if (!txq[node_id].incoming.progress[tx_id].file_info.size())
874  {
875  // Add first entry, then write data
876  txq[node_id].incoming.progress[tx_id].file_info.push_back(tp);
877  txq[node_id].incoming.progress[tx_id].total_bytes += data.byte_count;
878  updated = true;
879  }
880  else
881  {
882  // Check against existing data
883  for (uint32_t j=0; j<txq[node_id].incoming.progress[tx_id].file_info.size(); ++j)
884  {
885  // Check for duplicate
886  if (tp.chunk_start >= txq[node_id].incoming.progress[tx_id].file_info[j].chunk_start && tp.chunk_end <= txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end)
887  {
888  duplicate = true;
889  break;
890  }
891  // If we start before this entry
892  if (tp.chunk_start < txq[node_id].incoming.progress[tx_id].file_info[j].chunk_start)
893  {
894  // If we end before this entry (at least one byte between), insert
895  if (tp.chunk_end + 1 < txq[node_id].incoming.progress[tx_id].file_info[j].chunk_start)
896  {
897  txq[node_id].incoming.progress[tx_id].file_info.insert(txq[node_id].incoming.progress[tx_id].file_info.begin()+j, tp);
898  txq[node_id].incoming.progress[tx_id].total_bytes += data.byte_count;
899  updated = true;
900  break;
901  }
902  // Otherwise, extend the near end
903  else
904  {
905  tp.chunk_end = txq[node_id].incoming.progress[tx_id].file_info[j].chunk_start - 1;
906  txq[node_id].incoming.progress[tx_id].file_info[j].chunk_start = tp.chunk_start;
907  data.byte_count = (tp.chunk_end - tp.chunk_start) + 1;
908  txq[node_id].incoming.progress[tx_id].total_bytes += data.byte_count;
909  updated = true;
910  break;
911  }
912  }
913  else
914  {
915  // If we overlap on the end, extend the far end
916  if (tp.chunk_start <= txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end + 1)
917  {
918  if (tp.chunk_end > txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end)
919  {
920  data.byte_count = tp.chunk_end - txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end;
921  tp.chunk_start = txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end + 1;
922  txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end = tp.chunk_end;
923  txq[node_id].incoming.progress[tx_id].total_bytes += data.byte_count;
924  updated = true;
925  break;
926  }
927  }
928  }
929  check = j + 1;
930  }
931 
932 
933  // If we are higher than everything currently in the list, then append
934  if (!duplicate && check == txq[node_id].incoming.progress[tx_id].file_info.size())
935  {
936  txq[node_id].incoming.progress[tx_id].file_info.push_back(tp);
937  txq[node_id].incoming.progress[tx_id].total_bytes += data.byte_count;
938  updated = true;
939  }
940 
941  }
942 
943  // Write to disk if this is new data
944  if (updated)
945  {
946  // Write incoming data to disk
947  if (txq[node_id].incoming.progress[tx_id].fp == nullptr)
948  {
949  partial_filepath = txq[node_id].incoming.progress[tx_id].temppath + ".file";
950  if (data_exists(partial_filepath))
951  {
952  txq[node_id].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(), "r+");
953  }
954  else
955  {
956  txq[node_id].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(), "w");
957  }
958  }
959 
960  if (txq[node_id].incoming.progress[tx_id].fp == nullptr)
961  {
962  if (agent->debug_level)
963  {
964  debug_fd_lock.lock();
965  fprintf(agent->get_debug_fd(), "%.4f %.4f Incoming: File Error: %s %s on ID: %u Chunk: %u\n", tet.split(), dt.lap(), partial_filepath.c_str(), cosmos_error_string(-errno).c_str(), tx_id, tp.chunk_start);
966  fflush(agent->get_debug_fd());
967  debug_fd_lock.unlock();
968  }
969  }
970  else
971  {
972  fseek(txq[node_id].incoming.progress[tx_id].fp, tp.chunk_start, SEEK_SET);
973  fwrite(data.chunk, data.byte_count, 1, txq[node_id].incoming.progress[tx_id].fp);
974  fflush(txq[node_id].incoming.progress[tx_id].fp);
975  // Write latest meta data to disk
976  write_meta(txq[node_id].incoming.progress[tx_id]);
977  if (agent->debug_level)
978  {
979  uint32_t total = 0;
980  for (uint16_t i=0; i<data.byte_count; ++i)
981  {
982  total += data.chunk[i];
983  }
984  }
985  }
986 
987  }
988 
989  // Check if file has been completely received
990 // if(txq[node_id].incoming.progress[tx_id].file_size == txq[node_id].incoming.progress[tx_id].total_bytes && txq[node_id].incoming.progress[tx_id].havemeta)
991 // {
992 // // See if we know what the remote node_id is for this
993 // if (txq[node_id].node_id > 0)
994 // {
995 // tx_progress tx_in = txq[node_id].incoming.progress[tx_id];
996 // if (agent->debug_level)
997 // {
998 // debug_fd_lock.lock();
999 // fprintf(agent->get_debug_fd(), "%.4f %.4f Incoming: Complete: %u %s %u %u\n", tet.split(), dt.lap(), tx_in.tx_id, tx_in.node_name.c_str(), tx_in.file_size, tx_in.total_bytes);
1000 // fflush(agent->get_debug_fd());
1001 // debug_fd_lock.unlock();
1002 // }
1003 
1004 // // Move file to its final location
1005 // if (!txq[node_id].incoming.progress[tx_id].complete)
1006 // {
1007 // if (txq[node_id].incoming.progress[tx_id].fp != nullptr)
1008 // {
1009 // fclose(txq[node_id].incoming.progress[tx_id].fp);
1010 // txq[node_id].incoming.progress[tx_id].fp = nullptr;
1011 // }
1012 // string final_filepath = tx_in.temppath + ".file";
1013 // int iret = rename(final_filepath.c_str(), tx_in.filepath.c_str());
1014 // // Make sure metadata is recorded
1015 // write_meta(txq[node_id].incoming.progress[tx_id], 0.);
1016 // if (agent->debug_level)
1017 // {
1018 // debug_fd_lock.lock();
1019 // fprintf(agent->get_debug_fd(), "%.4f %.4f Incoming: Renamed/Data: %d %s\n", tet.split(), dt.lap(), iret, tx_in.filepath.c_str());
1020 // fflush(agent->get_debug_fd());
1021 // debug_fd_lock.unlock();
1022 // }
1023 
1024 // // Mark complete
1025 // txq[node_id].incoming.progress[tx_id].complete = true;
1026 // txq[node_id].incoming.progress[tx_id].senddata = false;
1027 // txq[node_id].incoming.progress[tx_id].sentdata = true;
1028 // }
1029 // }
1030 // }
1031  }
1032  else
1033  {
1034  txq[node_id].incoming.progress[data.tx_id].sendmeta = true;
1035  }
1036 
1037  incoming_tx_lock.unlock();
1038 
1039  break;
1040  }
1041  case PACKET_COMPLETE:
1042  {
1043  packet_struct_complete complete;
1044 
1045  extract_complete(recvbuf, complete);
1046 
1047  outgoing_tx_lock.lock();
1048 
1049  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node_id].outgoing, complete.tx_id);
1050 
1051  if (tx_id > 0)
1052  {
1053  txq[node_id].outgoing.progress[tx_id].complete = true;
1054  txq[node_id].outgoing.progress[tx_id].senddata = false;
1055  txq[node_id].outgoing.progress[tx_id].sendmeta = false;
1056  txq[node_id].outgoing.sendqueue = true;
1057  txq[node_id].outgoing.sentqueue = false;
1058  }
1059 
1060  outgoing_tx_lock.unlock();
1061  break;
1062  }
1063  case PACKET_CANCEL:
1064  {
1065  profile_check(__LINE__, 1);
1066 
1067  packet_struct_complete cancel;
1068 
1069  extract_complete(recvbuf, cancel);
1070 
1071  incoming_tx_lock.lock();
1072 
1073  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node_id].incoming, cancel.tx_id);
1074 
1075  if (tx_id > 0)
1076  {
1077  // Remove the transaction
1078  incoming_tx_del(node_id, tx_id);
1079  }
1080 
1081  incoming_tx_lock.unlock();
1082  break;
1083  }
1084  default:
1085  ++type_error_count;
1086  break;
1087  }
1088  profile_check(__LINE__, 1);
1089 
1090  }
1091  }
1092 
1093  // Flush any active metadata
1094  for (uint16_t node=0; node<txq.size(); ++node)
1095  {
1096  for (uint16_t tx_id=1; tx_id<PROGRESS_QUEUE_SIZE; ++tx_id)
1097  {
1098  if (txq[(node)].incoming.progress[tx_id].tx_id && txq[(node)].incoming.progress[tx_id].havemeta)
1099  {
1100  write_meta(txq[(node)].incoming.progress[tx_id], 0.);
1101  }
1102  }
1103  }
1104 
1105 }
string node
Definition: agent_file2.cpp:100
PACKET_BYTE length
Definition: transferlib.h:174
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file4.cpp:2811
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
Definition: transferlib.h:205
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file4.cpp:3246
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file4.cpp:1843
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
int32_t incoming_tx_update(packet_struct_metashort meta)
Definition: agent_file4.cpp:2930
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:209
void extract_reqdata(vector< PACKET_BYTE > &packet, packet_struct_reqdata &reqdata)
Definition: transferlib.cpp:150
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
Definition: transferlib.h:291
double limjd
Definition: agent_file3.cpp:103
int32_t incoming_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file4.cpp:3000
string nodeName
Definition: agentclass.h:367
int i
Definition: rw_test.cpp:37
void extract_complete(vector< PACKET_BYTE > &packet, packet_struct_complete &complete)
Definition: transferlib.cpp:71
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:208
Definition: transferlib.h:332
FILE * data_open(string path, const char *mode)
Open file from path.
Definition: datalib.cpp:1019
string chanip
Definition: agent_file2.cpp:102
PACKET_CHUNK_SIZE_TYPE byte_count
Definition: transferlib.h:279
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
Definition: transferlib.h:155
Definition: transferlib.h:245
int32_t myrecvfrom(string type, socket_channel &channel, vector< PACKET_BYTE > &buf, uint32_t length, double dtimeout=1.)
Definition: agent_file4.cpp:1555
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
static Agent * agent
Definition: agent_file4.cpp:94
char address[17]
Definition: socketlib.h:134
static std::mutex incoming_tx_lock
Definition: agent_file4.cpp:141
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
ElapsedTime dt
Definition: agent_file4.cpp:186
PACKET_BYTE bytes[225-2]
Definition: transferlib.h:187
Definition: agent_file2.cpp:98
Definition: transferlib.h:217
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
PACKET_BYTE length
Definition: transferlib.h:186
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:264
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
string lookup_node_id_name(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:536
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:294
void extract_message(vector< PACKET_BYTE > &packet, packet_struct_message &message)
Definition: transferlib.cpp:341
PACKET_BYTE bytes[225-2]
Definition: transferlib.h:175
Definition: transferlib.h:183
void extract_data(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE &node_id, PACKET_TX_ID_TYPE &tx_id, PACKET_CHUNK_SIZE_TYPE &byte_count, PACKET_FILE_SIZE_TYPE &chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:248
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
int32_t check_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:494
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
uint8_t message[300]
Definition: kpc9612p_send.cpp:36
double lomjd
Definition: agent_file3.cpp:104
Definition: socketlib.h:115
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:278
struct sockaddr_in caddr
Definition: socketlib.h:122
Definition: transferlib.h:195
Definition: transferlib.h:261
void extract_command(vector< PACKET_BYTE > &packet, packet_struct_command &command)
Definition: transferlib.cpp:364
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string command
Definition: add_radio.cpp:27
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
void extract_heartbeat(vector< PACKET_BYTE > &packet, packet_struct_heartbeat &heartbeat)
Definition: transferlib.cpp:316
static const unsigned char PACKET_MESSAGE
Definition: transferlib.h:112
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:277
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
#define PACKET_HEADER_OFFSET_NODE_NAME
Definition: transferlib.h:153
void log_write(string node, string agent, double utc, string extra, string type, string record, string location)
Write log entry - full.
Definition: datalib.cpp:75
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define SEEK_SET
Definition: zconf.h:475
static uint32_t type_error_count
Definition: agent_file4.cpp:153
Definition: transferlib.h:171
static const unsigned char PACKET_COMMAND
Definition: transferlib.h:113
static std::mutex outgoing_tx_lock
Definition: agent_file4.cpp:142
Definition: transferlib.h:275
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:221
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
#define PACKET_MAX_LENGTH
Definition: transferlib.h:75
socket_channel chansock
Definition: agent_file2.cpp:101
PACKET_FILE_SIZE_TYPE hole_start
Definition: transferlib.h:265
PACKET_FILE_SIZE_TYPE hole_end
Definition: transferlib.h:266
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:280
FILE * fp
Definition: rw_test.cpp:38
PACKET_BYTE chunk[1500]
Definition: transferlib.h:281
static double last_data_receive_time
Definition: agent_file4.cpp:145
double nmjd
Definition: agent_file2.cpp:105
static const unsigned char PACKET_HEARTBEAT
Definition: transferlib.h:111
static string node
Definition: agent_monitor.cpp:126
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:173
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
void extract_queue(vector< PACKET_BYTE > &packet, packet_struct_queue &queue)
Definition: transferlib.cpp:292
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
double fmjd
Definition: agent_file3.cpp:106
void extract_reqmeta(vector< PACKET_BYTE > &packet, packet_struct_reqmeta &reqmeta)
Definition: transferlib.cpp:124
void profile_check(int32_t line_number, uint16_t thread=0)
Definition: agent_file4.cpp:3629
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
static const unsigned char PACKET_REQQUEUE
Definition: transferlib.h:110
#define PACKET_HEADER_OFFSET_NODE_ID
Definition: transferlib.h:152
bool data_exists(string &path)
Check existence of path.
Definition: datalib.cpp:1003
void extract_reqqueue(vector< PACKET_BYTE > &packet, packet_struct_reqqueue &reqqueue)
Definition: transferlib.cpp:271
void transmit_loop ( )
noexcept
1416 {
1417  int32_t iretn;
1418  // std::mutex transmit_queue_lock;
1419  // std::unique_lock<std::mutex> locker(transmit_queue_lock);
1420 
1421  while (agent->running())
1422  {
1423  if (agent->running() == (uint16_t)Agent::State::IDLE)
1424  {
1425  COSMOS_SLEEP(1);
1426  continue;
1427  }
1428 
1429 
1430  // transmit_queue_check.wait(locker);
1431 
1432  while (!transmit_queue.empty())
1433  {
1434  // Get next packet from transceiver FIFO
1435  txqueue_lock.lock();
1436  transmit_queue_entry entry = transmit_queue.front();
1437  iretn = mysendto(entry.type, entry.channel, entry.packet);
1438  if (iretn >= 0)
1439  {
1440  out_comm_channel[entry.channel].fmjd = transmit_queue.back().nmjd;
1441  transmit_queue.pop();
1442  }
1443  txqueue_lock.unlock();
1444  }
1445  COSMOS_SLEEP(.001);
1446  }
1447 }
std::vector< PACKET_BYTE > packet
Definition: agent_file.cpp:114
static std::mutex txqueue_lock
Definition: agent_file4.cpp:140
int iretn
Definition: rw_test.cpp:37
static Agent * agent
Definition: agent_file4.cpp:94
uint32_t channel
Definition: agent_file.cpp:113
std::string type
Definition: agent_file.cpp:112
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
int32_t mysendto(string type, int32_t use_channel, vector< PACKET_BYTE > &buf)
Definition: agent_file4.cpp:1516
static std::queue< transmit_queue_entry > transmit_queue
Definition: agent_file4.cpp:128
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
Definition: agent_file.cpp:110
void profile_check ( int32_t  line_number,
uint16_t  thread = 0 
)
3630 {
3631  return;
3632  static FILE *fp = nullptr;
3633 // static mutex profile_lock;
3634 
3635  debug_fd_lock.lock();
3636 
3637  if (fp == nullptr)
3638  {
3639 // fp = fopen("profile_check.out", "a");
3640  fp = agent->get_debug_fd();
3641  }
3642 
3643  if (fp != nullptr)
3644  {
3645  fprintf(fp, "%.3f %u %d\n", tet.split(), thread, line_number);
3646  fflush(fp);
3647  }
3648 
3649  debug_fd_lock.unlock();
3650 
3651 }
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static Agent * agent
Definition: agent_file4.cpp:94
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
FILE * fp
Definition: rw_test.cpp:38
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
int32_t request_send_command ( string &  request,
string &  response,
Agent  
)
3354 {
3355  size_t f1 = request.find(' ');
3356  size_t f2 = request.find(' ', f1 + 1);
3357  string nodename = request.substr(f1+1, (f2-f1)-1);
3358  string tcommand = request.substr(f2+1);
3359  for (uint16_t i=0; i<tcommand.length(); ++i)
3360  {
3361  if (tcommand[i] == 0)
3362  {
3363  tcommand.resize(i);
3364  break;
3365  }
3366  }
3367  string command;
3368  if (tcommand[0] == '{')
3369  {
3370  command = tcommand;
3371  }
3372  else
3373  {
3374  JSONObject jobject("event_name", data_name(nodename, currentmjd(), "file_command", ""));
3375  jobject.addElement("event_utc", 0.);
3376  jobject.addElement("event_type", EVENT_TYPE_COMMAND);
3377  jobject.addElement("event_flag", 0);
3378  jobject.addElement("event_data", tcommand);
3379  command = jobject.to_json_string();
3380  }
3381 
3382  int32_t node_id = lookup_node_id(nodename);
3383  if (node_id > 0)
3384  {
3385  int32_t use_channel;
3386  if ((use_channel=check_channel(node_id)) > 0)
3387  {
3388  vector<PACKET_BYTE> packet;
3389  make_command_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(node_id), command);
3390  queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node_id), "Outgoing", packet);
3391  response = command;
3392  }
3393  else
3394  {
3395  return use_channel;
3396  }
3397  }
3398  else
3399  {
3400  return node_id;
3401  }
3402 
3403 
3404  return 0;
3405 }
#define EVENT_TYPE_COMMAND
Definition: cosmos-defs.h:215
void make_command_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string command)
Definition: transferlib.cpp:348
int i
Definition: rw_test.cpp:37
string data_name(string node, double mjd, string extra, string type)
Create data file name.
Definition: datalib.cpp:685
int32_t lookup_node_id(string node_name)
Definition: transferlib.cpp:514
static uint16_t use_channel
Definition: agent_file.cpp:97
int32_t queuesendto(PACKET_NODE_ID_TYPE node_id, string type, vector< PACKET_BYTE > packet)
Definition: agent_file4.cpp:1473
string command
Definition: add_radio.cpp:27
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
Definition: jsonobject.h:5
string nodename
Definition: agent_add_soh.cpp:54
int32_t check_channel(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file4.cpp:3258
int32_t request_send_message ( string &  request,
string &  response,
Agent  
)
3409 {
3410  size_t f1 = request.find(' ');
3411  size_t f2 = request.find(' ', f1 + 1);
3412  string nodename = request.substr(f1+1, (f2-f1)-1);
3413  string message = request.substr(f2+1);
3414  for (uint16_t i=0; i<message.length(); ++i)
3415  {
3416  if (message[i] == 0)
3417  {
3418  message.resize(i);
3419  break;
3420  }
3421  }
3422  int32_t node_id = lookup_node_id(nodename);
3423  if (node_id > 0)
3424  {
3425  int32_t use_channel;
3426  if ((use_channel=check_channel(node_id)) > 0)
3427  {
3428  vector<PACKET_BYTE> packet;
3429  make_message_packet(packet, static_cast <PACKET_NODE_ID_TYPE>(node_id), message);
3430  queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node_id), "Outgoing", packet);
3431  }
3432  else
3433  {
3434  return use_channel;
3435  }
3436  }
3437  else
3438  {
3439  return node_id;
3440  }
3441 
3442 
3443  return 0;
3444 }
int i
Definition: rw_test.cpp:37
int32_t lookup_node_id(string node_name)
Definition: transferlib.cpp:514
static uint16_t use_channel
Definition: agent_file.cpp:97
int32_t queuesendto(PACKET_NODE_ID_TYPE node_id, string type, vector< PACKET_BYTE > packet)
Definition: agent_file4.cpp:1473
uint8_t message[300]
Definition: kpc9612p_send.cpp:36
void make_message_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string message)
Definition: transferlib.cpp:325
string nodename
Definition: agent_add_soh.cpp:54
int32_t check_channel(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file4.cpp:3258
int32_t request_debug ( string &  request,
string &  ,
Agent  
)
3447 {
3448 
3449  string requestString = string(request);
3450  StringParser sp(requestString, ' ');
3451 
3452  agent->debug_level = sp.getFieldNumberAsDouble(2); // should be getFieldNumberAsBoolean
3453 
3454  std::cout << "agent->debug_level: " << agent->debug_level << std::endl;
3455  return 0;
3456 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
static Agent * agent
Definition: agent_file4.cpp:94
Definition: stringlib.h:89
int32_t request_get_channels ( string &  request,
string &  response,
Agent agent 
)
2213 {
2214  response.clear();
2215  for (uint16_t channel=0; channel<out_comm_channel.size(); ++channel)
2216  {
2217  response += "{";
2218  response += to_json("channel", channel);
2219  response += to_json("node", out_comm_channel[channel].node);
2220  response += to_json("ip", out_comm_channel[channel].chanip);
2221  response += to_json("size", out_comm_channel[channel].packet_size);
2222  response += to_json("throughput", out_comm_channel[channel].throughput);
2223  response += to_json("nmjd", out_comm_channel[channel].nmjd);
2224  response += to_json("limjd", out_comm_channel[channel].limjd);
2225  response += to_json("lomjd", out_comm_channel[channel].lomjd);
2226  response += to_json("fmjd", out_comm_channel[channel].fmjd);
2227  response += "}";
2228  }
2229  return 0;
2230 }
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
static string node
Definition: agent_monitor.cpp:126
string to_json(string key, string value)
Definition: stringlib.cpp:334
int32_t request_set_throughput ( string &  request,
string &  response,
Agent agent 
)
2233 {
2234  uint16_t channel=0;
2235  uint32_t throughput=0;
2236 
2237  sscanf(request.c_str(), "%*s %hu %u\n", &channel, &throughput);
2238  if (channel < out_comm_channel.size())
2239  {
2240  if (throughput)
2241  {
2242  out_comm_channel[channel].throughput = throughput;
2243  }
2244  }
2245  else
2246  {
2247  response = "Channel " + to_unsigned(channel) + " too large";
2248  }
2249  return 0;
2250 
2251 }
string to_unsigned(uint64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:265
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
int32_t request_remove_file ( string &  request,
string &  response,
Agent agent 
)
2254 {
2255  char type;
2256  uint32_t tx_id;
2257 
2258  sscanf(request.c_str(), "%*s %c %u\n", &type, &tx_id);
2259  switch (type)
2260  {
2261  case 'i':
2262  {
2263  break;
2264  }
2265  case 'o':
2266  {
2267  break;
2268  }
2269  }
2270 
2271  return 0;
2272 }
int32_t request_ls ( string &  request,
string &  response,
Agent agent 
)
2124 {
2125 
2126  //the request string == "ls directoryname"
2127  //get the directory name
2128  // char directoryname[COSMOS_MAX_NAME+1];
2129  // memmove(directoryname, request.substr(3), COSMOS_MAX_NAME);
2130  string directoryname = request.substr(3);
2131 
2132  DIR* dir;
2133  struct dirent* ent;
2134 
2135  string all_file_names;
2136 
2137  if((dir = opendir(directoryname.c_str())) != nullptr)
2138  {
2139  while (( ent = readdir(dir)) != nullptr)
2140  {
2141  all_file_names += ent->d_name;
2142  all_file_names += "\n";
2143  }
2144  closedir(dir);
2145 
2146  (response = all_file_names.c_str());
2147  }
2148  else
2149  response = "unable to open directory " + directoryname;
2150  return 0;
2151 }
int closedir(DIR *dir)
Definition: dirent.c:74
Definition: dirent.c:24
Definition: dirent.h:21
DIR * opendir(const char *name)
Definition: dirent.c:32
struct dirent * readdir(DIR *dir)
Definition: dirent.c:97
char * d_name
Definition: dirent.h:23
int32_t request_list_incoming ( string &  request,
string &  response,
Agent agent 
)
2154 {
2155  int32_t iretn;
2156  response.clear();
2157  for (uint16_t node = 0; node<txq.size(); ++node)
2158  {
2159  if ((iretn=check_node_id(node)) > 0)
2160  {
2161  response += std::to_string(node) + ' ' + txq[(node)].node_name + ' ' + std::to_string(txq[(node)].incoming.size) + std::to_string(iretn)+ " rcvdqueue: " + std::to_string(txq[(node)].incoming.rcvdmeta) + " rcvdmeta: " + std::to_string(txq[(node)].incoming.rcvdmeta) + " rcvddata: " + std::to_string(txq[(node)].incoming.rcvddata) + "\n";
2162  for(tx_progress tx : txq[(node)].incoming.progress)
2163  {
2164  if (tx.tx_id)
2165  {
2166  response += to_label("tx_id", tx.tx_id) + ' ';
2167  response += to_label("node", tx.node_name) + ' ';
2168  response += to_label("agent", tx.agent_name) + ' ';
2169  response += to_label("name", tx.file_name) + ' ';
2170  response += to_label("bytes", tx.total_bytes) + ' ';
2171  response += "/" + to_unsigned(tx.file_size) + ' ';
2172  response += to_label("havemeta", tx.havemeta) + ' ';
2173  response += to_label("sendmeta", tx.sendmeta) + ' ';
2174  response += to_label("sentmeta", tx.sentmeta) + ' ';
2175  response += to_label("senddata", tx.senddata) + ' ';
2176  response += to_label("sentdata", tx.sentdata) + ' ';
2177  response += to_label("complete", tx.complete);
2178  response += "\n";
2179  }
2180  }
2181  }
2182  }
2183 
2184  return 0;
2185 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
Definition: transferlib.h:338
string to_string(char *value)
Definition: stringlib.cpp:220
int iretn
Definition: rw_test.cpp:37
string to_unsigned(uint64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:265
string node_name
Definition: agent_001.cpp:46
int32_t check_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:494
static string node
Definition: agent_monitor.cpp:126
string to_label(string label, double value, uint16_t precision, bool mjd)
Definition: stringlib.cpp:376
int32_t request_list_outgoing ( string &  request,
string &  response,
Agent agent 
)
2188 {
2189  int32_t iretn;
2190  response.clear();
2191  for (uint16_t node=0; node<txq.size(); ++node)
2192  {
2193  if ((iretn=check_node_id(node)) > 0)
2194  {
2195  response += std::to_string(node)+ ' ' + txq[(node)].node_name + ' ' + std::to_string(txq[(node)].outgoing.size) + "\n";
2196  for(tx_progress tx : txq[(node)].outgoing.progress)
2197  {
2198  if (tx.tx_id)
2199  {
2200  response += "tx_id: " + std::to_string(tx.tx_id) + " node: " + tx.node_name + " agent: " + tx.agent_name + " name: " + tx.file_name + " bytes: " + std::to_string(tx.total_bytes) + "/" + std::to_string(tx.file_size);
2201  response += " enabled: " + to_bool(tx.enabled);
2202  response += " havemeta: " + to_bool(tx.havemeta) + " sendmeta: " + to_bool(tx.sendmeta) + " sentmeta: " + to_bool(tx.sentmeta);
2203  response += " senddata: " + to_bool(tx.senddata) + " sentdata: " + to_bool(tx.sentdata) + " complete: " + to_bool(tx.complete) + "\n";
2204  }
2205  }
2206  }
2207  }
2208 
2209  return 0;
2210 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
Definition: transferlib.h:338
string to_string(char *value)
Definition: stringlib.cpp:220
int iretn
Definition: rw_test.cpp:37
string node_name
Definition: agent_001.cpp:46
int32_t check_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:494
string to_bool(bool value)
Definition: stringlib.cpp:326
static string node
Definition: agent_monitor.cpp:126
int32_t request_list_incoming_json ( string &  request,
string &  response,
Agent agent 
)
3486 {
3487  (response = json_list_incoming());
3488  return 0;
3489 }
string json_list_incoming()
Definition: agent_file4.cpp:3497
int32_t request_list_outgoing_json ( string &  request,
string &  response,
Agent agent 
)
3492 {
3493  (response = json_list_outgoing().c_str());
3494  return 0;
3495 }
string json_list_outgoing()
Definition: agent_file4.cpp:3532
int32_t request_set_logstride ( string &  request,
string &  response,
Agent agent 
)
3459 {
3460  double new_logstride;
3461  sscanf(request.c_str(),"set_logstride %lf",&new_logstride);
3462  if(new_logstride > 0. )
3463  {
3464  logstride_sec = new_logstride;
3465  }
3466  return 0;
3467 }
double logstride_sec
Definition: agent_file4.cpp:185
int32_t request_get_logstride ( string &  ,
string &  response,
Agent  
)
3470 {
3471  response = "{" + to_json("logstride", logstride_sec) + "}";
3472  return 0;
3473 }
double logstride_sec
Definition: agent_file4.cpp:185
string to_json(string key, string value)
Definition: stringlib.cpp:334
int32_t outgoing_tx_add ( tx_progress tx_out)
2275 {
2276  if (agent->debug_level)
2277  {
2278  debug_fd_lock.lock();
2279  fprintf(agent->get_debug_fd(), "%.4f %.4f Main: outgoing_tx_add: ", tet.split(), dt.lap());
2280  fflush(agent->get_debug_fd());
2281  debug_fd_lock.unlock();
2282  }
2283 
2284  int32_t node_id = lookup_node_id(tx_out.node_name);
2285  if (node_id <= 0)
2286  {
2287  if (agent->debug_level)
2288  {
2289  debug_fd_lock.lock();
2290  fprintf(agent->get_debug_fd(), "TRANSFER_ERROR_NODE\n");
2291  fflush(agent->get_debug_fd());
2292  debug_fd_lock.unlock();
2293  }
2294  if (node_id == 0)
2295  {
2296  return TRANSFER_ERROR_NODE;
2297  }
2298  else
2299  {
2300  return node_id;
2301  }
2302  }
2303 
2304  // Check for duplicate tx_id
2305  if (txq[(node_id)].outgoing.progress[tx_out.tx_id].tx_id)
2306  {
2307  return TRANSFER_ERROR_DUPLICATE;
2308  }
2309  // Only add if we have room
2310 // if (txq[(node_id)].outgoing.size == PROGRESS_QUEUE_SIZE)
2311 // {
2312 // if (agent->debug_level)
2313 // {
2314 // debug_fd_lock.lock();
2315 // fprintf(agent->get_debug_fd(), "TRANSFER_ERROR_QUEUEFULL\n");
2316 // fflush(agent->get_debug_fd());
2317 // debug_fd_lock.unlock();
2318 // }
2319 // return TRANSFER_ERROR_QUEUEFULL;
2320 // }
2321 
2322  // tx_out.state = STATE_QUEUE;
2323  if (tx_out.file_name.size())
2324  {
2325  tx_out.filepath = data_base_path(tx_out.node_name, "outgoing", tx_out.agent_name, tx_out.file_name);
2326  }
2327  else
2328  {
2329  if (agent->debug_level)
2330  {
2331  debug_fd_lock.lock();
2332  fprintf(agent->get_debug_fd(), "TRANSFER_ERROR_FILENAME\n");
2333  fflush(agent->get_debug_fd());
2334  debug_fd_lock.unlock();
2335  }
2336  tx_out.filepath = "";
2337  return TRANSFER_ERROR_FILENAME;
2338  }
2339 
2340  tx_out.temppath = data_base_path(tx_out.node_name, "temp", "file", "out_"+std::to_string(tx_out.tx_id));
2341 
2342  // Check for a duplicate file name of something already in queue
2343  uint16_t minindex = 255 - static_cast<uint16_t>(pow(pow(TRANSFER_QUEUE_LIMIT,1.f/3.f), (3.f - log10f(tx_out.file_size) / 2.f)));
2344  for (uint16_t i=minindex; i<256; ++i)
2345  {
2346  if (!txq[(node_id)].outgoing.progress[i].filepath.empty() && tx_out.filepath == txq[(node_id)].outgoing.progress[i].filepath)
2347  {
2348  // Remove the META file
2349  if (agent->debug_level)
2350  {
2351  debug_fd_lock.lock();
2352  fprintf(agent->get_debug_fd(), "%u %s %s %s TRANSFER_ERROR_DUPLICATE\n", tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.filepath.c_str());
2353  fflush(agent->get_debug_fd());
2354  debug_fd_lock.unlock();
2355  }
2356  string filepath = tx_out.temppath + ".meta";
2357  remove(filepath.c_str());
2358  return TRANSFER_ERROR_DUPLICATE;
2359  }
2360  }
2361 
2362  tx_out.fp = nullptr;
2363  //get the file size
2364  tx_out.file_size = get_file_size(tx_out.filepath);
2365  tx_out.savetime = 0.;
2366 
2367  // save and queue metadata packet
2368  tx_out.havemeta = true;
2369  tx_out.sendmeta = true;
2370  tx_out.sentmeta = false;
2371  tx_out.senddata = false;
2372  tx_out.sentdata = false;
2373  tx_out.complete = false;
2374 
2375  if (agent->debug_level)
2376  {
2377  debug_fd_lock.lock();
2378  fprintf(agent->get_debug_fd(), "%u %s %s %s %lu ", tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.filepath.c_str(), PROGRESS_QUEUE_SIZE);
2379  fflush(agent->get_debug_fd());
2380  debug_fd_lock.unlock();
2381  }
2382 
2383  // Good to go. Add it to queue.
2384  outgoing_tx_lock.lock();
2385  // txq[(node_id)].outgoing.progress[tx_out.tx_id] = tx_out;
2386 
2387  txq[(node_id)].outgoing.progress[tx_out.tx_id].tx_id = tx_out.tx_id;
2388  txq[(node_id)].outgoing.progress[tx_out.tx_id].havemeta = tx_out.havemeta;
2389  txq[(node_id)].outgoing.progress[tx_out.tx_id].sendmeta = tx_out.sendmeta;
2390  txq[(node_id)].outgoing.progress[tx_out.tx_id].sentmeta = tx_out.sentmeta;
2391  txq[(node_id)].outgoing.progress[tx_out.tx_id].senddata = tx_out.senddata;
2392  txq[(node_id)].outgoing.progress[tx_out.tx_id].sentdata = tx_out.sentdata;
2393  txq[(node_id)].outgoing.progress[tx_out.tx_id].complete = tx_out.complete;
2394  txq[(node_id)].outgoing.progress[tx_out.tx_id].node_name = tx_out.node_name;
2395  txq[(node_id)].outgoing.progress[tx_out.tx_id].agent_name = tx_out.agent_name;
2396  txq[(node_id)].outgoing.progress[tx_out.tx_id].file_name = tx_out.file_name;
2397  txq[(node_id)].outgoing.progress[tx_out.tx_id].filepath = tx_out.filepath;
2398  txq[(node_id)].outgoing.progress[tx_out.tx_id].temppath = tx_out.temppath;
2399  txq[(node_id)].outgoing.progress[tx_out.tx_id].savetime = tx_out.savetime;
2400  txq[(node_id)].outgoing.progress[tx_out.tx_id].datatime = tx_out.datatime;
2401  txq[(node_id)].outgoing.progress[tx_out.tx_id].file_size = tx_out.file_size;
2402  txq[(node_id)].outgoing.progress[tx_out.tx_id].total_bytes = tx_out.total_bytes;
2403  txq[(node_id)].outgoing.progress[tx_out.tx_id].file_info.clear();
2404  for (file_progress filep : tx_out.file_info)
2405  {
2406  txq[(node_id)].outgoing.progress[tx_out.tx_id].file_info.push_back(filep);
2407  }
2408  txq[(node_id)].outgoing.progress[tx_out.tx_id].fp = tx_out.fp;
2409  ++txq[(node_id)].outgoing.size;
2410  outgoing_tx_lock.unlock();
2411 
2412  if (agent->debug_level)
2413  {
2414  debug_fd_lock.lock();
2415  fprintf(agent->get_debug_fd(), " %u\n", txq[(node_id)].outgoing.size);
2416  fflush(agent->get_debug_fd());
2417  debug_fd_lock.unlock();
2418  }
2419 
2420  return outgoing_tx_recount(node_id);
2421 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
bool sentdata
Definition: transferlib.h:347
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
string to_string(char *value)
Definition: stringlib.cpp:220
int32_t lookup_node_id(string node_name)
Definition: transferlib.cpp:514
static Agent * agent
Definition: agent_file4.cpp:94
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
ElapsedTime dt
Definition: agent_file4.cpp:186
double datatime
Definition: transferlib.h:355
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
deque< file_progress > file_info
Definition: transferlib.h:358
bool sentmeta
Definition: transferlib.h:345
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
#define TRANSFER_ERROR_DUPLICATE
Definition: cosmos-errno.h:225
string agent_name
Definition: transferlib.h:350
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
int32_t outgoing_tx_recount(uint8_t node)
Definition: agent_file4.cpp:2685
static std::mutex outgoing_tx_lock
Definition: agent_file4.cpp:142
string file_name
Definition: transferlib.h:351
#define TRANSFER_ERROR_FILENAME
Definition: cosmos-errno.h:224
bool senddata
Definition: transferlib.h:346
bool complete
Definition: transferlib.h:348
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
int32_t get_file_size(string filename)
Get size of file.
Definition: transferlib.cpp:402
bool sendmeta
Definition: transferlib.h:344
int32_t outgoing_tx_add ( string  node_name,
string  agent_name,
string  file_name 
)
2424 {
2425  if (node_name.empty() || agent_name.empty() || file_name.empty())
2426  {
2427  if (agent->debug_level)
2428  {
2429  debug_fd_lock.lock();
2430  fprintf(agent->get_debug_fd(), "%.4f %.4f Main: outgoing_tx_add: TRANSFER_ERROR_FILENAME\n", tet.split(), dt.lap());
2431  fflush(agent->get_debug_fd());
2432  debug_fd_lock.unlock();
2433  }
2434  return TRANSFER_ERROR_FILENAME;
2435  }
2436 
2437  // BEGIN GATHERING THE METADATA
2438  tx_progress tx_out;
2439 
2440  uint8_t node_id = lookup_node_id(node_name);
2441  if (node_id == 0)
2442  {
2443  return TRANSFER_ERROR_NODE;
2444  }
2445 
2446  // Only add if we have room
2447  if (txq[(node_id)].outgoing.size == PROGRESS_QUEUE_SIZE)
2448  {
2449  return TRANSFER_ERROR_QUEUEFULL;
2450  }
2451 
2452  // Locate next empty space
2453  //get the file size
2454  outgoing_tx_lock.lock();
2455  string filepath = data_base_path(node_name, "outgoing", agent_name, file_name);
2456  int32_t file_size = get_file_size(filepath);
2457 
2458  // Go through existing queue
2459  // - if it is already there and the size is different, remove it
2460  // - if it is already there, and the size is the same, set enable it
2461  // - if it is not already there and there is room to add is, go on to next step
2462  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2463  {
2464  if (txq[(node_id)].outgoing.progress[i].filepath == filepath)
2465  {
2466  if (txq[(node_id)].outgoing.progress[i].file_size == file_size)
2467  {
2468  txq[(node_id)].outgoing.progress[i].enabled = true;
2469  if (agent->debug_level)
2470  {
2471  debug_fd_lock.lock();
2472  fprintf(agent->get_debug_fd(), ".4f %.4f Main: outgoing_tx_add: Enable %u %s %s %s %lu ", tet.split(), dt.lap(), txq[(node_id)].outgoing.progress[i].tx_id, txq[(node_id)].outgoing.progress[i].node_name.c_str(), txq[(node_id)].outgoing.progress[i].agent_name.c_str(), txq[(node_id)].outgoing.progress[i].filepath.c_str(), PROGRESS_QUEUE_SIZE);
2473  fflush(agent->get_debug_fd());
2474  debug_fd_lock.unlock();
2475  }
2476  return node_id;
2477  }
2478  else
2479  {
2480  outgoing_tx_del(node_id, i, false);
2481  return TRANSFER_ERROR_FILESIZE;
2482  }
2483  }
2484  }
2485 
2486  uint16_t minindex = TRANSFER_QUEUE_LIMIT - static_cast<uint16_t>(pow(pow(TRANSFER_QUEUE_LIMIT,1.f/3.f), (3.f - log10f(file_size) / 2.f)));
2487  tx_out.tx_id = 0;
2488  for (uint16_t id=minindex; id<PROGRESS_QUEUE_SIZE; ++id)
2489  {
2490  if (txq[(node_id)].outgoing.progress[id].tx_id == 0)
2491  {
2492  tx_out.tx_id = id;
2493  break;
2494  }
2495  }
2496  outgoing_tx_lock.unlock();
2497 
2498  if (tx_out.tx_id > 0)
2499  {
2500  tx_out.havemeta = true;
2501  tx_out.sendmeta = true;
2502  tx_out.sentmeta = false;
2503  tx_out.senddata = false;
2504  tx_out.sentdata = false;
2505  tx_out.complete = false;
2506  tx_out.total_bytes = 0;
2507  tx_out.node_name = node_name;
2508  tx_out.agent_name = agent_name;
2509  tx_out.file_name = file_name;
2510  tx_out.temppath = data_base_path(tx_out.node_name, "temp", "file", "out_"+std::to_string(tx_out.tx_id));
2511  tx_out.filepath = filepath;
2512 // tx_out.filepath = data_base_path(tx_out.node_name, "outgoing", tx_out.agent_name, tx_out.file_name);
2513  tx_out.savetime = 0.;
2514 
2515  std::ifstream filename;
2516 
2517  //get the file size
2518 // tx_out.file_size = get_file_size(tx_out.filepath);
2519  tx_out.file_size = file_size;
2520 
2521  if(tx_out.file_size < 0)
2522  {
2523  if (agent->debug_level)
2524  {
2525  debug_fd_lock.lock();
2526  fprintf(agent->get_debug_fd(), "%.4f %.4f Main: outgoing_tx_add: DATA_ERROR_SIZE_MISMATCH\n", tet.split(), dt.lap());
2527  fflush(agent->get_debug_fd());
2528  debug_fd_lock.unlock();
2529  }
2530  return DATA_ERROR_SIZE_MISMATCH;
2531  }
2532 
2533  // see if file can be opened
2534  filename.open(tx_out.filepath, std::ios::in|std::ios::binary);
2535  if(!filename.is_open())
2536  {
2537  if (agent->debug_level)
2538  {
2539  debug_fd_lock.lock();
2540  fprintf(agent->get_debug_fd(), "%.4f %.4f Main: outgoing_tx_add: %s\n", tet.split(), dt.lap(), cosmos_error_string(-errno).c_str());
2541  fflush(agent->get_debug_fd());
2542  debug_fd_lock.unlock();
2543  }
2544  return -errno;
2545  }
2546  filename.close();
2547 
2548  file_progress tp;
2549  tp.chunk_start = 0;
2550  tp.chunk_end = tx_out.file_size - 1;
2551  tx_out.file_info.push_back(tp);
2552 
2553  write_meta(tx_out);
2554 
2555  int32_t iretn = outgoing_tx_add(tx_out);
2556  return iretn;
2557  }
2558  else
2559  {
2560  return TRANSFER_ERROR_MATCH;
2561  }
2562 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file4.cpp:1843
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
bool sentdata
Definition: transferlib.h:347
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
Definition: transferlib.h:338
bool havemeta
Definition: transferlib.h:342
string to_string(char *value)
Definition: stringlib.cpp:220
int32_t lookup_node_id(string node_name)
Definition: transferlib.cpp:514
int iretn
Definition: rw_test.cpp:37
int32_t outgoing_tx_del(uint8_t node, uint16_t tx_id=256, bool remove_file=true)
Definition: agent_file4.cpp:2564
static Agent * agent
Definition: agent_file4.cpp:94
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
ElapsedTime dt
Definition: agent_file4.cpp:186
#define TRANSFER_ERROR_FILESIZE
Definition: cosmos-errno.h:226
string node_name
Definition: agent_001.cpp:46
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
deque< file_progress > file_info
Definition: transferlib.h:358
bool sentmeta
Definition: transferlib.h:345
int32_t outgoing_tx_add(tx_progress &tx_out)
Definition: agent_file4.cpp:2274
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
#define TRANSFER_ERROR_QUEUEFULL
Definition: cosmos-errno.h:221
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
#define DATA_ERROR_SIZE_MISMATCH
Definition: cosmos-errno.h:150
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string agent_name
Definition: transferlib.h:350
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
static std::mutex outgoing_tx_lock
Definition: agent_file4.cpp:142
string agent_name
Definition: agent_001.cpp:47
string file_name
Definition: transferlib.h:351
#define TRANSFER_ERROR_FILENAME
Definition: cosmos-errno.h:224
bool senddata
Definition: transferlib.h:346
bool complete
Definition: transferlib.h:348
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
int32_t get_file_size(string filename)
Get size of file.
Definition: transferlib.cpp:402
bool sendmeta
Definition: transferlib.h:344
int32_t outgoing_tx_del ( uint8_t  node,
uint16_t  tx_id = 256,
bool  remove_file = true 
)
2565 {
2566  if (node_id == 0 || node_id >= txq.size())
2567  {
2568  return TRANSFER_ERROR_INDEX;
2569  }
2570 
2571  if (txq[(node_id)].outgoing.progress[tx_id].tx_id == 0)
2572  {
2573  return TRANSFER_ERROR_MATCH;
2574  }
2575 
2576  if (tx_id >= PROGRESS_QUEUE_SIZE)
2577  {
2578  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2579  {
2580  outgoing_tx_del(node_id, i);
2581  }
2582  }
2583  else
2584  {
2585  tx_progress tx_out = txq[(node_id)].outgoing.progress[tx_id];
2586 
2587  // erase the transaction
2588 
2589  txq[(node_id)].outgoing.progress[tx_id].fp = nullptr;
2590  txq[(node_id)].outgoing.progress[tx_id].enabled = false;
2591  txq[(node_id)].outgoing.progress[tx_id].tx_id = 0;
2592  txq[(node_id)].outgoing.progress[tx_id].complete = false;
2593  txq[(node_id)].outgoing.progress[tx_id].filepath = "";
2594  txq[(node_id)].outgoing.progress[tx_id].havemeta = false;
2595  txq[(node_id)].outgoing.progress[tx_id].savetime = 0;
2596  txq[(node_id)].outgoing.progress[tx_id].temppath = "";
2597  txq[(node_id)].outgoing.progress[tx_id].file_name = "";
2598  txq[(node_id)].outgoing.progress[tx_id].file_size = 0;
2599  txq[(node_id)].outgoing.progress[tx_id].node_name = "";
2600  txq[(node_id)].outgoing.progress[tx_id].agent_name = "";
2601  txq[(node_id)].outgoing.progress[tx_id].total_bytes = 0;
2602  txq[(node_id)].outgoing.progress[tx_id].file_info.clear();
2603 
2604  if (txq[(node_id)].outgoing.size)
2605  {
2606  --txq[(node_id)].outgoing.size;
2607  }
2608 
2609  // Remove the file
2610  if(remove_file && remove(tx_out.filepath.c_str()))
2611  {
2612  if (agent->debug_level)
2613  {
2614  debug_fd_lock.lock();
2615  fprintf(agent->get_debug_fd(), "%.4f %.4f Main/Outgoing: Del outgoing: %u %s %s %s - Unable to remove file\n", tet.split(), dt.lap(), tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.file_name.c_str());
2616  fflush(agent->get_debug_fd());
2617  debug_fd_lock.unlock();
2618  }
2619  }
2620 
2621  // Remove the META file
2622  string meta_filepath = tx_out.temppath + ".meta";
2623  remove(meta_filepath.c_str());
2624 
2625  if (agent->debug_level)
2626  {
2627  debug_fd_lock.lock();
2628  fprintf(agent->get_debug_fd(), "%.4f %.4f Main/Outgoing: Del outgoing: %u %s %s %s\n", tet.split(), dt.lap(), tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.file_name.c_str());
2629  fflush(agent->get_debug_fd());
2630  debug_fd_lock.unlock();
2631  }
2632  }
2633 
2634  return outgoing_tx_recount(node_id);
2635 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:338
int32_t outgoing_tx_del(uint8_t node, uint16_t tx_id=256, bool remove_file=true)
Definition: agent_file4.cpp:2564
static Agent * agent
Definition: agent_file4.cpp:94
ElapsedTime dt
Definition: agent_file4.cpp:186
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
string agent_name
Definition: transferlib.h:350
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
int32_t outgoing_tx_recount(uint8_t node)
Definition: agent_file4.cpp:2685
string file_name
Definition: transferlib.h:351
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
int32_t outgoing_tx_purge ( uint8_t  node,
uint16_t  tx_id = 256 
)
2638 {
2639  if (node_id == 0 || node_id >= txq.size())
2640  {
2641  return TRANSFER_ERROR_INDEX;
2642  }
2643 
2644  if (tx_id >= PROGRESS_QUEUE_SIZE)
2645  {
2646  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2647  {
2648  txq[(node_id)].outgoing.progress[i].fp = nullptr;
2649  txq[(node_id)].outgoing.progress[i].tx_id = 0;
2650  txq[(node_id)].outgoing.progress[i].complete = false;
2651  txq[(node_id)].outgoing.progress[i].filepath = "";
2652  txq[(node_id)].outgoing.progress[i].havemeta = false;
2653  txq[(node_id)].outgoing.progress[i].savetime = 0;
2654  txq[(node_id)].outgoing.progress[i].temppath = "";
2655  txq[(node_id)].outgoing.progress[i].file_name = "";
2656  txq[(node_id)].outgoing.progress[i].file_size = 0;
2657  txq[(node_id)].outgoing.progress[i].node_name = "";
2658  txq[(node_id)].outgoing.progress[i].agent_name = "";
2659  txq[(node_id)].outgoing.progress[i].total_bytes = 0;
2660  txq[(node_id)].outgoing.progress[i].file_info.clear();
2661  }
2662  }
2663  else {
2664  {
2665  txq[(node_id)].outgoing.progress[tx_id].fp = nullptr;
2666  txq[(node_id)].outgoing.progress[tx_id].tx_id = 0;
2667  txq[(node_id)].outgoing.progress[tx_id].complete = false;
2668  txq[(node_id)].outgoing.progress[tx_id].filepath = "";
2669  txq[(node_id)].outgoing.progress[tx_id].havemeta = false;
2670  txq[(node_id)].outgoing.progress[tx_id].savetime = 0;
2671  txq[(node_id)].outgoing.progress[tx_id].temppath = "";
2672  txq[(node_id)].outgoing.progress[tx_id].file_name = "";
2673  txq[(node_id)].outgoing.progress[tx_id].file_size = 0;
2674  txq[(node_id)].outgoing.progress[tx_id].node_name = "";
2675  txq[(node_id)].outgoing.progress[tx_id].agent_name = "";
2676  txq[(node_id)].outgoing.progress[tx_id].total_bytes = 0;
2677  txq[(node_id)].outgoing.progress[tx_id].file_info.clear();
2678  }
2679  }
2680  txq[(node_id)].outgoing.size = 0;
2681 
2682  return 0;
2683 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
int i
Definition: rw_test.cpp:37
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
int32_t outgoing_tx_recount ( uint8_t  node)
2686 {
2687  if (node_id == 0 || node_id >= txq.size())
2688  {
2689  return TRANSFER_ERROR_INDEX;
2690  }
2691 
2692  txq[(node_id)].outgoing.size = 0;
2693  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2694  {
2695  if (txq[(node_id)].outgoing.progress[i].tx_id)
2696  {
2697  ++txq[(node_id)].outgoing.size;
2698  }
2699  }
2700  return txq[(node_id)].outgoing.size;
2701 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
int i
Definition: rw_test.cpp:37
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
int32_t outgoing_tx_load ( uint8_t  node)
2704 {
2705  int32_t iretn=0;
2706 
2707  // Go through outgoing queue, removing files that no longer exist
2708  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2709  {
2710  if (txq[(node_id)].outgoing.progress[i].tx_id != 0 && !data_isfile(txq[(node_id)].outgoing.progress[i].filepath))
2711  {
2712  outgoing_tx_del(node_id, txq[(node_id)].outgoing.progress[i].tx_id);
2713  }
2714  }
2715 
2716  // Go through outgoing directories, adding files not already in queue
2717  if (txq[(node_id)].outgoing.size < PROGRESS_QUEUE_SIZE)
2718  {
2719  vector<filestruc> file_names;
2720  for (filestruc file : data_list_files(txq[(node_id)].node_name, "outgoing", ""))
2721  {
2722  if (file.type == "directory")
2723  {
2724  iretn = data_list_files(txq[(node_id)].node_name, "outgoing", file.name, file_names);
2725  }
2726  }
2727 
2728  // Sort list by size, then go through list of files found, adding to queue.
2729  sort(file_names.begin(), file_names.end(), filestruc_smaller_by_size);
2730  for(uint16_t i=0; i<file_names.size(); ++i)
2731  {
2732  filestruc file = file_names[i];
2733  if (txq[(node_id)].outgoing.size >= PROGRESS_QUEUE_SIZE)
2734  {
2735  break;
2736  }
2737 
2738  //Ignore sub-directories
2739  if (file.type == "directory")
2740  {
2741  continue;
2742  }
2743 
2744  // Ignore zero length files (may still be being formed)
2745 // if (file.size == 0)
2746 // {
2747 // continue;
2748 // }
2749 
2750  // Go through existing queue
2751  // - if it is already there and the size is different, remove it from queue
2752  // - if it is already there, and the size is the same, enable it
2753  // - if it is enabled and the size is zero, remove it from queue and remove file
2754  bool addtoqueue = true;
2755  outgoing_tx_lock.lock();
2756  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2757  {
2758  if (txq[(node_id)].outgoing.progress[i].filepath == file.path)
2759  {
2760  if (txq[(node_id)].outgoing.progress[i].file_size == file.size)
2761  {
2762  addtoqueue = false;
2763  if (!txq[(node_id)].outgoing.progress[i].enabled)
2764  {
2765  txq[(node_id)].outgoing.progress[i].enabled = true;
2766  if (agent->debug_level)
2767  {
2768  debug_fd_lock.lock();
2769  fprintf(agent->get_debug_fd(), "%.4f %.4f Main: outgoing_tx_add: Enable %u %s %s %s %lu\n", tet.split(), dt.lap(), txq[(node_id)].outgoing.progress[i].tx_id, txq[(node_id)].outgoing.progress[i].node_name.c_str(), txq[(node_id)].outgoing.progress[i].agent_name.c_str(), txq[(node_id)].outgoing.progress[i].filepath.c_str(), PROGRESS_QUEUE_SIZE);
2770  fflush(agent->get_debug_fd());
2771  debug_fd_lock.unlock();
2772 
2773  }
2774  }
2775  iretn = 0;
2776  }
2777  else
2778  {
2779  outgoing_tx_del(node_id, i, false);
2780  addtoqueue = false;
2781  iretn = TRANSFER_ERROR_FILESIZE;
2782  }
2783  if (txq[(node_id)].outgoing.progress[i].enabled && txq[(node_id)].outgoing.progress[i].file_size == 0)
2784  {
2785  outgoing_tx_del(node_id, i, true);
2786  addtoqueue = false;
2787  iretn = TRANSFER_ERROR_FILEZERO;
2788  }
2789  }
2790  }
2791 
2792  outgoing_tx_lock.unlock();
2793 
2794  if (addtoqueue)
2795  {
2796  iretn = outgoing_tx_add(file.node, file.agent, file.name);
2797  if (agent->debug_level && iretn != -471)
2798  {
2799  debug_fd_lock.lock();
2800  fprintf(agent->get_debug_fd(), "%.4f %.4f Main/Load: outgoing_tx_add: %s [%d]\n", tet.split(), dt.lap(), file.path.c_str(), iretn);
2801  fflush(agent->get_debug_fd());
2802  debug_fd_lock.unlock();
2803  }
2804  }
2805  }
2806  }
2807 
2808  return iretn;
2809 }
string name
Definition: datalib.h:117
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
off_t size
Definition: datalib.h:120
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
int i
Definition: rw_test.cpp:37
string node
Definition: datalib.h:115
int iretn
Definition: rw_test.cpp:37
int32_t outgoing_tx_del(uint8_t node, uint16_t tx_id=256, bool remove_file=true)
Definition: agent_file4.cpp:2564
static Agent * agent
Definition: agent_file4.cpp:94
vector< filestruc > data_list_files(string directory)
Get list of files in a directory, directly.
Definition: datalib.cpp:461
ElapsedTime dt
Definition: agent_file4.cpp:186
#define TRANSFER_ERROR_FILESIZE
Definition: cosmos-errno.h:226
string node_name
Definition: agent_001.cpp:46
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
Definition: datalib.h:113
bool data_isfile(string path, off_t size)
Definition: datalib.cpp:1895
int32_t outgoing_tx_add(tx_progress &tx_out)
Definition: agent_file4.cpp:2274
string path
Definition: datalib.h:119
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
bool filestruc_smaller_by_size(const filestruc &a, const filestruc &b)
Definition: agent_file4.cpp:3148
string agent
Definition: datalib.h:116
static std::mutex outgoing_tx_lock
Definition: agent_file4.cpp:142
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
#define TRANSFER_ERROR_FILEZERO
Definition: cosmos-errno.h:227
string type
Definition: datalib.h:118
int32_t incoming_tx_add ( tx_progress tx_in)
2812 {
2813  uint8_t node_id = lookup_node_id(tx_in.node_name);
2814  if (node_id == 0)
2815  {
2816  if (agent->debug_level)
2817  {
2818  debug_fd_lock.lock();
2819  fprintf(agent->get_debug_fd(), "%.4f %.4f Main: incoming_tx_add: TRANSFER_ERROR_NODE\n", tet.split(), dt.lap());
2820  fflush(agent->get_debug_fd());
2821  debug_fd_lock.unlock();
2822  }
2823  return TRANSFER_ERROR_NODE;
2824  }
2825 
2826  // Check for an actual file name
2827  if (tx_in.file_name.size())
2828  {
2829  tx_in.filepath = data_base_path(tx_in.node_name, "incoming", tx_in.agent_name, tx_in.file_name);
2830  }
2831  else
2832  {
2833  tx_in.filepath = "";
2834  }
2835 
2836  string tx_name = "in_"+std::to_string(tx_in.tx_id);
2837  tx_in.temppath = data_base_path(tx_in.node_name, "temp", "file", tx_name);
2838 
2839  // Check for a duplicate file name of something already in queue
2840  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
2841  {
2842  if (!txq[(node_id)].incoming.progress[i].filepath.empty() && tx_in.filepath == txq[(node_id)].incoming.progress[i].filepath)
2843  {
2844  if (agent->debug_level)
2845  {
2846  debug_fd_lock.lock();
2847  fprintf(agent->get_debug_fd(), "%u %s %s %s TRANSFER_ERROR_DUPLICATE\n", tx_in.tx_id, tx_in.node_name.c_str(), tx_in.agent_name.c_str(), tx_in.filepath.c_str());
2848  fflush(agent->get_debug_fd());
2849  debug_fd_lock.unlock();
2850  }
2851  // Remove the META file
2852  string filepath = tx_in.temppath + ".meta";
2853  remove(filepath.c_str());
2854  return TRANSFER_ERROR_DUPLICATE;
2855  }
2856  }
2857 
2858  tx_in.savetime = 0.;
2859  tx_in.fp = nullptr;
2860  // tx_in.state = STATE_REQMETA;
2861  // if (tx_in.havemeta)
2862  // {
2863  // tx_in.state = STATE_REQDATA;
2864  // }
2865  // if (tx_in.complete)
2866  // {
2867  // tx_in.state = STATE_COMPLETE;
2868  // }
2869 
2870  // Put it in list
2871  // txq[(node_id)].incoming.progress[tx_in.tx_id] = tx_in;
2872  txq[(node_id)].incoming.progress[tx_in.tx_id].tx_id = tx_in.tx_id;
2873  txq[(node_id)].incoming.progress[tx_in.tx_id].havemeta = tx_in.havemeta;
2874  txq[(node_id)].incoming.progress[tx_in.tx_id].sendmeta = tx_in.sendmeta;
2875  txq[(node_id)].incoming.progress[tx_in.tx_id].sentmeta = tx_in.sentmeta;
2876  txq[(node_id)].incoming.progress[tx_in.tx_id].senddata = tx_in.senddata;
2877  txq[(node_id)].incoming.progress[tx_in.tx_id].sentdata = tx_in.sentdata;
2878  txq[(node_id)].incoming.progress[tx_in.tx_id].complete = tx_in.complete;
2879  txq[(node_id)].incoming.progress[tx_in.tx_id].node_name = tx_in.node_name;
2880  txq[(node_id)].incoming.progress[tx_in.tx_id].agent_name = tx_in.agent_name;
2881  txq[(node_id)].incoming.progress[tx_in.tx_id].file_name = tx_in.file_name;
2882  txq[(node_id)].incoming.progress[tx_in.tx_id].filepath = tx_in.filepath;
2883  txq[(node_id)].incoming.progress[tx_in.tx_id].temppath = tx_in.temppath;
2884  txq[(node_id)].incoming.progress[tx_in.tx_id].savetime = tx_in.savetime;
2885  txq[(node_id)].incoming.progress[tx_in.tx_id].file_size = tx_in.file_size;
2886  txq[(node_id)].incoming.progress[tx_in.tx_id].total_bytes = tx_in.total_bytes;
2887  txq[(node_id)].incoming.progress[tx_in.tx_id].file_info.clear();
2888  for (file_progress filep : tx_in.file_info)
2889  {
2890  txq[(node_id)].incoming.progress[tx_in.tx_id].file_info.push_back(filep);
2891  }
2892  txq[(node_id)].incoming.progress[tx_in.tx_id].fp = tx_in.fp;
2893  ++txq[(node_id)].incoming.size;
2894 
2895  if (agent->debug_level)
2896  {
2897  debug_fd_lock.lock();
2898  fprintf(agent->get_debug_fd(), "%.4f %.4f Main/Incoming: Add incoming: %u %s %s %s %lu\n", tet.split(), dt.lap(), tx_in.tx_id, tx_in.node_name.c_str(), tx_in.agent_name.c_str(), tx_in.filepath.c_str(), PROGRESS_QUEUE_SIZE);
2899  fflush(agent->get_debug_fd());
2900  debug_fd_lock.unlock();
2901  }
2902 
2903  return incoming_tx_recount(node_id);
2904 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
bool sentdata
Definition: transferlib.h:347
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
string to_string(char *value)
Definition: stringlib.cpp:220
int32_t lookup_node_id(string node_name)
Definition: transferlib.cpp:514
static Agent * agent
Definition: agent_file4.cpp:94
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
ElapsedTime dt
Definition: agent_file4.cpp:186
int32_t incoming_tx_recount(uint8_t node)
Definition: agent_file4.cpp:3130
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
deque< file_progress > file_info
Definition: transferlib.h:358
bool sentmeta
Definition: transferlib.h:345
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_ERROR_DUPLICATE
Definition: cosmos-errno.h:225
string agent_name
Definition: transferlib.h:350
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
string file_name
Definition: transferlib.h:351
bool senddata
Definition: transferlib.h:346
bool complete
Definition: transferlib.h:348
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
bool sendmeta
Definition: transferlib.h:344
int32_t incoming_tx_add ( string  node_name,
PACKET_TX_ID_TYPE  tx_id 
)
2907 {
2908  tx_progress tx_in;
2909 
2910  tx_in.tx_id = tx_id;
2911  tx_in.sendmeta = true;
2912  tx_in.havemeta = false;
2913  tx_in.sentmeta = false;
2914  tx_in.senddata = false;
2915  tx_in.sentdata = false;
2916  tx_in.complete = false;
2917  tx_in.node_name = node_name;
2918  tx_in.agent_name = "";
2919  tx_in.file_name = "";
2920  tx_in.savetime = 0.;
2921  tx_in.file_size = 0;
2922  tx_in.total_bytes = 0;
2923  tx_in.file_info.clear();
2924 
2925  int32_t iretn = incoming_tx_add(tx_in);
2926 
2927  return iretn;
2928 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file4.cpp:2811
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
bool sentdata
Definition: transferlib.h:347
Definition: transferlib.h:338
bool havemeta
Definition: transferlib.h:342
int iretn
Definition: rw_test.cpp:37
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
double savetime
Definition: transferlib.h:354
string node_name
Definition: agent_001.cpp:46
deque< file_progress > file_info
Definition: transferlib.h:358
bool sentmeta
Definition: transferlib.h:345
string node_name
Definition: transferlib.h:349
string agent_name
Definition: transferlib.h:350
string file_name
Definition: transferlib.h:351
bool senddata
Definition: transferlib.h:346
bool complete
Definition: transferlib.h:348
bool sendmeta
Definition: transferlib.h:344
int32_t incoming_tx_update ( packet_struct_metashort  meta)
2931 {
2932  int32_t node_id = check_node_id(meta.node_id);
2933  if (node_id <= 0)
2934  {
2935  if (node_id < 0)
2936  {
2937  return node_id;
2938  }
2939  else
2940  {
2941  return TRANSFER_ERROR_NODE;
2942  }
2943  }
2944 
2945  if (meta.tx_id)
2946  {
2947  txq[node_id].incoming.rcvdmeta = true;
2948  if (txq[(node_id)].incoming.progress[meta.tx_id].tx_id != meta.tx_id)
2949  {
2950  txq[(node_id)].incoming.progress[meta.tx_id].tx_id = meta.tx_id;
2951  txq[(node_id)].incoming.progress[meta.tx_id].havemeta = false;
2952  }
2953 
2954  txq[(node_id)].incoming.progress[meta.tx_id].sendmeta = false;
2955  txq[(node_id)].incoming.progress[meta.tx_id].sentmeta = true;
2956  txq[(node_id)].incoming.progress[meta.tx_id].senddata = false;
2957  txq[(node_id)].incoming.progress[meta.tx_id].sentdata = false;
2958  txq[(node_id)].incoming.progress[meta.tx_id].datatime = currentmjd();
2959 
2960  if (!txq[(node_id)].incoming.progress[meta.tx_id].havemeta)
2961  {
2962  // Core META information
2963  txq[(node_id)].incoming.progress[meta.tx_id].node_name = txq[(node_id)].node_name;
2964  txq[(node_id)].incoming.progress[meta.tx_id].agent_name = meta.agent_name;
2965  txq[(node_id)].incoming.progress[meta.tx_id].file_name = meta.file_name;
2966  txq[(node_id)].incoming.progress[meta.tx_id].file_size = meta.file_size;
2967  txq[(node_id)].incoming.progress[meta.tx_id].filepath = data_base_path(txq[(node_id)].incoming.progress[meta.tx_id].node_name, "incoming", txq[(node_id)].incoming.progress[meta.tx_id].agent_name, txq[(node_id)].incoming.progress[meta.tx_id].file_name);
2968  string tx_name = "in_"+std::to_string(txq[(node_id)].incoming.progress[meta.tx_id].tx_id);
2969  txq[(node_id)].incoming.progress[meta.tx_id].temppath = data_base_path(txq[(node_id)].incoming.progress[meta.tx_id].node_name, "temp", "file", tx_name);
2970 
2971  // Derivative META information
2972  // txq[(node_id)].incoming.progress[meta.tx_id].state = STATE_REQDATA;
2973  txq[(node_id)].incoming.progress[meta.tx_id].savetime = 0.;
2974  txq[(node_id)].incoming.progress[meta.tx_id].havemeta = true;
2975  txq[(node_id)].incoming.progress[meta.tx_id].complete = false;
2976  txq[(node_id)].incoming.progress[meta.tx_id].total_bytes = 0;
2977  txq[(node_id)].incoming.progress[meta.tx_id].fp = nullptr;
2978  txq[(node_id)].incoming.progress[meta.tx_id].file_info.clear();
2979 
2980  // Save it to disk
2981  write_meta(txq[(node_id)].incoming.progress[meta.tx_id]);
2982  }
2983 
2984  if (agent->debug_level)
2985  {
2986  debug_fd_lock.lock();
2987  fprintf(agent->get_debug_fd(), "%.4f %.4f Incoming: Update incoming: %u %s %s %s\n", tet.split(), dt.lap(), txq[(node_id)].incoming.progress[meta.tx_id].tx_id, txq[(node_id)].incoming.progress[meta.tx_id].node_name.c_str(), txq[(node_id)].incoming.progress[meta.tx_id].agent_name.c_str(), txq[(node_id)].incoming.progress[meta.tx_id].file_name.c_str());
2988  fflush(agent->get_debug_fd());
2989  debug_fd_lock.unlock();
2990  }
2991 
2992  return meta.tx_id;
2993  }
2994  else
2995  {
2996  return TRANSFER_ERROR_INDEX;
2997  }
2998 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file4.cpp:1843
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:248
string to_string(char *value)
Definition: stringlib.cpp:220
static Agent * agent
Definition: agent_file4.cpp:94
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:247
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
ElapsedTime dt
Definition: agent_file4.cpp:186
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
int32_t check_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:494
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
char file_name[128]
Definition: transferlib.h:250
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
char agent_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:249
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:251
int32_t incoming_tx_del ( uint8_t  node,
uint16_t  tx_id = 256 
)
3001 {
3002  node_id = check_node_id(node_id);
3003  if (node_id == 0)
3004  {
3005  return TRANSFER_ERROR_NODE;
3006  }
3007 
3008  if (tx_id >= PROGRESS_QUEUE_SIZE)
3009  {
3010  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
3011  {
3012  incoming_tx_del(node_id, i);
3013  }
3014  }
3015  else
3016  {
3017  if (txq[(node_id)].incoming.progress[tx_id].tx_id == 0)
3018  {
3019  return TRANSFER_ERROR_MATCH;
3020  }
3021 
3022  tx_progress tx_in = txq[(node_id)].incoming.progress[tx_id];
3023 
3024  txq[(node_id)].incoming.progress[tx_id].tx_id = 0;
3025  txq[(node_id)].incoming.progress[tx_id].havemeta = false;
3026  txq[(node_id)].incoming.progress[tx_id].havedata = false;
3027  txq[(node_id)].incoming.progress[tx_id].complete = false;
3028  if (txq[(node_id)].incoming.size)
3029  {
3030  --txq[(node_id)].incoming.size;
3031  }
3032 
3033  // Move file to its final location
3034  if (tx_in.complete)
3035  {
3036  // Close the DATA file
3037  if (tx_in.fp != nullptr)
3038  {
3039  fclose(tx_in.fp);
3040  tx_in.fp = nullptr;
3041  }
3042  string final_filepath = tx_in.temppath + ".file";
3043  int iret = rename(final_filepath.c_str(), tx_in.filepath.c_str());
3044  // Make sure metadata is recorded
3045  write_meta(tx_in, 0.);
3046  if (agent->debug_level)
3047  {
3048  debug_fd_lock.lock();
3049  fprintf(agent->get_debug_fd(), "%.4f %.4f Incoming: Renamed/Data: %d %s\n", tet.split(), dt.lap(), iret, tx_in.filepath.c_str());
3050  fflush(agent->get_debug_fd());
3051  debug_fd_lock.unlock();
3052  }
3053 
3054  // Mark complete
3055  tx_in.complete = true;
3056  tx_in.senddata = false;
3057  tx_in.sentdata = true;
3058  }
3059 
3060  string filepath;
3061  //Remove the DATA file
3062  filepath = tx_in.temppath + ".file";
3063  remove(filepath.c_str());
3064 
3065  // Remove the META file
3066  filepath = tx_in.temppath + ".meta";
3067  remove(filepath.c_str());
3068 
3069  if (agent->debug_level)
3070  {
3071  debug_fd_lock.lock();
3072  fprintf(agent->get_debug_fd(), "%.4f %.4f Incoming: Del incoming: %u %s\n", tet.split(), dt.lap(), tx_in.tx_id, tx_in.node_name.c_str());
3073  fflush(agent->get_debug_fd());
3074  debug_fd_lock.unlock();
3075  }
3076  }
3077 
3078  return incoming_tx_recount(node_id);
3079 
3080 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file4.cpp:1843
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
bool sentdata
Definition: transferlib.h:347
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
int32_t incoming_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file4.cpp:3000
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:338
FILE * fp
Definition: transferlib.h:359
static Agent * agent
Definition: agent_file4.cpp:94
ElapsedTime dt
Definition: agent_file4.cpp:186
int32_t incoming_tx_recount(uint8_t node)
Definition: agent_file4.cpp:3130
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
int32_t check_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:494
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
bool senddata
Definition: transferlib.h:346
bool complete
Definition: transferlib.h:348
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
int32_t incoming_tx_purge ( uint8_t  node,
uint16_t  tx_id = 256 
)
3083 {
3084  if (node_id == 0 || node_id >= txq.size())
3085  {
3086  return TRANSFER_ERROR_INDEX;
3087  }
3088 
3089  if (tx_id >= PROGRESS_QUEUE_SIZE)
3090  {
3091  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
3092  {
3093  txq[(node_id)].incoming.progress[i].fp = nullptr;
3094  txq[(node_id)].incoming.progress[i].tx_id = 0;
3095  txq[(node_id)].incoming.progress[i].complete = false;
3096  txq[(node_id)].incoming.progress[i].filepath = "";
3097  txq[(node_id)].incoming.progress[i].havemeta = false;
3098  txq[(node_id)].incoming.progress[i].savetime = 0;
3099  txq[(node_id)].incoming.progress[i].temppath = "";
3100  txq[(node_id)].incoming.progress[i].file_name = "";
3101  txq[(node_id)].incoming.progress[i].file_size = 0;
3102  txq[(node_id)].incoming.progress[i].node_name = "";
3103  txq[(node_id)].incoming.progress[i].agent_name = "";
3104  txq[(node_id)].incoming.progress[i].total_bytes = 0;
3105  txq[(node_id)].incoming.progress[i].file_info.clear();
3106  }
3107  }
3108  else {
3109  {
3110  txq[(node_id)].incoming.progress[tx_id].fp = nullptr;
3111  txq[(node_id)].incoming.progress[tx_id].tx_id = 0;
3112  txq[(node_id)].incoming.progress[tx_id].complete = false;
3113  txq[(node_id)].incoming.progress[tx_id].filepath = "";
3114  txq[(node_id)].incoming.progress[tx_id].havemeta = false;
3115  txq[(node_id)].incoming.progress[tx_id].savetime = 0;
3116  txq[(node_id)].incoming.progress[tx_id].temppath = "";
3117  txq[(node_id)].incoming.progress[tx_id].file_name = "";
3118  txq[(node_id)].incoming.progress[tx_id].file_size = 0;
3119  txq[(node_id)].incoming.progress[tx_id].node_name = "";
3120  txq[(node_id)].incoming.progress[tx_id].agent_name = "";
3121  txq[(node_id)].incoming.progress[tx_id].total_bytes = 0;
3122  txq[(node_id)].incoming.progress[tx_id].file_info.clear();
3123  }
3124  }
3125  txq[(node_id)].incoming.size = 0;
3126 
3127  return 0;
3128 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
int i
Definition: rw_test.cpp:37
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
int32_t incoming_tx_recount ( uint8_t  node)
3131 {
3132  if (node_id == 0 || node_id >= txq.size())
3133  {
3134  return TRANSFER_ERROR_INDEX;
3135  }
3136 
3137  txq[(node_id)].incoming.size = 0;
3138  for (uint16_t i=1; i<PROGRESS_QUEUE_SIZE; ++i)
3139  {
3140  if (txq[(node_id)].incoming.progress[i].tx_id)
3141  {
3142  ++txq[(node_id)].incoming.size;
3143  }
3144  }
3145  return txq[(node_id)].incoming.size;
3146 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
int i
Definition: rw_test.cpp:37
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
// Compute the byte ranges of a transfer that are not yet covered by the
// chunks recorded in tx.file_info.
//
// Returns an empty vector when tx.havemeta is false. Otherwise each returned
// file_progress holds the inclusive [chunk_start, chunk_end] of one gap:
// before the first chunk, between consecutive chunks, and after the last
// chunk up to tx.file_size - 1. With no chunks recorded, a single range
// covering 0 .. tx.file_size - 1 is returned.
//
// Side effects on tx: file_info is sorted with lower_chunk, total_bytes is
// recomputed as the sum of the recorded chunk lengths, and complete is set to
// true when that sum equals file_size (it is never reset to false here).
//
// NOTE(review): listing line 2035 is absent from this view; the cross
// references suggest it involves merge_chunks_overlap -- confirm against the
// real source before changing this function.
// NOTE(review): tx is modified, so it is presumably taken by reference even
// though the rendered signature omits the '&' -- confirm.
vector<file_progress> find_chunks_missing ( tx_progress tx)
2018 {
2019  vector<file_progress> missing;
2020  file_progress tp;
2021 
2022  if (!tx.havemeta)
2023  {
2024  return missing;
2025  }
2026 
2027  if (tx.file_info.size() == 0)
2028  {
// Nothing received yet: the whole file is one missing range.
2029  tp.chunk_start = 0;
2030  tp.chunk_end = tx.file_size - 1;
2031  missing.push_back(tp);
2032  }
2033  else
2034  {
2036  sort(tx.file_info.begin(), tx.file_info.end(), lower_chunk);
2037 
2038  // Check missing before first chunk
2039  if (tx.file_info[0].chunk_start)
2040  {
2041  tp.chunk_start = 0;
2042  tp.chunk_end = tx.file_info[0].chunk_start - 1;
2043  missing.push_back(tp);
2044  }
2045 
2046  // Check missing between chunks
2047  for (uint32_t i=1; i<tx.file_info.size(); ++i)
2048  {
// A gap exists whenever the next chunk does not start right after the previous one ends.
2049  if (tx.file_info[i-1].chunk_end+1 != tx.file_info[i].chunk_start)
2050  {
2051  tp.chunk_start = tx.file_info[i-1].chunk_end + 1;
2052  tp.chunk_end = tx.file_info[i].chunk_start - 1;
2053  missing.push_back(tp);
2054  }
2055  }
2056 
2057  // Check missing after last chunk
2058  if (tx.file_info[tx.file_info.size()-1].chunk_end + 1 != tx.file_size)
2059  {
2060  tp.chunk_start = tx.file_info[tx.file_info.size()-1].chunk_end + 1;
2061  tp.chunk_end = tx.file_size - 1;
2062  missing.push_back(tp);
2063  }
2064  }
2065 
2066  // calculate bytes so far
2067  tx.total_bytes = 0;
2068  for (file_progress prog : tx.file_info)
2069  {
2070  tx.total_bytes += (prog.chunk_end - prog.chunk_start) + 1;
2071  }
2072  if (tx.total_bytes == tx.file_size)
2073  {
2074  tx.complete = true;
2075  }
2076 
2077  return (missing);
2078 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file4.cpp:1971
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
bool havemeta
Definition: transferlib.h:342
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
deque< file_progress > file_info
Definition: transferlib.h:358
bool lower_chunk(file_progress i, file_progress j)
Definition: agent_file4.cpp:1966
bool complete
Definition: transferlib.h:348
// Collapse the chunks recorded in tx.file_info into a list of contiguous
// byte ranges.
//
// After sorting file_info with lower_chunk, consecutive entries whose
// chunk_start directly follows the previous chunk_end are joined into one
// inclusive [chunk_start, chunk_end] range; any other entry starts a new
// range. An empty file_info yields an empty result.
//
// Side effects on tx: file_info is sorted, and total_bytes is overwritten
// with the summed length of the returned ranges.
//
// NOTE(review): listing line 2087 is absent from this view; the cross
// references suggest it involves merge_chunks_overlap -- confirm against the
// real source before changing this function.
// NOTE(review): tx is modified, so it is presumably taken by reference even
// though the rendered signature omits the '&' -- confirm.
vector<file_progress> find_chunks_togo ( tx_progress tx)
2081 {
2082  vector<file_progress> togo;
2083  file_progress tp;
2084 
2085  if (tx.file_info.size())
2086  {
2088  sort(tx.file_info.begin(), tx.file_info.end(), lower_chunk);
2089 
2090  // Set first chunk
2091  tp.chunk_start = tx.file_info[0].chunk_start;
2092  tp.chunk_end = tx.file_info[0].chunk_end;
2093 
2094  // Add middle chunks to go
2095  for (uint32_t i=1; i<tx.file_info.size(); ++i)
2096  {
// Adjacent chunk: extend the range being built instead of starting a new one.
2097  if (tx.file_info[i-1].chunk_end+1 == tx.file_info[i].chunk_start)
2098  {
2099  tp.chunk_end = tx.file_info[i].chunk_end;
2100  }
2101  else
2102  {
2103  togo.push_back(tp);
2104  tp.chunk_start = tx.file_info[i].chunk_start;
2105  tp.chunk_end = tx.file_info[i].chunk_end;
2106  }
2107  }
2108 
2109  // Add last chunk
2110  togo.push_back(tp);
2111  }
2112 
2113  // calculate bytes left
2114  tx.total_bytes = 0;
2115  for (file_progress prog : togo)
2116  {
2117  tx.total_bytes += (prog.chunk_end - prog.chunk_start) + 1;
2118  }
2119 
2120  return (togo);
2121 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file4.cpp:1971
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
deque< file_progress > file_info
Definition: transferlib.h:358
bool lower_chunk(file_progress i, file_progress j)
Definition: agent_file4.cpp:1966
/// Normalise the chunk list of a transfer and recompute its byte total.
///
/// Chunks whose chunk_end lies at or beyond tx.file_size are discarded, the
/// remaining chunks are sorted with lower_chunk, and any chunk that overlaps
/// or directly follows an earlier one is folded into it. tx.total_bytes is set
/// to the summed length of the resulting chunks.
///
/// @param tx Transfer whose file_info and total_bytes are rewritten in place.
/// @return The recomputed tx.total_bytes.
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
{
    // Drop every chunk that runs past the end of the file. The element that
    // fails the test is the one removed, wherever it sits in the list.
    const PACKET_FILE_SIZE_TYPE file_size = tx.file_size;
    tx.file_info.erase(
        std::remove_if(tx.file_info.begin(), tx.file_info.end(),
                       [file_size](const file_progress &chunk)
                       {
                           return chunk.chunk_end >= file_size;
                       }),
        tx.file_info.end());

    // Sorting and merging are no-ops for zero or one chunk, so no special
    // cases are needed for those sizes.
    sort(tx.file_info.begin(), tx.file_info.end(), lower_chunk);

    tx.total_bytes = 0;
    for (size_t i = 0; i < tx.file_info.size(); ++i)
    {
        for (size_t j = i + 1; j < tx.file_info.size(); ++j)
        {
            // Absorb chunk j into chunk i while it overlaps or is adjacent.
            // Erasing j pulls the next candidate into position j.
            while (j < tx.file_info.size() && tx.file_info[j].chunk_start <= tx.file_info[i].chunk_end + 1)
            {
                if (tx.file_info[j].chunk_end > tx.file_info[i].chunk_end)
                {
                    tx.file_info[i].chunk_end = tx.file_info[j].chunk_end;
                }
                tx.file_info.erase(tx.file_info.begin() + j);
            }
        }
        tx.total_bytes += (tx.file_info[i].chunk_end - tx.file_info[i].chunk_start) + 1;
    }

    return tx.total_bytes;
}
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
int i
Definition: rw_test.cpp:37
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
deque< file_progress > file_info
Definition: transferlib.h:358
bool lower_chunk(file_progress i, file_progress j)
Definition: agent_file4.cpp:1966
double queuecheck ( PACKET_NODE_ID_TYPE  node_id)
1450 {
1451  int32_t use_channel;
1452 
1453  use_channel = -1;
1454  profile_check(__LINE__, 2);
1455 
1456  for (uint16_t i=0; i<out_comm_channel.size(); ++i)
1457  {
1458  if (txq[node_id].node_name == out_comm_channel[i].node)
1459  {
1460  use_channel = i;
1461  break;
1462  }
1463  }
1464 
1465  if (use_channel == -1)
1466  {
1467  return -1.;
1468  }
1469 
1470  return (1. * transmit_queue.size() * out_comm_channel[use_channel].packet_size) / out_comm_channel[use_channel].throughput;
1471 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
int i
Definition: rw_test.cpp:37
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
static std::queue< transmit_queue_entry > transmit_queue
Definition: agent_file4.cpp:128
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
static string node
Definition: agent_monitor.cpp:126
void profile_check(int32_t line_number, uint16_t thread=0)
Definition: agent_file4.cpp:3629
int32_t queuesendto ( PACKET_NODE_ID_TYPE  node_id,
string  type,
vector< PACKET_BYTE packet 
)
1474 {
1475  transmit_queue_entry tentry;
1476  int32_t use_channel;
1477 
1478 // use_channel = -1;
1479 // for (uint16_t i=0; i<out_comm_channel.size(); ++i)
1480 // {
1481 // if (txq[node_id].node_name == out_comm_channel[i].node)
1482 // {
1483 // use_channel = i;
1484 // break;
1485 // }
1486 // }
1487 
1488 // if (use_channel > out_comm_channel.size())
1489  use_channel = check_channel(node_id);
1490  if (use_channel <= 0)
1491  {
1492  return -1.;
1493  }
1494 
1495  tentry.type = type;
1496  tentry.channel = use_channel;
1497  tentry.packet = packet;
1498  tentry.time_step = (28 + packet.size()) / (86400. * out_comm_channel[use_channel].throughput);
1499  txqueue_lock.lock();
1500  if (transmit_queue.empty())
1501  {
1502  tentry.nmjd = out_comm_channel[use_channel].nmjd + tentry.time_step;
1503  }
1504  else
1505  {
1506  tentry.nmjd = transmit_queue.back().nmjd + tentry.time_step;
1507  }
1508  transmit_queue.push(tentry);
1509  txqueue_lock.unlock();
1510  // transmit_queue_check.notify_one();
1511  // Sleep just a bit to give other threads a chance
1512  COSMOS_SLEEP(.001);
1513  return use_channel;
1514 }
std::vector< PACKET_BYTE > packet
Definition: agent_file.cpp:114
double time_step
Definition: agent_file3.cpp:126
static std::mutex txqueue_lock
Definition: agent_file4.cpp:140
uint32_t channel
Definition: agent_file.cpp:113
std::string type
Definition: agent_file.cpp:112
double nmjd
Definition: agent_file3.cpp:125
static uint16_t use_channel
Definition: agent_file.cpp:97
static std::queue< transmit_queue_entry > transmit_queue
Definition: agent_file4.cpp:128
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
Definition: agent_file.cpp:110
int32_t check_channel(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file4.cpp:3258
// Send one packet over the UDP socket of an outgoing channel.
//
// If the current MJD is earlier than the channel's nmjd, the call first sleeps
// for the difference (converted from days to seconds). On a successful
// sendto() it increments packet_out_count, recomputes the channel's nmjd from
// its lomjd plus the time for (28 + bytes sent) at the channel throughput, and
// logs the packet through debug_packet() when agent->debug_level is set.
//
// Parameters:
//   type        - label passed on to debug_packet().
//   use_channel - index into out_comm_channel; not range-checked here.
//   buf         - packet bytes to send.
// Returns the byte count reported by sendto(), or -errno on failure (in which
// case send_error_count is incremented).
//
// NOTE(review): listing line 1537 is absent from this view, so one statement
// between the counter increment and the nmjd update is not shown here.
int32_t mysendto ( string  type,
int32_t  use_channel,
vector< PACKET_BYTE > &  buf 
)
1517 {
1518  int32_t iretn;
1519  double cmjd;
1520 
1521  profile_check(__LINE__, 3);
1522 
// Hold off until the channel's next allowed send time.
1523  if ((cmjd = currentmjd(0.)) < out_comm_channel[use_channel].nmjd)
1524  {
1525  COSMOS_SLEEP((86400. * (out_comm_channel[use_channel].nmjd - cmjd)));
1526  }
1527 
1528  profile_check(__LINE__, 3);
1529 
1530  iretn = sendto(out_comm_channel[use_channel].chansock.cudp, reinterpret_cast<const char*>(&buf[0]), buf.size(), 0, reinterpret_cast<sockaddr*>(&out_comm_channel[use_channel].chansock.caddr), sizeof(struct sockaddr_in));
1531 
1532  if (iretn >= 0)
1533  {
1534  profile_check(__LINE__, 3);
1535 
1536  ++packet_out_count;
// Next allowed send time: lomjd plus the duration of this packet (with 28 extra bytes) in days.
1538  out_comm_channel[use_channel].nmjd = out_comm_channel[use_channel].lomjd + ((28+iretn) / (float)out_comm_channel[use_channel].throughput)/86400.;
1539  if (agent->debug_level)
1540  {
1541  debug_packet(buf, PACKET_OUT, type, use_channel);
1542  }
1543  }
1544  else
1545  {
1546  iretn = -errno;
1547  ++send_error_count;
1548  }
1549 
1550  profile_check(__LINE__, 3);
1551 
1552  return iretn;
1553 }
static double cmjd
Definition: agent_monitor.cpp:121
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
int iretn
Definition: rw_test.cpp:37
static Agent * agent
Definition: agent_file4.cpp:94
static uint32_t packet_out_count
Definition: agent_file4.cpp:150
#define PACKET_OUT
Definition: agent_file4.cpp:71
static uint16_t use_channel
Definition: agent_file.cpp:97
static uint32_t send_error_count
Definition: agent_file4.cpp:154
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
void debug_packet(vector< PACKET_BYTE > buf, uint8_t direction, string type, int32_t use_channel)
Definition: agent_file4.cpp:1703
char buf[128]
Definition: rw_test.cpp:40
void profile_check(int32_t line_number, uint16_t thread=0)
Definition: agent_file4.cpp:3629
// Wait for and read one packet from any socket listed in out_comm_channel.
//
// buf is resized to `length` before reading. The function loops until a
// datagram has been read or until dtimeout seconds have elapsed.
//   - Non-Windows builds: select() waits on all channel sockets for the
//     remaining time, then recvfrom() is called on each socket reported ready.
//   - Windows builds (COSMOS_WIN_OS): recvfrom() is tried on each socket in
//     turn; on EAGAIN/EWOULDBLOCK the code sleeps 0.1 s and leaves the
//     per-socket loop.
// A received datagram's CRC is recomputed over bytes 3..nbytes-1 and compared
// with the value stored at PACKET_HEADER_OFFSET_CRC.
//
// Parameters:
//   type     - label passed on to debug_packet().
//   channel  - overwritten with the chansock of each socket that is read.
//   buf      - receive buffer; shrunk to the received size when the CRC matches.
//   length   - maximum number of bytes to read.
//   dtimeout - overall time budget in seconds.
// Returns:
//   > 0                   bytes received with a matching CRC
//   GENERAL_ERROR_CRC     a datagram arrived but its CRC did not match
//   otherwise             the last status recorded before time ran out:
//                         -errno, GENERAL_ERROR_TIMEOUT, GENERAL_ERROR_INPUT,
//                         or 0 if nothing was attempted.
//
// NOTE(review): a datagram shorter than 3 bytes hands a negative size to
// calc_crc16ccitt(); there is no minimum-length check.
// NOTE(review): listing line 1599 is absent from this view; the cross
// references suggest it updates timeout_error_count -- confirm.
int32_t myrecvfrom ( string  type,
socket_channel channel,
vector< PACKET_BYTE > &  buf,
uint32_t  length,
double  dtimeout = 1. 
)
1556 {
1557  int32_t nbytes = 0;
1558 
1559  buf.resize(length);
1560  ElapsedTime et;
1561  do
1562  {
1563  profile_check(__LINE__, 1);
1564 
// Rebuild the descriptor set on every pass; select() overwrites it.
1565  fd_set set;
1566  FD_ZERO(&set);
1567  int fdmax = -1;
1568  for (uint16_t i=0; i<out_comm_channel.size(); ++i)
1569  {
1570  profile_check(__LINE__, 1);
1571 
1572  FD_SET(out_comm_channel[i].chansock.cudp, &set);
1573  if (out_comm_channel[i].chansock.cudp > fdmax)
1574  {
1575  fdmax = out_comm_channel[i].chansock.cudp;
1576  }
1577  }
// Time left in the overall budget.
1578  double rtimeout = dtimeout - et.split();
1579  if (rtimeout >= 0.)
1580  {
1581 #if !defined(COSMOS_WIN_OS)
1582  profile_check(__LINE__, 1);
1583 
// Split the remaining time into whole seconds and microseconds for select().
1584  timeval timeout;
1585  timeout.tv_sec = static_cast<int32_t>(rtimeout);
1586  timeout.tv_usec = static_cast<int32_t>(1000000. * (rtimeout - timeout.tv_sec));
1587  profile_check(__LINE__, 1);
1588 
1589  int rv = select(fdmax+1, &set, nullptr, nullptr, &timeout);
1590  profile_check(__LINE__, 1);
1591 
1592  if (rv == -1)
1593  {
1594  nbytes = -errno;
1595  }
1596  else if (rv == 0)
1597  {
1598  nbytes = GENERAL_ERROR_TIMEOUT;
1600  }
1601  else
1602  {
1603  profile_check(__LINE__, 1);
1604 
1605  for (uint16_t i=0; i<out_comm_channel.size(); ++i)
1606  {
1607  if (FD_ISSET(out_comm_channel[i].chansock.cudp, &set))
1608  {
1609  profile_check(__LINE__, 1);
1610 
1611  channel = out_comm_channel[i].chansock;
1612  nbytes = recvfrom(channel.cudp, reinterpret_cast<char *>(&buf[0]), length, 0, reinterpret_cast<sockaddr*>(&channel.caddr), reinterpret_cast<socklen_t *>(&channel.addrlen));
1613  if (nbytes > 0)
1614  {
1615  profile_check(__LINE__, 1);
1616 
// Compare the CRC carried in the header with one computed from byte 3 onward.
1617  uint16_t crccalc = calc_crc16ccitt(&buf[3], nbytes-3);
1618  uint16_t crc;
1619  memmove(&crc, &buf[0]+PACKET_HEADER_OFFSET_CRC, sizeof(PACKET_CRC));
1620  if (crc != crccalc)
1621  {
1622  nbytes = GENERAL_ERROR_CRC;
1623  ++crc_error_count;
1624  }
1625  else
1626  {
1627  ++packet_in_count;
1628  buf.resize(nbytes);
1629  if (agent->debug_level)
1630  {
1631  debug_packet(buf, PACKET_IN, type, i);
1632  }
1633  }
// Any datagram ends the wait, whether or not its CRC matched.
1634  return nbytes;
1635  }
1636  else
1637  {
1638  profile_check(__LINE__, 1);
1639 
1640  if (nbytes < 0)
1641  {
1642  nbytes = -errno;
1643  ++recv_error_count;
1644  }
1645  else
1646  {
1647  nbytes = GENERAL_ERROR_INPUT;
1648  ++recv_error_count;
1649  }
1650  }
1651  }
1652  }
1653  }
1654 #else
// Windows build: no select(); try each socket directly.
1655  for (uint16_t i=0; i<out_comm_channel.size(); ++i)
1656  {
1657  channel = out_comm_channel[i].chansock;
1658  nbytes = recvfrom(channel.cudp, reinterpret_cast<char *>(&buf[0]), length, 0, reinterpret_cast<sockaddr*>(&channel.caddr), reinterpret_cast<socklen_t *>(&channel.addrlen));
1659  if (nbytes > 0)
1660  {
1661  uint16_t crccalc = calc_crc16ccitt(&buf[3], nbytes-3);
1662  uint16_t crc;
1663  memmove(&crc, &buf[0]+PACKET_HEADER_OFFSET_CRC, sizeof(PACKET_CRC));
1664  if (crc != crccalc)
1665  {
1666  nbytes = GENERAL_ERROR_CRC;
1667  ++crc_error_count;
1668  }
1669  else
1670  {
1671  ++packet_in_count;
1672  buf.resize(nbytes);
1673  debug_packet(buf, PACKET_IN, type, i);
1674  }
1675  return nbytes;
1676  }
1677  else
1678  {
1679  if (nbytes < 0)
1680  {
// Nothing pending on this socket: back off briefly and stop scanning the remaining sockets.
1681  if (errno == EAGAIN || errno == EWOULDBLOCK)
1682  {
1683  COSMOS_SLEEP(.1);
1684  break;
1685  }
1686  nbytes = -errno;
1687  ++recv_error_count;
1688  }
1689  else
1690  {
1691  nbytes = GENERAL_ERROR_INPUT;
1692  ++recv_error_count;
1693  }
1694  }
1695  }
1696 #endif // Not windows
1697  }
1698  } while (et.split() < dtimeout);
1699 
1700  return nbytes;
1701 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
#define PACKET_IN
Definition: agent_file4.cpp:70
static uint32_t packet_in_count
Definition: agent_file4.cpp:149
int i
Definition: rw_test.cpp:37
#define GENERAL_ERROR_TIMEOUT
Definition: cosmos-errno.h:292
static Agent * agent
Definition: agent_file4.cpp:94
png_uint_32 crc
Definition: png.c:2173
static uint32_t recv_error_count
Definition: agent_file4.cpp:155
uint16_t PACKET_CRC
Definition: transferlib.h:142
ElapsedTime et
Definition: agent_cpu_device_test.cpp:51
int32_t cudp
Definition: socketlib.h:120
static uint32_t crc_error_count
Definition: agent_file4.cpp:151
#define PACKET_HEADER_OFFSET_CRC
Definition: transferlib.h:150
struct sockaddr_in caddr
Definition: socketlib.h:122
#define GENERAL_ERROR_CRC
Definition: cosmos-errno.h:284
int addrlen
Definition: socketlib.h:128
Definition: elapsedtime.h:62
png_uint_32 length
Definition: png.c:2173
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
void debug_packet(vector< PACKET_BYTE > buf, uint8_t direction, string type, int32_t use_channel)
Definition: agent_file4.cpp:1703
static uint32_t timeout_error_count
Definition: agent_file4.cpp:152
char buf[128]
Definition: rw_test.cpp:40
#define GENERAL_ERROR_INPUT
Definition: cosmos-errno.h:293
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
void profile_check(int32_t line_number, uint16_t thread=0)
Definition: agent_file4.cpp:3629
uint16_t calc_crc16ccitt(uint8_t *buf, int size, bool lsb)
Calculate CRC-16-CCITT.
Definition: mathlib.cpp:2206
// debug_packet: when agent->debug_level is non-zero, writes a one-line description of a
// packet to agent->get_debug_fd() while holding debug_fd_lock.
//   buf         - raw packet bytes; the low nibble of buf[0] selects the packet type.
//   direction   - PACKET_IN prints a "RECV" header, PACKET_OUT prints a "SEND" header;
//                 any other value prints no header at all.
//   type        - free-form label printed between square brackets in the header.
//   use_channel - index into out_comm_channel; not range-checked in the visible lines.
// NOTE(review): this listing has gaps (inner lines 1712, 1776, 1781, 1798, 1813 and
// 1822-1823 are absent) and the HEARTBEAT fprintf is cut off mid-statement, so the
// comments below describe only what is visible here.
void debug_packet ( vector< PACKET_BYTE buf,
uint8_t  direction,
string  type,
int32_t  use_channel 
)
1704 {
// Function-local timer; dt.lap() supplies the second timestamp column of every header.
1705  static ElapsedTime dt;
1706 
1707  if (agent->debug_level)
1708  {
// Held until the unlock just before the end of this if-block.
1709  debug_fd_lock.lock();
1710 
1711  string packet_node_name;
1713  uint8_t node_id = check_node_id(buf[PACKET_HEADER_OFFSET_NODE_ID]);
// Only these four packet types have their node name read out of the header.
1714  switch (buf[0] & 0x0f)
1715  {
1716  case PACKET_HEARTBEAT:
1717  case PACKET_REQQUEUE:
1718  case PACKET_QUEUE:
1719  case PACKET_REQMETA:
1720  packet_node_name = reinterpret_cast<char *>(&buf[PACKET_HEADER_OFFSET_NODE_NAME]);
1721  break;
1722  case PACKET_METADATA:
1723  case PACKET_REQDATA:
1724  case PACKET_DATA:
1725  case PACKET_COMPLETE:
1726  case PACKET_CANCEL:
1727  break;
1728  }
// Header line. The node label is the channel's node if set, otherwise node_name if
// non-empty, otherwise the literal "Unknown".
// NOTE(review): packet_node_name is assigned above but the visible code tests and prints
// node_name instead - confirm which one is intended.
1729  if (direction == PACKET_IN)
1730  {
1731  if (out_comm_channel[use_channel].node.empty())
1732  {
1733  if (!node_name.empty())
1734  {
1735  fprintf(agent->get_debug_fd(), "%.4f %.4f RECV L %u R %u %s %s [%s] In: %u Out: %u Size: %lu ", tet.split(), dt.lap(), node_id, node_id, node_name.c_str(), out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1736  }
1737  else
1738  {
1739  fprintf(agent->get_debug_fd(), "%.4f %.4f RECV L %u R %u Unknown %s [%s] In: %u Out: %u Size: %lu ", tet.split(), dt.lap(), node_id, node_id, out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1740  }
1741  }
1742  else
1743  {
1744  fprintf(agent->get_debug_fd(), "%.4f %.4f RECV L %u R %u %s %s [%s] In: %u Out: %u Size: %lu ", tet.split(), dt.lap(), node_id, node_id, out_comm_channel[use_channel].node.c_str(), out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1745  }
1746  }
1747  else if (direction == PACKET_OUT)
1748  {
1749  if (out_comm_channel[use_channel].node.empty())
1750  {
1751  if (!node_name.empty())
1752  {
1753  fprintf(agent->get_debug_fd(), "%.4f %.4f SEND L %u R %u %s %s [%s] In: %u Out: %u Size: %lu ", tet.split(), dt.lap(), node_id, node_id, node_name.c_str(), out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1754  }
1755  else
1756  {
1757  fprintf(agent->get_debug_fd(), "%.4f %.4f SEND L %u R %u Unknown %s [%s] In: %u Out: %u Size: %lu ", tet.split(), dt.lap(), node_id, node_id, out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1758  }
1759  }
1760  else
1761  {
1762  fprintf(agent->get_debug_fd(), "%.4f %.4f SEND L %u R %u %s %s [%s] In: %u Out: %u Size: %lu ", tet.split(), dt.lap(), node_id, node_id, out_comm_channel[use_channel].node.c_str(), out_comm_channel[use_channel].chanip.c_str(), type.c_str(), packet_in_count, packet_out_count, buf.size());
1763  }
1764  }
1765 
// Per-type detail appended to the header on the same output line.
1766  switch (buf[0] & 0x0f)
1767  {
1768  case PACKET_METADATA:
1769  {
// The name is built from a fixed TRANSFER_MAX_FILENAME-byte window of the packet.
1770  string file_name(&buf[PACKET_METASHORT_OFFSET_FILE_NAME], &buf[PACKET_METASHORT_OFFSET_FILE_NAME+TRANSFER_MAX_FILENAME]);
1771  fprintf(agent->get_debug_fd(), "[METADATA] %u %u %s ", node_id, buf[PACKET_METASHORT_OFFSET_TX_ID], file_name.c_str());
1772  break;
1773  }
1774  case PACKET_DATA:
1775  {
1777  break;
1778  }
1779  case PACKET_REQDATA:
1780  {
1782  break;
1783  }
1784  case PACKET_COMPLETE:
1785  {
1786  fprintf(agent->get_debug_fd(), "[COMPLETE] %u %u ", node_id, buf[PACKET_COMPLETE_OFFSET_TX_ID]);
1787  break;
1788  }
1789  case PACKET_CANCEL:
1790  {
1791  fprintf(agent->get_debug_fd(), "[CANCEL] %u %u ", node_id, buf[PACKET_CANCEL_OFFSET_TX_ID]);
1792  break;
1793  }
1794  case PACKET_REQMETA:
1795  {
1796  fprintf(agent->get_debug_fd(), "[REQMETA] %u %s ", node_id, &buf[PACKET_HEADER_OFFSET_NODE_NAME]);
1797  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
1799  {
1800  fprintf(agent->get_debug_fd(), "%u ", buf[PACKET_REQMETA_OFFSET_TX_ID+i]);
1801  }
1802  break;
1803  }
1804  case PACKET_REQQUEUE:
1805  {
1806  fprintf(agent->get_debug_fd(), "[REQQUEUE] %u %s ", node_id, &buf[PACKET_HEADER_OFFSET_NODE_NAME]);
1807  }
1808  break;
1809  case PACKET_QUEUE:
1810  {
1811  fprintf(agent->get_debug_fd(), "[QUEUE] %u %s ", node_id, &buf[PACKET_HEADER_OFFSET_NODE_NAME]);
1812  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
1814  {
1815  fprintf(agent->get_debug_fd(), "%u ", buf[PACKET_QUEUE_OFFSET_TX_ID+i]);
1816  }
1817  }
1818  break;
1819  case PACKET_HEARTBEAT:
1820  {
1821  fprintf(agent->get_debug_fd(), "[HEARTBEAT] %u %s %hu %u %u", node_id, &buf[PACKET_HEADER_OFFSET_NODE_NAME], buf[PACKET_HEARTBEAT_OFFSET_BEAT_PERIOD]
1824  break;
1825  }
1826  case PACKET_MESSAGE:
1827  {
// NOTE(review): the payload is printed with %s straight from the buffer; presumably it
// is NUL-terminated - confirm against the code that builds the packet.
1828  fprintf(agent->get_debug_fd(), "[MESSAGE] %u %hu %s", node_id, buf[PACKET_MESSAGE_OFFSET_LENGTH], &buf[PACKET_MESSAGE_OFFSET_BYTES]);
1829  break;
1830  }
1831  case PACKET_COMMAND:
1832  {
1833  fprintf(agent->get_debug_fd(), "[COMMAND] %u %hu %s", node_id, buf[PACKET_COMMAND_OFFSET_LENGTH], &buf[PACKET_COMMAND_OFFSET_BYTES]);
1834  break;
1835  }
1836  }
// Terminate the record and flush it before releasing the lock.
1837  fprintf(agent->get_debug_fd(), "\n");
1838  fflush(agent->get_debug_fd());
1839  debug_fd_lock.unlock();
1840  }
1841 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
#define PACKET_METASHORT_OFFSET_TX_ID
Definition: transferlib.h:255
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
#define TRANSFER_MAX_FILENAME
Definition: transferlib.h:78
#define PACKET_QUEUE_OFFSET_TX_ID
Definition: transferlib.h:214
#define PACKET_IN
Definition: agent_file4.cpp:70
#define PACKET_HEARTBEAT_OFFSET_BEAT_PERIOD
Definition: transferlib.h:166
#define PACKET_HEARTBEAT_OFFSET_FUNIXTIME
Definition: transferlib.h:168
static uint32_t packet_in_count
Definition: agent_file4.cpp:149
int i
Definition: rw_test.cpp:37
#define PACKET_REQDATA_OFFSET_HOLE_END
Definition: transferlib.h:272
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
static Agent * agent
Definition: agent_file4.cpp:94
#define PACKET_DATA_OFFSET_TX_ID
Definition: transferlib.h:285
static uint32_t packet_out_count
Definition: agent_file4.cpp:150
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
ElapsedTime dt
Definition: agent_file4.cpp:186
#define PACKET_OUT
Definition: agent_file4.cpp:71
#define PACKET_COMPLETE_OFFSET_TX_ID
Definition: transferlib.h:298
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
string lookup_node_id_name(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:536
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
int32_t check_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:494
#define PACKET_CANCEL_OFFSET_TX_ID
Definition: transferlib.h:308
#define PACKET_DATA_OFFSET_BYTE_COUNT
Definition: transferlib.h:286
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
#define PACKET_REQDATA_OFFSET_TX_ID
Definition: transferlib.h:270
#define PACKET_MESSAGE_OFFSET_BYTES
Definition: transferlib.h:180
#define PACKET_REQMETA_OFFSET_TX_ID
Definition: transferlib.h:226
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
#define PACKET_DATA_OFFSET_CHUNK_START
Definition: transferlib.h:287
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
#define PACKET_COMMAND_OFFSET_LENGTH
Definition: transferlib.h:191
#define PACKET_MESSAGE_OFFSET_LENGTH
Definition: transferlib.h:179
static const unsigned char PACKET_MESSAGE
Definition: transferlib.h:112
Definition: elapsedtime.h:62
#define PACKET_HEADER_OFFSET_NODE_NAME
Definition: transferlib.h:153
#define PACKET_REQDATA_OFFSET_HOLE_START
Definition: transferlib.h:271
static const unsigned char PACKET_COMMAND
Definition: transferlib.h:113
#define PACKET_METASHORT_OFFSET_FILE_NAME
Definition: transferlib.h:257
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
char buf[128]
Definition: rw_test.cpp:40
#define PACKET_COMMAND_OFFSET_BYTES
Definition: transferlib.h:192
static const unsigned char PACKET_HEARTBEAT
Definition: transferlib.h:111
static string node
Definition: agent_monitor.cpp:126
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
#define PACKET_HEARTBEAT_OFFSET_THROUGHPUT
Definition: transferlib.h:167
static const unsigned char PACKET_REQQUEUE
Definition: transferlib.h:110
#define PACKET_HEADER_OFFSET_NODE_ID
Definition: transferlib.h:152
int32_t write_meta ( tx_progress tx,
double  interval = 5. 
)
1844 {
1845  vector<PACKET_BYTE> packet;
1846  std::ofstream file_name;
1847 
1848  if (currentmjd(0.) - tx.savetime > interval/86400.)
1849  {
1850  tx.savetime = currentmjd(0.);
1851  make_metadata_packet(packet, tx.tx_id, (char *)tx.file_name.c_str(), tx.file_size, (char *)tx.node_name.c_str(), (char *)tx.agent_name.c_str());
1852  file_name.open(tx.temppath + ".meta", std::ios::out|std::ios::binary);
1853  if(!file_name.is_open())
1854  {
1855  return (-errno);
1856  }
1857 
1858  uint16_t crc;
1859  file_name.write((char *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1860  crc = slip_calc_crc((uint8_t *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1861  file_name.write((char *)&crc, 2);
1862  for (file_progress progress_info : tx.file_info)
1863  {
1864  file_name.write((const char *)&progress_info, sizeof(progress_info));
1865  crc = slip_calc_crc((uint8_t *)&progress_info, sizeof(progress_info));
1866  file_name.write((char *)&crc, 2);
1867  }
1868  file_name.close();
1869  }
1870 
1871  return 0;
1872 }
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
Definition: transferlib.h:332
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
png_uint_32 crc
Definition: png.c:2173
double savetime
Definition: transferlib.h:354
deque< file_progress > file_info
Definition: transferlib.h:358
string node_name
Definition: transferlib.h:349
string agent_name
Definition: transferlib.h:350
uint16_t slip_calc_crc(uint8_t *buf, uint16_t size)
Calculate CRC-16-CCITT.
Definition: sliplib.cpp:322
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PACKET_METALONG_OFFSET_TOTAL
Definition: transferlib.h:243
string file_name
Definition: transferlib.h:351
void make_metadata_packet(vector< PACKET_BYTE > &packet, packet_struct_metalong meta)
Definition: transferlib.cpp:163
int32_t read_meta ( tx_progress tx)
1875 {
1876  vector<PACKET_BYTE> packet(PACKET_METALONG_OFFSET_TOTAL,0);
1877  std::ifstream file_name;
1879 
1880  struct stat statbuf;
1881  if (!stat((tx.temppath + ".meta").c_str(), &statbuf) && statbuf.st_size >= COSMOS_SIZEOF(file_progress))
1882  {
1883  file_name.open(tx.temppath + ".meta", std::ios::out|std::ios::binary);
1884  if(!file_name.is_open())
1885  {
1886  return (-errno);
1887  }
1888  }
1889  else
1890  {
1891  // remove((tx.temppath + ".meta").c_str());
1892  return DATA_ERROR_SIZE_MISMATCH;
1893  }
1894 
1895  tx.fp = nullptr;
1896  tx.savetime = 0.;
1897  tx.complete = false;
1898 
1899 
1900  // load metadata
1901 
1902  file_name.read((char *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1903  if (file_name.eof())
1904  {
1905  return DATA_ERROR_SIZE_MISMATCH;
1906  }
1907  uint16_t crc;
1908  file_name.read((char *)&crc, 2);
1909  if (file_name.eof())
1910  {
1911  return DATA_ERROR_SIZE_MISMATCH;
1912  }
1913  if (crc != slip_calc_crc((uint8_t *)&packet[0], PACKET_METALONG_OFFSET_TOTAL))
1914  {
1915  file_name.close();
1916  return DATA_ERROR_CRC;
1917  }
1918  extract_metadata(packet, meta);
1919  tx.havemeta = true;
1920  tx.tx_id = meta.tx_id;
1921  tx.node_name = meta.node_name;
1922  tx.agent_name = meta.agent_name;
1923  tx.file_name = meta.file_name;
1924  tx.filepath = data_base_path(tx.node_name, "outgoing", tx.agent_name, tx.file_name);
1925  tx.file_size = meta.file_size;
1926 
1927  // load file progress
1928  file_progress progress_info;
1929  do
1930  {
1931  file_name.read((char *)&progress_info, sizeof(progress_info));
1932  if (file_name.eof())
1933  {
1934  break;
1935  }
1936  uint16_t crc;
1937  file_name.read((char *)&crc, 2);
1938  if (file_name.eof())
1939  {
1940  return DATA_ERROR_SIZE_MISMATCH;
1941  }
1942  if (crc != slip_calc_crc((uint8_t *)&progress_info, sizeof(progress_info)))
1943  {
1944  file_name.close();
1945  return DATA_ERROR_CRC;
1946  }
1947 
1948  tx.file_info.push_back(progress_info);
1949  } while(!file_name.eof());
1950  file_name.close();
1951  if (agent->debug_level)
1952  {
1953  debug_fd_lock.lock();
1954  fprintf(agent->get_debug_fd(), "%.4f %.4f Main: read_meta: %s tx_id: %u chunks: %" PRIu32 "\n", tet.split(), dt.lap(), (tx.temppath + ".meta").c_str(), tx.tx_id, tx.file_info.size());
1955  fflush(agent->get_debug_fd());
1956  debug_fd_lock.unlock();
1957  }
1958 
1959  // fix any overlaps and count total bytes
1960  // merge_chunks_overlap(tx);
1961  // tx.state = STATE_REQDATA;
1962 
1963  return 0;
1964 }
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:231
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
char file_name[128]
Definition: transferlib.h:234
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
Definition: transferlib.h:229
#define COSMOS_SIZEOF(element)
Definition: configCosmos.h:139
Definition: transferlib.h:332
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
static Agent * agent
Definition: agent_file4.cpp:94
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
png_uint_32 crc
Definition: png.c:2173
double savetime
Definition: transferlib.h:354
ElapsedTime dt
Definition: agent_file4.cpp:186
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
deque< file_progress > file_info
Definition: transferlib.h:358
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
#define DATA_ERROR_CRC
Definition: cosmos-errno.h:151
string node_name
Definition: transferlib.h:349
char agent_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:233
string filepath
Definition: transferlib.h:352
#define DATA_ERROR_SIZE_MISMATCH
Definition: cosmos-errno.h:150
string agent_name
Definition: transferlib.h:350
uint16_t slip_calc_crc(uint8_t *buf, uint16_t size)
Calculate CRC-16-CCITT.
Definition: sliplib.cpp:322
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:235
#define PACKET_METALONG_OFFSET_TOTAL
Definition: transferlib.h:243
string file_name
Definition: transferlib.h:351
bool complete
Definition: transferlib.h:348
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:232
bool tx_progress_compare_by_size ( const tx_progress a,
const tx_progress b 
)
3159 {
3160  return a.file_size < b.file_size;
3161 }
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
bool filestruc_smaller_by_size ( const filestruc a,
const filestruc b 
)
3149 {
3150  return a.size < b.size;
3151 }
off_t size
Definition: datalib.h:120
bool filestruc_larger_by_size ( const filestruc a,
const filestruc b 
)
3154 {
3155  return a.size > b.size;
3156 }
off_t size
Definition: datalib.h:120
//! Confirm that a transaction id is in use in a queue entry.
/*! The slot at index \p tx_id must itself carry that id.
 * \param txentry Incoming or outgoing queue to look in.
 * \param tx_id Id to verify; 0 is never valid.
 * \return \p tx_id when the slot matches, otherwise 0.
 */
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
{
    const bool slot_matches = (tx_id != 0) && (txentry.progress[tx_id].tx_id == tx_id);
    return slot_matches ? tx_id : static_cast<PACKET_TX_ID_TYPE>(0);
}
std::vector< tx_progress > progress
Definition: agent_file.cpp:145
int32_t check_channel ( PACKET_NODE_ID_TYPE  node_id)
3259 {
3260  for(uint16_t i=0; i<out_comm_channel.size(); ++i)
3261  {
3262  if (out_comm_channel[i].node.size() && txq[node_id].node_name == out_comm_channel[i].node)
3263  {
3264  return i;
3265  }
3266  }
3267  return -1;
3268 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
int i
Definition: rw_test.cpp:37
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
static string node
Definition: agent_monitor.cpp:126
int32_t add_node_name ( string  node_name)
3271 {
3272  uint8_t node_id;
3273 
3274  node_id = lookup_node_id(node_name);
3275  if (node_id == 0)
3276  {
3277  return GENERAL_ERROR_OUTOFRANGE;
3278  }
3279 
3280  if (txq[node_id].node_name == node_name)
3281  {
3282  return node_id;
3283  }
3284 
3285  txq[node_id].node_name = node_name;
3286  txq[node_id].incoming.size = 0;
3287  txq[node_id].outgoing.next_id = 1;
3288  txq[node_id].outgoing.size = 0;
3289  incoming_tx_purge(node_id);
3290  outgoing_tx_purge(node_id);
3291  return node_id;
3292 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
int32_t lookup_node_id(string node_name)
Definition: transferlib.cpp:514
int32_t incoming_tx_purge(uint8_t node, uint16_t tx_id=256)
Definition: agent_file4.cpp:3082
string node_name
Definition: agent_001.cpp:46
int32_t outgoing_tx_purge(uint8_t node, uint16_t tx_id=256)
Definition: agent_file4.cpp:2637
#define GENERAL_ERROR_OUTOFRANGE
Definition: cosmos-errno.h:296
// choose_incoming_tx_id: picks, for node_id, the incoming transfer that has metadata and
// the fewest bytes still missing (file_size - total_bytes). Returns its tx_id, or 0 when
// node_id is 0, node_id is not below txq.size(), or no entry qualifies.
// Side effects visible here: merge_chunks_overlap() is run on every entry visited, and
// entries that have metadata but a zero file_size, or a file_size below total_bytes, are
// removed with incoming_tx_del().
// NOTE(review): the loop header that introduces i (inner line 3171) is missing from this
// listing, so the range of entries visited cannot be confirmed from here.
PACKET_TX_ID_TYPE choose_incoming_tx_id ( uint8_t  node_id)
3164 {
3165  PACKET_TX_ID_TYPE tx_id = 0;
3166 
3167  if (node_id > 0 && node_id < txq.size())
3168  {
3169  // Choose file with least data left to send
// nsize holds the smallest remaining byte count found so far.
3170  PACKET_FILE_SIZE_TYPE nsize = INT32_MAX;
3172  {
3173  // calculate bytes so far
3174  merge_chunks_overlap(txq[(node_id)].incoming.progress[i]);
3175 
3176  // Remove anything suspicious: file_size == 0, file_size < total_bytes
3177  if (txq[(node_id)].incoming.progress[i].havemeta && (txq[(node_id)].incoming.progress[i].file_size == 0 || txq[(node_id)].incoming.progress[i].file_size < txq[(node_id)].incoming.progress[i].total_bytes))
3178  {
3179  incoming_tx_del(node_id, txq[(node_id)].incoming.progress[i].tx_id);
3180  }
3181  // Choose transactions for which we: have meta and bytes remaining is minimized
3182  else if (txq[(node_id)].incoming.progress[i].tx_id && txq[(node_id)].incoming.progress[i].havemeta && (txq[(node_id)].incoming.progress[i].file_size - txq[(node_id)].incoming.progress[i].total_bytes) < nsize)
3183  {
3184  nsize = txq[(node_id)].incoming.progress[i].file_size - txq[(node_id)].incoming.progress[i].total_bytes;
3185  tx_id = txq[(node_id)].incoming.progress[i].tx_id;
3186  }
3187  }
3188  }
3189 
3190  return tx_id;
3191 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file4.cpp:1971
int32_t incoming_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file4.cpp:3000
int i
Definition: rw_test.cpp:37
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
// choose_outgoing_tx_id: picks, for node_id, one outgoing transfer among the entries that
// have a non-zero tx_id, are enabled, have sentmeta set and do not have sentdata set.
// Returns the chosen tx_id, or 0 when node_id is 0, node_id is not below txq.size(), or
// nothing qualifies. The chosen entry gets merge_chunks_overlap() run on it again and its
// senddata flag set to true.
// Side effects visible here: qualifying entries with file_size == 0 are removed with
// outgoing_tx_del(); entries whose file_size is below total_bytes are removed with
// outgoing_tx_del(..., false).
// NOTE(review): the loop header that introduces i (inner line 3201) is missing from this
// listing, so the range of entries visited cannot be confirmed from here.
PACKET_TX_ID_TYPE choose_outgoing_tx_id ( uint8_t  node_id)
3194 {
3195  PACKET_TX_ID_TYPE tx_id = 0;
3196 
3197  if (node_id > 0 && node_id < txq.size())
3198  {
3199  // Choose file with least data left to send
3200  PACKET_FILE_SIZE_TYPE nsize = INT32_MAX;
3202  {
3203  if (txq[(node_id)].outgoing.progress[i].tx_id && txq[(node_id)].outgoing.progress[i].enabled && txq[(node_id)].outgoing.progress[i].sentmeta && !txq[(node_id)].outgoing.progress[i].sentdata)
3204  {
3205  // Remove anything file_size == 0
3206  if (txq[(node_id)].outgoing.progress[i].file_size == 0)
3207  {
3208  outgoing_tx_del(node_id, txq[(node_id)].outgoing.progress[i].tx_id);
3209  }
3210  else
3211  {
3212  // calculate bytes so far
3213  merge_chunks_overlap(txq[(node_id)].outgoing.progress[i]);
3214 
3215  // Remove anything suspicious: file_size < total_bytes
3216  if (txq[(node_id)].outgoing.progress[i].file_size < txq[(node_id)].outgoing.progress[i].total_bytes)
3217  {
3218  outgoing_tx_del(node_id, txq[(node_id)].outgoing.progress[i].tx_id, false);
3219  }
3220  // Choose unfinished transactions for which bytes remaining is minimized
// NOTE(review): the value compared and stored is total_bytes itself, not
// file_size - total_bytes as in choose_incoming_tx_id - confirm this is intended.
3221  else if (txq[(node_id)].outgoing.progress[i].total_bytes < nsize)
3222  {
3223  nsize = txq[(node_id)].outgoing.progress[i].total_bytes;
3224  tx_id = txq[(node_id)].outgoing.progress[i].tx_id;
3225  }
3226  }
3227  }
3228  }
3229  }
3230 
3231  if (tx_id)
3232  {
3233 // vector<file_progress> togo;
3234 // togo = find_chunks_togo(txq[(node_id)].outgoing.progress[tx_id]);
3235 // for (file_progress missed : togo)
3236 // {
3237 // txq[(node_id)].outgoing.progress[tx_id].file_info.push_back(missed);
3238 // }
3239 
// The chosen tx_id is used directly as the index into the progress table.
3240  merge_chunks_overlap(txq[(node_id)].outgoing.progress[tx_id]);
3241  txq[(node_id)].outgoing.progress[tx_id].senddata = true;
3242  }
3243  return tx_id;
3244 }
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file4.cpp:1971
int i
Definition: rw_test.cpp:37
int32_t outgoing_tx_del(uint8_t node, uint16_t tx_id=256, bool remove_file=true)
Definition: agent_file4.cpp:2564
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
// next_incoming_tx: examines one incoming transfer of the given node and, once it is
// fully received (file_size == total_bytes and havemeta set), finalises it: closes the
// open file handle, renames "<temppath>.file" to filepath, forces a metadata save with
// write_meta(..., 0.) and sets the complete flag. Returns tx_id.
// Nothing is done unless 0 < tx_id < PROGRESS_QUEUE_SIZE and txq[node].node_id > 0.
// use_channel is not referenced in the visible lines.
// NOTE(review): the statement that declares and sets tx_id (inner line 3296) is missing
// from this listing, so how the transfer is selected cannot be confirmed from here.
int32_t next_incoming_tx ( PACKET_NODE_ID_TYPE  node,
int32_t  use_channel 
)
3295 {
3297 
3298  if (tx_id < PROGRESS_QUEUE_SIZE && tx_id > 0)
3299  {
3300  // See if we know what the remote node_id is for this
3301  if (txq[node].node_id > 0)
3302  {
3303  // Check if file has been completely received
3304  if(txq[(node)].incoming.progress[tx_id].file_size == txq[(node)].incoming.progress[tx_id].total_bytes && txq[(node)].incoming.progress[tx_id].havemeta)
3305  {
3306  // tx_progress tx_in = txq[(node)].incoming.progress[tx_id];
3307  if (agent->debug_level)
3308  {
3309  debug_fd_lock.lock();
3310  fprintf(agent->get_debug_fd(), "%.4f %.4f Incoming(next_incoming_tx): Complete: %u %s %u %u\n", tet.split(), dt.lap(), txq[(node)].incoming.progress[tx_id].tx_id, txq[(node)].incoming.progress[tx_id].node_name.c_str(), txq[(node)].incoming.progress[tx_id].file_size, txq[(node)].incoming.progress[tx_id].total_bytes);
3311  fflush(agent->get_debug_fd());
3312  debug_fd_lock.unlock();
3313  }
3314 
3315  // Move file to its final location
// The complete flag keeps this branch from running twice for the same transfer.
3316  if (!txq[(node)].incoming.progress[tx_id].complete)
3317  {
3318  if (txq[(node)].incoming.progress[tx_id].fp != nullptr)
3319  {
3320  fclose(txq[(node)].incoming.progress[tx_id].fp);
3321  txq[(node)].incoming.progress[tx_id].fp = nullptr;
3322  }
3323  string final_filepath = txq[(node)].incoming.progress[tx_id].temppath + ".file";
// NOTE(review): iret is only logged; the transfer is marked complete even when
// rename() fails - confirm this is intended.
3324  int32_t iret = rename(final_filepath.c_str(), txq[(node)].incoming.progress[tx_id].filepath.c_str());
3325  // Make sure metadata is recorded
3326  write_meta(txq[(node)].incoming.progress[tx_id], 0.);
3327  if (agent->debug_level)
3328  {
3329  debug_fd_lock.lock();
3330  fprintf(agent->get_debug_fd(), "%.4f %.4f Incoming(next_incoming_tx): Renamed: %d %s\n", tet.split(), dt.lap(), iret, txq[(node)].incoming.progress[tx_id].filepath.c_str());
3331  fflush(agent->get_debug_fd());
3332  debug_fd_lock.unlock();
3333  }
3334  txq[(node)].incoming.progress[tx_id].complete = true;
3335  }
3336  }
3337  else
3338  {
// Transfer still in progress: only a debug line is written.
3339  if (agent->debug_level)
3340  {
3341  debug_fd_lock.lock();
3342  fprintf(agent->get_debug_fd(), "%.4f %.4f Incoming(next_incoming_tx): More: %u %s %u %u\n", tet.split(), dt.lap(), txq[(node)].incoming.progress[tx_id].tx_id, txq[(node)].incoming.progress[tx_id].node_name.c_str(), txq[(node)].incoming.progress[tx_id].file_size, txq[(node)].incoming.progress[tx_id].total_bytes);
3343  fflush(agent->get_debug_fd());
3344  debug_fd_lock.unlock();
3345  }
3346  }
3347  }
3348  }
3349  return tx_id;
3350 }
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file4.cpp:3246
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file4.cpp:1843
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_TX_ID_TYPE choose_incoming_tx_id(uint8_t node_id)
Definition: agent_file4.cpp:3163
static Agent * agent
Definition: agent_file4.cpp:94
ElapsedTime dt
Definition: agent_file4.cpp:186
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
static string node
Definition: agent_monitor.cpp:126
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
//! Describe every incoming queue as JSON.
/*! The result is an object with a single "incoming" array holding one element
 * per entry of txq. Each element carries the node name, the stored queue count
 * and a "files" array with one object (tx_id, agent, name, bytes, size) per
 * slot whose tx_id is non-zero.
 * \return The JSON text.
 */
string json_list_incoming()
{
    JSONObject jobj;
    JSONArray incoming;

    incoming.reserve(txq.size());
    for (size_t node=0; node<txq.size(); ++node)
    {
        JSONObject node_obj("node", txq[node].node_name);
        node_obj.addElement("count", txq[node].incoming.size);

        // Append only the live slots. Sizing the array from the stored count
        // and writing by index fails when the two disagree.
        JSONArray files;
        for (const tx_progress &tx : txq[node].incoming.progress)
        {
            if (tx.tx_id)
            {
                JSONObject f("tx_id", tx.tx_id);
                f.addElement("agent", tx.agent_name);
                f.addElement("name", tx.file_name);
                f.addElement("bytes", tx.total_bytes);
                f.addElement("size", tx.file_size);
                files.push_back(JSONValue(f));
            }
        }
        node_obj.addElement("files", files);
        incoming.push_back(JSONValue(node_obj));
    }
    jobj.addElement("incoming", incoming);
    return jobj.to_json_string();
}
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
Definition: jsonvalue.h:13
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:338
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
//! Describe every outgoing queue as JSON.
/*! The result is an object with a single "outgoing" array holding one element
 * per entry of txq. Each element carries the node name, the stored queue count
 * and a "files" array with one object (tx_id, agent, name, bytes, size) per
 * slot whose tx_id is non-zero.
 * \return The JSON text.
 */
string json_list_outgoing()
{
    JSONObject jobj;
    JSONArray outgoing;

    outgoing.reserve(txq.size());
    for (size_t node=0; node<txq.size(); ++node)
    {
        JSONObject node_obj("node", txq[node].node_name);
        node_obj.addElement("count", txq[node].outgoing.size);

        JSONArray files;
        // Iterate by reference: every slot owns strings and a chunk list.
        for (const tx_progress &tx : txq[node].outgoing.progress)
        {
            if (tx.tx_id)
            {
                JSONObject f("tx_id", tx.tx_id);
                f.addElement("agent", tx.agent_name);
                f.addElement("name", tx.file_name);
                f.addElement("bytes", tx.total_bytes);
                f.addElement("size", tx.file_size);
                files.push_back(JSONValue(f));
            }
        }
        node_obj.addElement("files", files);
        outgoing.push_back(node_obj);
    }
    jobj.addElement("outgoing", outgoing);
    return jobj.to_json_string();
}
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
Definition: jsonvalue.h:13
Definition: transferlib.h:338
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
//! Describe every incoming and outgoing queue as JSON.
/*! The result is an object with an "outgoing" and an "incoming" array, each
 * holding one element per entry of txq. Every element carries the node name,
 * the stored count of that direction and a "files" array with one object
 * (tx_id, agent, name, bytes, size) per slot whose tx_id is non-zero.
 * \return The JSON text.
 */
string json_list_queue()
{
    // Shared description of one transfer slot.
    const auto file_entry = [](const tx_progress &tx) -> JSONValue
    {
        JSONObject f("tx_id", tx.tx_id);
        f.addElement("agent", tx.agent_name);
        f.addElement("name", tx.file_name);
        f.addElement("bytes", tx.total_bytes);
        f.addElement("size", tx.file_size);
        return JSONValue(f);
    };

    JSONObject jobj;
    JSONArray incoming;
    JSONArray outgoing;

    outgoing.reserve(txq.size());
    incoming.reserve(txq.size());
    for (size_t node=0; node<txq.size(); ++node)
    {
        // Incoming side. Every live slot is appended exactly once.
        JSONObject node_in("node", txq[node].node_name);
        node_in.addElement("count", txq[node].incoming.size);
        JSONArray ifiles;
        for (const tx_progress &tx : txq[node].incoming.progress)
        {
            if (tx.tx_id)
            {
                ifiles.push_back(file_entry(tx));
            }
        }
        node_in.addElement("files", ifiles);
        incoming.push_back(JSONValue(node_in));

        // Outgoing side. The count is that of the outgoing queue.
        JSONObject node_out("node", txq[node].node_name);
        node_out.addElement("count", txq[node].outgoing.size);
        JSONArray ofiles;
        for (const tx_progress &tx : txq[node].outgoing.progress)
        {
            if (tx.tx_id)
            {
                ofiles.push_back(file_entry(tx));
            }
        }
        node_out.addElement("files", ofiles);
        outgoing.push_back(JSONValue(node_out));
    }
    jobj.addElement("outgoing", outgoing);
    jobj.addElement("incoming", incoming);
    return jobj.to_json_string();
}
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
Definition: jsonvalue.h:13
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:338
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
void write_queue_log ( double  logdate)
3476 {
3477  string record = json_list_queue(); // to append to file
3478 
3479  log_write(agent->cinfo->node.name, "file", logdate, "", "log", record, log_directory);
3480  log_move(data_type_path(agent->cinfo->node.name, log_directory, "file", logdate, "log"), data_type_path(agent->cinfo->node.name, "outgoing", "file", logdate, "log"), true);
3481 
3482 
3483 }
void log_move(string oldpath, string newpath, bool compress)
Move log file - path version.
Definition: datalib.cpp:200
static Agent * agent
Definition: agent_file4.cpp:94
nodestruc node
Structure for summary information in node.
Definition: jsondef.h:4220
string data_type_path(string node, string location, string agent, double mjd, string type)
Create data file path.
Definition: datalib.cpp:910
char name[40+1]
Node Name.
Definition: jsondef.h:3556
void log_write(string node, string agent, double utc, string extra, string type, string record, string location)
Write log entry - full.
Definition: datalib.cpp:75
cosmosstruc * cinfo
Definition: agentclass.h:346
static string log_directory
Definition: agent_file4.cpp:184
string json_list_queue()
Definition: agent_file4.cpp:3568
// Program entry point for agent_file.
//
// Sequence visible in this listing:
//   1. Create the Agent ("file") and wait for it to start; exit on failure.
//   2. Open the listening UDP socket as out_comm_channel[0].
//   3. Optionally configure out_comm_channel[1] from the last command-line argument,
//      parsed as "node[:ip[:throughput]]".
//   4. Load the node id table and size the transfer queue (txq) from it.
//   5. Set agent->debug_level from argv[1] when it begins with a digit.
//   6. Re-register in-progress transfers found as "meta" files under each node's temp area.
//   7. Register the agent requests, start the send/recv/transmit threads, and run the
//      periodic loop until agent->running() returns 0.
//   8. Join the threads, clear txq, shut the agent down and exit(0).
//
// NOTE(review): gutter lines 247-248, 306 and 457 are absent from this listing, so the
// statements on those lines are not documented here.
int main ( int  argc,
char *  argv[] 
)
239 {
240  int32_t iretn;
241  thread send_loop_thread;
242  thread recv_loop_thread;
243  thread transmit_loop_thread;
244 
// Executable name containing "slow" selects an alternate configuration.
// NOTE(review): the body (gutter lines 247-248) is missing from this listing.
245  if (static_cast<string>(argv[0]).find("slow") != string::npos)
246  {
249  }
250 
// Start the agent; a negative return from wait() is logged and used as the exit status.
251  agent = new Agent("", "file", 0.);
252  if ((iretn = agent->wait()) < 0)
253  {
254  fprintf(agent->get_debug_fd(), "%.4f %s Failed to start Agent %s on Node %s Dated %s : %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str(), cosmos_error_string(iretn).c_str());
255  exit(iretn);
256  }
257  else
258  {
259  fprintf(agent->get_debug_fd(), "%.4f %s Started Agent %s on Node %s Dated %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str());
260  }
261 
262  fprintf(agent->get_debug_fd(), "%.4f Node: %s Agent: %s - Established\n", tet.split(), agent->nodeName.c_str(), agent->agentName.c_str());
263  fflush(agent->get_debug_fd()); // Ensure this gets printed before blocking call
264 
// Channel 0: listening UDP socket on AGENTRECVPORT; failure shuts the agent down and exits.
265  out_comm_channel.resize(1);
266  if((iretn = socket_open(&out_comm_channel[0].chansock, NetworkType::UDP, "", AGENTRECVPORT, SOCKET_LISTEN, SOCKET_BLOCKING, 5000000)) < 0)
267  {
268  fprintf(agent->get_debug_fd(), "%.4f Main: Node: %s Agent: %s - Listening socket failure\n", tet.split(), agent->nodeName.c_str(), agent->agentName.c_str());
269  agent->shutdown();
270  exit (-errno);
271  }
272 
// Record the socket's address as text and initialise every channel time stamp to "now".
273  inet_ntop(out_comm_channel[0].chansock.caddr.sin_family, &out_comm_channel[0].chansock.caddr.sin_addr, out_comm_channel[0].chansock.address, sizeof(out_comm_channel[0].chansock.address));
274  out_comm_channel[0].chanip = out_comm_channel[0].chansock.address;
275  out_comm_channel[0].nmjd = currentmjd();
276  out_comm_channel[0].limjd = out_comm_channel[0].nmjd;
277  out_comm_channel[0].lomjd = out_comm_channel[0].nmjd;
278  out_comm_channel[0].fmjd = out_comm_channel[0].nmjd;
279  out_comm_channel[0].node = "";
280  out_comm_channel[0].throughput = default_throughput;
281  out_comm_channel[0].packet_size = default_packet_size;
282  fprintf(agent->get_debug_fd(), "%.4f Node: %s Agent: %s - Listening socket open\n", tet.split(), agent->nodeName.c_str(), agent->agentName.c_str());
283  fflush(agent->get_debug_fd()); // Ensure this gets printed before blocking call
284 
// Channel 1 is set up when argv[1] does not start with a digit, or when exactly two
// arguments were given; the channel description is always the LAST argument.
285  if (argc > 1 && ((argv[1][0] < '0' || argv[1][0] > '9') || argc == 3))
286  {
287  out_comm_channel.resize(2);
288  out_comm_channel[1].node = argv[argc-1];
289  out_comm_channel[1].throughput = default_throughput;
290  vector <string> cargs = string_split(argv[argc-1], ":");
// No break statements: case 3 falls through to case 2 and then default, so a
// three-field argument sets throughput, chanip and node; two fields set chanip and node.
291  switch (cargs.size())
292  {
293  case 3:
294  out_comm_channel[1].throughput = stol(cargs[2]);
295  case 2:
296  out_comm_channel[1].chanip = cargs[1];
297  default:
298  out_comm_channel[1].node = cargs[0];
299  }
300 // size_t tloc = out_comm_channel[1].node.find(':');
301 // if (tloc != string::npos)
302 // {
303 // out_comm_channel[1].chanip = out_comm_channel[1].node.substr(tloc+1, out_comm_channel[1].node.size()-tloc+1);
304 // out_comm_channel[1].node = out_comm_channel[1].node.substr(0, tloc);
305 // }
// NOTE(review): gutter line 306 (the condition guarding this failure block) is missing
// from this listing; presumably it opens the sending socket for channel 1 - confirm.
307  {
308  fprintf(agent->get_debug_fd(), "%.4f Node: %s IP: %s - Sending socket failure\n", tet.split(), out_comm_channel[1].node.c_str(), out_comm_channel[1].chanip.c_str());
309  agent->shutdown();
310  exit (-errno);
311  }
312  out_comm_channel[1].nmjd = currentmjd();
313  out_comm_channel[1].limjd = out_comm_channel[1].nmjd;
314  out_comm_channel[1].lomjd = out_comm_channel[1].nmjd;
315  out_comm_channel[1].fmjd = out_comm_channel[1].nmjd;
316  out_comm_channel[1].packet_size = default_packet_size;
317  fprintf(agent->get_debug_fd(), "%.4f Network: Old: %u %s %s %u\n", tet.split(), 1, out_comm_channel[1].node.c_str(), out_comm_channel[1].chanip.c_str(), ntohs(out_comm_channel[1].chansock.caddr.sin_port));
318  fflush(agent->get_debug_fd());
319 
// With a second channel configured, queue logs go to "temp" and are written every 600 s.
320  log_directory = "temp"; // put log files in node/outgoing/file
321  logstride_sec = 600.; // longer logstride
322  }
323 
324  // Initialize Transfer Queue
// load_nodeids() must return at least 2; its return value is used as the size of txq.
325  if ((iretn = load_nodeids()) < 2)
326  {
327  fprintf(agent->get_debug_fd(), "%.4f Couldn't load node lookup table\n", tet.split());
328  agent->shutdown();
329  exit (iretn);
330  }
331  txq.resize(iretn);
// Index 0 is skipped; only ids for which check_node_id() is positive get a name.
332  for (uint16_t i=1; i<iretn; ++i)
333  {
334  if (check_node_id(i) > 0)
335  {
336  txq[i].node_id = i;
337  txq[i].node_name = lookup_node_id_name(i);
338  }
339  }
340 
// A leading digit in argv[1] is taken as the debug level (single digit, 0-9).
341  if (argc > 1 && (argv[1][0] >= '0' && argv[1][0] <= '9'))
342  {
343  agent->debug_level = argv[1][0] - '0';
344  }
345  else
346  {
347  agent->debug_level = 0;
348  }
349 
350  // Restore in progress transfers from previous run
351  for (string node_name : data_list_nodes())
352  {
353 // int32_t node_id = add_node_name(node_name);
354 
355  for(filestruc file : data_list_files(node_name, "temp", "file"))
356  {
357  // Add entry for each meta file
358  if (file.type == "meta")
359  {
360  // Incoming
// compare() returns 0 on a match, so !compare(...) tests for the "in_" name prefix.
361  if (!file.name.compare(0,3,"in_"))
362  {
363  tx_progress tx_in;
// temppath is the file path truncated at the first ".meta".
364  tx_in.temppath = file.path.substr(0,file.path.find(".meta"));
365  if (read_meta(tx_in) >= 0)
366  {
367  merge_chunks_overlap(tx_in);
368  iretn = incoming_tx_add(tx_in);
369  }
370  }
371 
372  // Outgoing
373  if (!file.name.compare(0,4,"out_"))
374  {
375  tx_progress tx_out;
376  tx_out.temppath = file.path.substr(0,file.path.find(".meta"));
377  if (read_meta(tx_out) >= 0)
378  {
379  find_chunks_togo(tx_out);
380  iretn = outgoing_tx_add(tx_out);
381  }
382  }
383  }
384  }
385  }
386 
387  // add agent_file requests
// Any non-zero return from add_request() terminates the program with that value.
388  if ((iretn=agent->add_request("get_channels",request_get_channels,"", "get channel information")))
389  exit (iretn);
390  if ((iretn=agent->add_request("set_throughput",request_set_throughput,"{n} [throughput]", "set channel throughput")))
391  exit (iretn);
392  if ((iretn=agent->add_request("remove_file",request_remove_file,"in|out tx_id", "removes file from indicated queue")))
393  exit (iretn);
394  // if ((iretn=agent->add_request("send_file",request_send_file,"", "creates and sends metadata/data packets")))
395  // exit (iretn);
396  if ((iretn=agent->add_request("ls",request_ls,"", "lists contents of directory")))
397  exit (iretn);
398  if ((iretn=agent->add_request("list_incoming",request_list_incoming,"", "lists contents incoming queue")))
399  exit (iretn);
400  if ((iretn=agent->add_request("list_outgoing",request_list_outgoing,"", "lists contents outgoing queue")))
401  exit (iretn);
402  if ((iretn=agent->add_request("list_incoming_json",request_list_incoming_json,"", "lists contents incoming queue")))
403  exit (iretn);
404  if ((iretn=agent->add_request("list_outgoing_json",request_list_outgoing_json,"", "lists contents outgoing queue")))
405  exit (iretn);
406  if ((iretn=agent->add_request("set_logstride",request_set_logstride,"sec","set time interval of log files")))
407  exit (iretn);
408  if ((iretn=agent->add_request("get_logstride",request_get_logstride,"","get time interval of log files")))
409  exit (iretn);
410  if ((iretn=agent->add_request("debug",request_debug,"{0|1}","Toggle Debug information")))
411  exit (iretn);
412  if ((iretn=agent->add_request("command",request_send_command,"nodename command","Submit command in event format to agent_file on requested Node")))
413  exit (iretn);
414  if ((iretn=agent->add_request("message",request_send_message,"nodename message","Send message to agent_file on requested Node")))
415  exit (iretn);
416 
// Initial scan of every node queue for outgoing files before the worker threads start.
417  double nextdiskcheck = currentmjd(0.);
418  for (uint16_t node=0; node<txq.size(); ++node)
419  {
420  iretn = outgoing_tx_load(node);
421  if (iretn >= 0)
422  {
423  nextdiskcheck = currentmjd();
424  }
425  }
426 
// Worker threads; they are joined after the main loop ends.
427  send_loop_thread = thread([=] { send_loop(); });
428  recv_loop_thread = thread([=] { recv_loop(); });
429  transmit_loop_thread = thread([=] { transmit_loop(); });
430 
431  double nextlog = currentmjd();
432  double sleepsec;
433  ElapsedTime etloop;
434  etloop.start();
435 
436  // start the agent
437  while(agent->running())
438  {
// While the agent reports IDLE, do nothing but sleep and re-check.
439  if (agent->running() == (uint16_t)Agent::State::IDLE)
440  {
441  COSMOS_SLEEP(1);
442  continue;
443  }
444 
445 
// Sleep until whichever deadline (disk check or log) comes first.
// Times are in Modified Julian Days, so multiplying by 86400 converts days to seconds.
446  if(nextdiskcheck < nextlog ) {
447  sleepsec = 86400. * (nextdiskcheck - currentmjd());
448  } else {
449  sleepsec = 86400. * (nextlog - currentmjd());
450  }
451  if (sleepsec > 0.)
452  {
453  COSMOS_SLEEP((sleepsec));
454  }
455 
// Log deadline reached: schedule the next one logstride_sec seconds from now.
// NOTE(review): gutter line 457 is missing from this listing; presumably it writes the
// queue log - confirm against the original file.
456  if(currentmjd() > nextlog) {
458  nextlog = currentmjd(0.) + logstride_sec/86400.;
459  }
460 
461  // Check for new files to transmit if queue is not full and check is not delayed
462 
463 
464  if (currentmjd() > nextdiskcheck)
465  {
466 
// Next disk check is scheduled 10 seconds from now, regardless of the load results.
467  nextdiskcheck = currentmjd(0.) + 10./86400.;
468  for (uint16_t node=0; node<txq.size(); ++node)
469  {
470  iretn = outgoing_tx_load(node);
471 // if (iretn >= 0)
472 // {
473 // nextdiskcheck = currentmjd();
474 // }
475  }
476  }
477  } // End WHILE Loop
478 
479  if (agent->debug_level)
480  {
481  fprintf(agent->get_debug_fd(), "%.4f %.4f Main: Node: %s Agent: %s - Exiting\n", tet.split(), dt.lap(), agent->nodeName.c_str(), agent->agentName.c_str());
482  fflush(agent->get_debug_fd());
483  }
484 
// Wait for all worker threads to finish before releasing the transfer queue.
485  send_loop_thread.join();
486  recv_loop_thread.join();
487  // transmit_queue_check.notify_one();
488  transmit_loop_thread.join();
489  txq.clear();
490 
491  if (agent->debug_level)
492  {
493  fprintf(agent->get_debug_fd(), "%.4f %.4f Main: Node: %s Agent: %s - Shutting down\n", tet.split(), dt.lap(), agent->nodeName.c_str(), agent->agentName.c_str());
494  fflush(agent->get_debug_fd());
495  }
496 
497  agent->shutdown();
498 
499  exit (0);
500 }
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file4.cpp:2811
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
int32_t request_send_message(string &request, string &response, Agent *)
Definition: agent_file4.cpp:3408
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
int32_t request_list_outgoing_json(string &request, string &response, Agent *agent)
Definition: agent_file4.cpp:3491
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file4.cpp:1971
Agent socket using Unicast UDP.
int32_t load_nodeids()
Definition: transferlib.cpp:549
int32_t request_set_logstride(string &request, string &response, Agent *agent)
Definition: agent_file4.cpp:3458
void send_loop() noexcept
Definition: agent_file4.cpp:1107
string nodeName
Definition: agentclass.h:367
int i
Definition: rw_test.cpp:37
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
Definition: transferlib.h:338
int iretn
Definition: rw_test.cpp:37
int32_t wait(State state=State::RUN, double waitsec=10.)
Definition: agentclass.cpp:398
static Agent * agent
Definition: agent_file4.cpp:94
vector< filestruc > data_list_files(string directory)
Get list of files in a directory, directly.
Definition: datalib.cpp:461
int32_t outgoing_tx_load(uint8_t node)
Definition: agent_file4.cpp:2703
ElapsedTime dt
Definition: agent_file4.cpp:186
double logstride_sec
Definition: agent_file4.cpp:185
vector< string > data_list_nodes()
Get list of Nodes, directly.
Definition: datalib.cpp:583
string node_name
Definition: agent_001.cpp:46
#define SOCKET_TALK
Talk followed by optional listen (sendto address)
Definition: socketlib.h:82
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
#define THROUGHPUT_LO
Definition: agent_file4.cpp:66
static PACKET_CHUNK_SIZE_TYPE default_packet_size
Definition: agent_file4.cpp:114
string lookup_node_id_name(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:536
static ElapsedTime tet
Definition: agent_file4.cpp:137
Definition: datalib.h:113
void start()
ElapsedTime::start.
Definition: elapsedtime.cpp:203
int32_t check_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:494
#define AGENTRECVPORT
Default RECV port.
Definition: agentclass.h:200
int32_t outgoing_tx_add(tx_progress &tx_out)
Definition: agent_file4.cpp:2274
string getAgent()
Definition: agentclass.cpp:2609
void recv_loop() noexcept
Definition: agent_file4.cpp:502
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
int32_t add_request(string token, external_request_function function, string synopsis="", string description="")
Add internal request to Agent request list with description and synopsis.
Definition: agentclass.cpp:312
int32_t request_debug(string &request, string &, Agent *)
Definition: agent_file4.cpp:3446
Definition: agentclass.h:139
#define SOCKET_BLOCKING
Blocking Agent.
Definition: socketlib.h:78
int32_t shutdown()
Shutdown agent gracefully.
Definition: agentclass.cpp:366
#define AGENTRCVTIMEO
Default AGENT socket RCVTIMEO (100 msec)
Definition: agentclass.h:208
double data_ctime(string path)
Definition: datalib.cpp:1910
vector< file_progress > find_chunks_togo(tx_progress &tx)
Definition: agent_file4.cpp:2080
Definition: elapsedtime.h:62
int32_t request_list_outgoing(string &request, string &response, Agent *agent)
Definition: agent_file4.cpp:2187
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
int32_t request_get_logstride(string &, string &response, Agent *)
Definition: agent_file4.cpp:3469
int32_t request_list_incoming(string &request, string &response, Agent *agent)
Definition: agent_file4.cpp:2153
string utc2iso8601(double utc)
ISO 8601 version of time.
Definition: timelib.cpp:1286
void write_queue_log(double logdate)
Definition: agent_file4.cpp:3475
#define SOCKET_LISTEN
Listen followed by optional talk (recvfrom INADDRANY)
Definition: socketlib.h:84
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
int32_t request_get_channels(string &request, string &response, Agent *agent)
Definition: agent_file4.cpp:2212
#define PACKET_SIZE_LO
Definition: agent_file4.cpp:64
int32_t read_meta(tx_progress &tx)
Definition: agent_file4.cpp:1874
static string log_directory
Definition: agent_file4.cpp:184
int32_t request_remove_file(string &request, string &response, Agent *agent)
Definition: agent_file4.cpp:2253
int32_t request_send_command(string &request, string &response, Agent *)
Definition: agent_file4.cpp:3353
int32_t socket_open(socket_channel *channel, NetworkType ntype, const char *address, uint16_t port, uint16_t role, bool blocking, uint32_t usectimeo, uint32_t rcvbuf, uint32_t sndbuf)
Open UDP socket.
Definition: socketlib.cpp:51
static string node
Definition: agent_monitor.cpp:126
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
int32_t request_list_incoming_json(string &request, string &response, Agent *agent)
Definition: agent_file4.cpp:3485
static uint32_t default_throughput
Definition: agent_file4.cpp:115
int32_t request_set_throughput(string &request, string &response, Agent *agent)
Definition: agent_file4.cpp:2232
string agentName
Definition: agentclass.h:368
int32_t request_ls(string &request, string &response, Agent *agent)
Definition: agent_file4.cpp:2123
vector< string > string_split(string in, string delimeters)
Parse a string.
Definition: stringlib.cpp:47
void transmit_loop() noexcept
Definition: agent_file4.cpp:1415
bool lower_chunk ( file_progress  i,
file_progress  j 
)
1967 {
1968  return (i.chunk_start<j.chunk_start);
1969 }
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334

Variable Documentation

beatstruc cbeat
static

the (global) name of the heartbeat structure

Agent* agent
static

the (global) name of the cosmos data structure

PACKET_CHUNK_SIZE_TYPE default_packet_size = (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
static
uint32_t default_throughput = 1000
static
vector<channelstruc> out_comm_channel
static
std::queue<transmit_queue_entry> transmit_queue
static
ElapsedTime tet
static
std::mutex txqueue_lock
static
std::mutex incoming_tx_lock
static
std::mutex outgoing_tx_lock
static
std::mutex debug_fd_lock
static
double last_data_receive_time = 0.
static
uint32_t packet_in_count = 0
static
uint32_t packet_out_count
static
uint32_t crc_error_count = 0
static
uint32_t timeout_error_count = 0
static
uint32_t type_error_count = 0
static
uint32_t send_error_count = 0
static
uint32_t recv_error_count = 0
static
vector<tx_queue> txq
static
string log_directory = "temp"
static
double logstride_sec = 10.