COSMOS core  1.0.2 (beta)
Comprehensive Open-architecture Solution for Mission Operations Systems
agent_file.cpp File Reference

File Transfer Agent source file. More...

#include "support/configCosmos.h"
#include "agent/agentclass.h"
#include "support/jsonlib.h"
#include "support/transferlib.h"
#include "support/sliplib.h"
#include <algorithm>
#include <cstring>
#include <time.h>
#include <iostream>
#include <fstream>
#include <string>
#include <sys/stat.h>
Include dependency graph for agent_file.cpp:

Classes

struct  sendchannelstruc
 
struct  transmit_queue_entry
 
struct  tx_entry
 
struct  tx_queue
 

Macros

#define TRANSFER_QUEUE_SIZE   256
 
#define MAXBUFFERSIZE   1024
 
#define PACKET_SIZE_LO   (253-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
 
#define PACKET_SIZE_PAYLOAD   (PACKET_SIZE_LO-PACKET_DATA_OFFSET_HEADER_TOTAL)
 
#define THROUGHPUT_LO   1000
 
#define PACKET_SIZE_HI   (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
 
#define THROUGHPUT_HI   150000
 

Functions

void send_loop ()
 
void recv_loop ()
 
void transmit_loop ()
 
int32_t request_debug (string &request, string &response, Agent *agent)
 
int32_t request_use_channel (string &request, string &response, Agent *agent)
 
int32_t request_remove_file (string &request, string &response, Agent *agent)
 
int32_t request_ls (string &request, string &response, Agent *agent)
 
int32_t request_list_incoming (string &request, string &response, Agent *agent)
 
int32_t request_list_incoming_json (string &request, string &response, Agent *agent)
 
int32_t request_list_outgoing (string &request, string &response, Agent *agent)
 
int32_t request_list_outgoing_json (string &request, string &response, Agent *agent)
 
int32_t outgoing_tx_add (tx_progress tx_out)
 
int32_t outgoing_tx_add (std::string node_name, std::string agent_name, std::string file_name)
 
int32_t outgoing_tx_del (int32_t node, PACKET_TX_ID_TYPE tx_id)
 
int32_t incoming_tx_add (tx_progress tx_in)
 
int32_t incoming_tx_add (std::string node_name, PACKET_TX_ID_TYPE tx_id)
 
int32_t incoming_tx_update (packet_struct_metashort meta)
 
int32_t incoming_tx_del (int32_t node, PACKET_TX_ID_TYPE tx_id)
 
std::vector< file_progressfind_chunks_missing (tx_progress &tx)
 
PACKET_FILE_SIZE_TYPE merge_chunks_overlap (tx_progress &tx)
 
double queuesendto (std::string type, uint16_t channel, std::vector< PACKET_BYTE > packet)
 
int32_t mysendto (std::string type, sendchannelstruc &channel, std::vector< PACKET_BYTE > &buf)
 
int32_t myrecvfrom (std::string type, socket_channel channel, std::vector< PACKET_BYTE > &buf, uint32_t length)
 
void debug_packet (std::vector< PACKET_BYTE > buf, std::string type)
 
int32_t write_meta (tx_progress &tx)
 
int32_t read_meta (tx_progress &tx)
 
bool tx_progress_compare_by_size (const tx_progress &a, const tx_progress &b)
 
bool filestruc_compare_by_size (const filestruc &a, const filestruc &b)
 
PACKET_TX_ID_TYPE check_tx_id (std::vector< tx_progress > tx_entry, PACKET_TX_ID_TYPE tx_id)
 
int32_t check_node_id_1 (std::string node_name)
 
int32_t check_node_id_1 (PACKET_NODE_ID_TYPE node_id)
 
int32_t lookup_remote_node_id (PACKET_NODE_ID_TYPE node_id)
 
int32_t set_remote_node_id (PACKET_NODE_ID_TYPE node_id, std::string node_name)
 
PACKET_TX_ID_TYPE choose_incoming_tx_id (int32_t node)
 
int32_t next_incoming_tx (PACKET_NODE_ID_TYPE node)
 
string json_list_incoming ()
 
string json_list_outgoing ()
 
int main (int argc, char *argv[])
 
bool lower_chunk (file_progress i, file_progress j)
 

Variables

static bool debug_flag = false
 
static std::string agentname = "file_"
 
static beatstruc cbeat
 
static Agentagent
 
static uint16_t send_channels =0
 
static uint16_t use_channel = 0
 
static sendchannelstruc send_channel [2]
 
static std::queue< transmit_queue_entrytransmit_queue
 
static std::condition_variable transmit_queue_check
 
static socket_channel recvchan
 
static std::mutex incoming_tx_lock
 
static std::mutex outgoing_tx_lock
 
static double last_data_receive_time = 0.
 
static double next_reqmeta_time = 0.
 
static double next_queue_time = 0.
 
static std::vector< tx_queuetxq
 
static int32_t active_node = -1
 

Detailed Description

File Transfer Agent source file.

Macro Definition Documentation

#define TRANSFER_QUEUE_SIZE   256
#define MAXBUFFERSIZE   1024
#define PACKET_SIZE_LO   (253-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
#define PACKET_SIZE_PAYLOAD   (PACKET_SIZE_LO-PACKET_DATA_OFFSET_HEADER_TOTAL)
#define THROUGHPUT_LO   1000
#define PACKET_SIZE_HI   (1472-(PACKET_DATA_OFFSET_HEADER_TOTAL+28))
#define THROUGHPUT_HI   150000

Function Documentation

void send_loop ( )
noexcept
992 {
993  std::vector<PACKET_BYTE> packet;
994  uint32_t sleep_time = 1;
995  double send_time = 0.;
996  double next_data_time = 0.;
997  double current_time;
998 
999  current_time = currentmjd();
1000 
1001  while (agent->running())
1002  {
1003  if (agent->running() == (uint16_t)Agent::State::IDLE)
1004  {
1005  COSMOS_SLEEP(1);
1006  continue;
1007  }
1008 
1009  // If we did nothing last loop, wait at least 100 msec
1010  if (next_data_time == 0.)
1011  {
1012  // 100 msec in MJD
1013  next_data_time = 1.16e-6;
1014  }
1015 
1016  // Time it should be after we wait
1017  double next_time = current_time + next_data_time;
1018  // Time it actually is now
1019  current_time = currentmjd();
1020  // Sleep if the difference is greater than zero
1021  if (next_time > current_time)
1022  {
1023  sleep_time = 1000000 * 86400. * (next_time - current_time);
1024  COSMOS_USLEEP(sleep_time);
1025  }
1026 
1027  // Bring us up to the present
1028  current_time = next_time;
1029  next_data_time = 0.;
1030 
1031  outgoing_tx_lock.lock();
1032  if (active_node >= 0)
1033  {
1034  int32_t node = active_node;
1035  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node].outgoing.progress, txq[node].outgoing.id);
1036  if (tx_id > 0)
1037  {
1038  if (txq[node].outgoing.progress[tx_id].file_info.size())
1039  {
1040  if (txq[node].outgoing.progress[tx_id].fp == nullptr)
1041  {
1042  txq[node].outgoing.progress[tx_id].fp = fopen(txq[node].outgoing.progress[tx_id].filepath.c_str(), "r");
1043  }
1044 
1045  if(txq[node].outgoing.progress[tx_id].fp != nullptr)
1046  {
1047  file_progress tp;
1048  tp = txq[node].outgoing.progress[tx_id].file_info[0];
1049 
1050  PACKET_FILE_SIZE_TYPE byte_count = (tp.chunk_end - tp.chunk_start) + 1;
1051  switch (use_channel)
1052  {
1053  case 0:
1054  if (byte_count > PACKET_SIZE_LO)
1055  {
1056  byte_count = PACKET_SIZE_LO;
1057  }
1058  break;
1059  case 1:
1060  if (byte_count > PACKET_SIZE_HI)
1061  {
1062  byte_count = PACKET_SIZE_HI;
1063  }
1064  break;
1065  }
1066 
1067  tp.chunk_end = tp.chunk_start + byte_count - 1;
1068 
1069  // Read the packet and send it
1070  int32_t nbytes;
1071  PACKET_BYTE* chunk = new PACKET_BYTE[byte_count]();
1072  if (!(nbytes = fseek(txq[node].outgoing.progress[tx_id].fp, tp.chunk_start, SEEK_SET)))
1073  {
1074  nbytes = fread(chunk, 1, byte_count, txq[node].outgoing.progress[tx_id].fp);
1075  }
1076  if (nbytes == byte_count)
1077  {
1078  // See if we know what the remote node_id is for this
1079  int32_t remote_node = lookup_remote_node_id(node);
1080  if (remote_node >= 0)
1081  {
1082  make_data_packet(packet, remote_node, txq[node].outgoing.progress[tx_id].tx_id, byte_count, tp.chunk_start, chunk);
1083 
1084  send_time = queuesendto("tx", use_channel, packet);
1085  next_data_time += send_time;
1086  txq[node].outgoing.progress[tx_id].file_info[0].chunk_start = tp.chunk_end + 1;
1087  }
1088  }
1089  else
1090  {
1091  // Some problem with this transmission, ask other end to dequeue it
1092  // Remove transaction
1093  outgoing_tx_del(node, tx_id);
1094 
1095  int32_t remote_node = lookup_remote_node_id(node);
1096  if (remote_node >= 0)
1097  {
1098  // Send a CANCEL packet
1099  std::vector<PACKET_BYTE> packet;
1100  make_cancel_packet(packet, remote_node, tx_id);
1101  queuesendto("tx", use_channel, packet);
1102  }
1103  }
1104  delete[] chunk;
1105 
1106  if (txq[node].outgoing.progress[tx_id].file_info[0].chunk_start > txq[node].outgoing.progress[tx_id].file_info[0].chunk_end)
1107  {
1108  // All done with this file_info entry. Close file and remove entry.
1109  fclose(txq[node].outgoing.progress[tx_id].fp);
1110  txq[node].outgoing.progress[tx_id].fp = nullptr;
1111  txq[node].outgoing.progress[tx_id].file_info.pop_front();
1112  }
1113 
1114  write_meta(txq[node].outgoing.progress[tx_id]);
1115  }
1116  else
1117  {
1118  // Some problem with this transmission, ask other end to dequeue it
1119  outgoing_tx_del(node, tx_id);
1120 
1121  int32_t remote_node = lookup_remote_node_id(node);
1122  if (remote_node >= 0)
1123  {
1124  // Send a CANCEL packet
1125  std::vector<PACKET_BYTE> packet;
1126  make_cancel_packet(packet, remote_node, tx_id);
1127  queuesendto("tx", use_channel, packet);
1128  }
1129  }
1130  }
1131  }
1132  }
1133 
1134  // If things have grown quiet, send a QUEUE packet
1135 
1136  if (current_time > next_queue_time)
1137  {
1138  for (uint16_t node=0; node<txq.size(); ++node)
1139  {
1140  std::vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
1141  PACKET_TX_ID_TYPE iq = 0;
1142  for (uint16_t i=1; i<TRANSFER_QUEUE_SIZE; ++i)
1143  {
1144  if (txq[node].outgoing.progress[i].tx_id != 0)
1145  {
1146  tqueue[iq++] = txq[node].outgoing.progress[i].tx_id;
1147  }
1148  if (iq == TRANSFER_QUEUE_LIMIT)
1149  {
1150  break;
1151  }
1152  }
1153  // if (iq)
1154  {
1155  make_queue_packet(packet, node, txq[node].node_name, tqueue);
1156  send_time = queuesendto("tx", use_channel, packet);
1157  next_data_time += send_time;
1158  }
1159 
1160  }
1161 
1162  // Calculate next likely queue time
1163  next_queue_time = current_time + 10./86400.;
1164  }
1165 
1166  outgoing_tx_lock.unlock();
1167  }
1168 }
int32_t outgoing_tx_del(int32_t node, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file.cpp:1968
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
void make_data_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, PACKET_TX_ID_TYPE tx_id, PACKET_CHUNK_SIZE_TYPE byte_count, PACKET_FILE_SIZE_TYPE chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:232
int32_t write_meta(tx_progress &tx)
Definition: agent_file.cpp:1316
double queuesendto(std::string type, uint16_t channel, std::vector< PACKET_BYTE > packet)
Definition: agent_file.cpp:1196
PACKET_TX_ID_TYPE check_tx_id(std::vector< tx_progress > tx_entry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file.cpp:2192
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
static double next_queue_time
Definition: agent_file.cpp:135
static Agent * agent
Definition: agent_file.cpp:94
#define TRANSFER_QUEUE_SIZE
Definition: agent_file.cpp:58
static int32_t active_node
Definition: agent_file.cpp:162
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
#define PACKET_SIZE_LO
Definition: agent_file.cpp:61
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
void make_cancel_packet(vector< PACKET_BYTE > &packet, packet_struct_cancel cancel)
Definition: transferlib.cpp:82
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
static std::mutex outgoing_tx_lock
Definition: agent_file.cpp:129
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define SEEK_SET
Definition: zconf.h:475
uint8_t PACKET_BYTE
Definition: transferlib.h:140
int32_t lookup_remote_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file.cpp:2228
void make_queue_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > queue)
Definition: transferlib.cpp:277
static string node
Definition: agent_monitor.cpp:126
#define PACKET_SIZE_HI
Definition: agent_file.cpp:64
void recv_loop ( )
noexcept
453 {
454  std::vector<PACKET_BYTE> recvbuf;
455  std::string partial_filepath;
456 
457  while (agent->running())
458  {
459  if (agent->running() == (uint16_t)Agent::State::IDLE)
460  {
461  COSMOS_SLEEP(1);
462  continue;
463  }
464 
465  COSMOS_USLEEP(1);
466  int32_t nbytes = 0;
467  if (( nbytes = myrecvfrom("rx", recvchan, recvbuf, PACKET_MAX_LENGTH)) > 0)
468  {
469  // Respond appropriately to incoming packet
470  switch (recvbuf[0] & 0x0f)
471  {
472  case PACKET_METADATA:
473  {
475 
476  extract_metadata(recvbuf, meta);
477 
478  incoming_tx_lock.lock();
479 
480  incoming_tx_update(meta);
481 
482  incoming_tx_lock.unlock();
483 
484  break;
485  }
486  case PACKET_DATA:
487  {
488  packet_struct_data data;
489 
490  extract_data(recvbuf, data.node_id, data.tx_id, data.byte_count, data.chunk_start, data.chunk);
491 
493 
494  // create transaction entry if new, and then add data
495 
496  incoming_tx_lock.lock();
497 
498  int32_t node = check_node_id_1(data.node_id);
499 
500  if (node >= 0)
501  {
502  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node].incoming.progress, data.tx_id);
503 
504  // Update corresponding incoming queue entry if it exists
505  if (tx_id > 0)
506  {
507  // tx_id now points to the valid entry to which we should add the data
508  file_progress tp;
509  tp.chunk_start = data.chunk_start;
510  tp.chunk_end = data.chunk_start + data.byte_count - 1;
511 
512  uint32_t check=0;
513  bool duplicate = false;
514  bool updated = false;
515 
516  // Do we have any data yet?
517  if (!txq[node].incoming.progress[tx_id].file_info.size())
518  {
519  // Add first entry, then write data
520  txq[node].incoming.progress[tx_id].file_info.push_back(tp);
521  txq[node].incoming.progress[tx_id].total_bytes += data.byte_count;
522  updated = true;
523  }
524  else
525  {
526  // Check against existing data
527  for (uint32_t j=0; j<txq[node].incoming.progress[tx_id].file_info.size(); ++j)
528  {
529  // Check for duplicate
530  if (tp.chunk_start >= txq[node].incoming.progress[tx_id].file_info[j].chunk_start && tp.chunk_end <= txq[node].incoming.progress[tx_id].file_info[j].chunk_end)
531  {
532  duplicate = true;
533  break;
534  }
535  // If we start before this entry
536  if (tp.chunk_start < txq[node].incoming.progress[tx_id].file_info[j].chunk_start)
537  {
538  // If we end before this entry (at least one byte between), insert
539  if (tp.chunk_end + 1 < txq[node].incoming.progress[tx_id].file_info[j].chunk_start)
540  {
541  txq[node].incoming.progress[tx_id].file_info.insert(txq[node].incoming.progress[tx_id].file_info.begin()+j, tp);
542  txq[node].incoming.progress[tx_id].total_bytes += data.byte_count;
543  updated = true;
544  break;
545  }
546  // Otherwise, extend the near end
547  else
548  {
549  tp.chunk_end = txq[node].incoming.progress[tx_id].file_info[j].chunk_start - 1;
550  txq[node].incoming.progress[tx_id].file_info[j].chunk_start = tp.chunk_start;
551  data.byte_count = (tp.chunk_end - tp.chunk_start) + 1;
552  txq[node].incoming.progress[tx_id].total_bytes += data.byte_count;
553  updated = true;
554  break;
555  }
556  }
557  else
558  {
559  // If we overlap on the end, extend the far end
560  if (tp.chunk_start <= txq[node].incoming.progress[tx_id].file_info[j].chunk_end + 1)
561  {
562  if (tp.chunk_end > txq[node].incoming.progress[tx_id].file_info[j].chunk_end)
563  {
564  data.byte_count = tp.chunk_end - txq[node].incoming.progress[tx_id].file_info[j].chunk_end;
565  tp.chunk_start = txq[node].incoming.progress[tx_id].file_info[j].chunk_end + 1;
566  txq[node].incoming.progress[tx_id].file_info[j].chunk_end = tp.chunk_end;
567  txq[node].incoming.progress[tx_id].total_bytes += data.byte_count;
568  updated = true;
569  break;
570  }
571  }
572  }
573  check = j + 1;
574  }
575 
576 
577  // If we are higher than everything currently in the list, then append
578  if (!duplicate && check == txq[node].incoming.progress[tx_id].file_info.size())
579  {
580  txq[node].incoming.progress[tx_id].file_info.push_back(tp);
581  txq[node].incoming.progress[tx_id].total_bytes += data.byte_count;
582  updated = true;
583  }
584 
585  }
586 
587  // Write to disk if this is new data
588  if (updated)
589  {
590  // Write incoming data to disk
591  if (txq[node].incoming.progress[tx_id].fp == NULL)
592  {
593  partial_filepath = txq[node].incoming.progress[tx_id].temppath + ".file";
594  txq[node].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(), "w");
595  }
596 
597  if (txq[node].incoming.progress[tx_id].fp == NULL)
598  {
599  perror(partial_filepath.c_str());
600  }
601  else
602  {
603  fseek(txq[node].incoming.progress[tx_id].fp, tp.chunk_start, SEEK_SET);
604  fwrite(data.chunk, data.byte_count, 1, txq[node].incoming.progress[tx_id].fp);
605  fflush(txq[node].incoming.progress[tx_id].fp);
606  // Write latest meta data to disk
607  write_meta(txq[node].incoming.progress[tx_id]);
608  }
609 
610  }
611 
612  // Check if file has been completely received
613  if(txq[node].incoming.progress[tx_id].file_size == txq[node].incoming.progress[tx_id].total_bytes && txq[node].incoming.progress[tx_id].havemeta)
614  {
615  // See if we know what the remote node_id is for this
616  int32_t remote_node = lookup_remote_node_id(node);
617  if (remote_node >= 0)
618  {
619  tx_progress tx_in = txq[node].incoming.progress[tx_id];
620 
621  // inform other end that file has been received
622  std::vector<PACKET_BYTE> packet;
623  make_complete_packet(packet, remote_node, tx_in.tx_id);
624  queuesendto("rx", use_channel, packet);
625 
626  // Move file to its final location
627  if (!txq[node].incoming.progress[tx_id].complete)
628  {
629  if (txq[node].incoming.progress[tx_id].fp != nullptr)
630  {
631  fclose(txq[node].incoming.progress[tx_id].fp);
632  txq[node].incoming.progress[tx_id].fp = nullptr;
633  }
634  std::string final_filepath = tx_in.temppath + ".file";
635  int iret = rename(final_filepath.c_str(), tx_in.filepath.c_str());
636  if (debug_flag)
637  {
638  printf("Renamed: %d %s\n", iret, tx_in.filepath.c_str());
639  }
640  // Mark complete
641  txq[node].incoming.progress[tx_id].complete = true;
642  }
643  }
644  }
645  }
646  }
647 
648  incoming_tx_lock.unlock();
649 
650  break;
651  }
652  case PACKET_REQDATA:
653  {
654  packet_struct_reqdata reqdata;
655 
656  extract_reqdata(recvbuf, reqdata);
657 
658  // Simple validity check
659  int32_t node = check_node_id_1(reqdata.node_id);
660 
661  if (node < 0 || reqdata.hole_end < reqdata.hole_start)
662  {
663  break;
664  }
665 
666  active_node = node;
667  outgoing_tx_lock.lock();
668 
669  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node].outgoing.progress, reqdata.tx_id);
670  // tx_id now points to the valid entry to which we should add the data
671 
672  if (tx_id > 0)
673  {
674  // Add this chunk to the queue
675  file_progress tp;
676  tp.chunk_start = reqdata.hole_start;
677  tp.chunk_end = reqdata.hole_end;
678  PACKET_FILE_SIZE_TYPE byte_count = (reqdata.hole_end - reqdata.hole_start) + 1;
679 
680  uint32_t check=0;
681  // Anything in the queue yet?
682  if (!txq[node].outgoing.progress[tx_id].file_info.size())
683  {
684  // Add first entry
685  txq[node].outgoing.progress[tx_id].file_info.push_back(tp);
686  txq[node].outgoing.progress[tx_id].total_bytes += byte_count;
687  // cout<<"[Send] In (new): "<<"["<<0<<":"<<txq[node].outgoing.progress[tx_id].file_info.size()<<"]"<<" tx_id: "<<txq[node].outgoing.progress[tx_id].tx_id<<" chunk:["<<reqdata.hole_start<<":"<<reqdata.hole_end<<"] now:["<<txq[node].outgoing.progress[tx_id].file_info[0].chunk_start<<":"<<txq[node].outgoing.progress[tx_id].file_info[0].chunk_end<< std::endl;
688  }
689  else
690  {
691  // Check against existing data
692  for (uint32_t j=0; j<txq[node].outgoing.progress[tx_id].file_info.size(); ++j)
693  {
694  // If we match this entry
695  if (tp.chunk_start == txq[node].outgoing.progress[tx_id].file_info[j].chunk_start && tp.chunk_end == txq[node].outgoing.progress[tx_id].file_info[j].chunk_end)
696  {
697  break;
698  }
699  // If we start before this entry
700  if (tp.chunk_start < txq[node].outgoing.progress[tx_id].file_info[j].chunk_start)
701  {
702  // If we end before this entry (at least one byte between), insert
703  if (tp.chunk_end + 1 < txq[node].outgoing.progress[tx_id].file_info[j].chunk_start)
704  {
705  txq[node].outgoing.progress[tx_id].file_info.insert(txq[node].outgoing.progress[tx_id].file_info.begin()+j, tp);
706  txq[node].outgoing.progress[tx_id].total_bytes += byte_count;
707  // cout<<"[Send] In (insert): "<<"["<<j<<":"<<txq[node].outgoing.progress[tx_id].file_info.size()<<"]"<<" tx_id: "<<txq[node].outgoing.progress[tx_id].tx_id<<" chunk:["<<reqdata.hole_start<<":"<<reqdata.hole_end<<"] now:["<<txq[node].outgoing.progress[tx_id].file_info[j].chunk_start<<":"<<txq[node].outgoing.progress[tx_id].file_info[j].chunk_end<< std::endl;
708  break;
709  }
710  // Otherwise, extend the near end
711  else
712  {
713  tp.chunk_end = txq[node].outgoing.progress[tx_id].file_info[j].chunk_start - 1;
714  txq[node].outgoing.progress[tx_id].file_info[j].chunk_start = tp.chunk_start;
715  byte_count = (tp.chunk_end - tp.chunk_start) + 1;
716  txq[node].outgoing.progress[tx_id].total_bytes += byte_count;
717  // cout<<"[Send] In (extend near): "<<"["<<j<<":"<<txq[node].outgoing.progress[tx_id].file_info.size()<<"]"<<" tx_id: "<<txq[node].outgoing.progress[tx_id].tx_id<<" chunk:["<<reqdata.hole_start<<":"<<reqdata.hole_end<<"] now:["<<txq[node].outgoing.progress[tx_id].file_info[j].chunk_start<<":"<<txq[node].outgoing.progress[tx_id].file_info[j].chunk_end<< std::endl;
718  break;
719  }
720  }
721  else
722  {
723  // If we overlap on the end, extend the far end
724  if (tp.chunk_start <= txq[node].outgoing.progress[tx_id].file_info[j].chunk_end + 1)
725  {
726  if (tp.chunk_end > txq[node].outgoing.progress[tx_id].file_info[j].chunk_end)
727  {
728  byte_count = tp.chunk_end - txq[node].outgoing.progress[tx_id].file_info[j].chunk_end;
729  tp.chunk_start = txq[node].outgoing.progress[tx_id].file_info[j].chunk_end + 1;
730  txq[node].outgoing.progress[tx_id].file_info[j].chunk_end = tp.chunk_end;
731  txq[node].outgoing.progress[tx_id].total_bytes += byte_count;
732  // cout<<"[Send] In (extend far): "<<"["<<j<<":"<<txq[node].outgoing.progress[tx_id].file_info.size()<<"]"<<" tx_id: "<<txq[node].outgoing.progress[tx_id].tx_id<<" chunk:["<<reqdata.hole_start<<":"<<reqdata.hole_end<<"] now:["<<txq[node].outgoing.progress[tx_id].file_info[j].chunk_start<<":"<<txq[node].outgoing.progress[tx_id].file_info[j].chunk_end<< std::endl;
733  break;
734  }
735  }
736  }
737  check = j + 1;
738  }
739 
740 
741  // If we are higher than everything currently in the list, then append
742  if (check == txq[node].outgoing.progress[tx_id].file_info.size())
743  {
744  txq[node].outgoing.progress[tx_id].file_info.push_back(tp);
745  txq[node].outgoing.progress[tx_id].total_bytes += byte_count;
746  // cout<<"[Send] In (append): "<<"["<<check<<":"<<txq[node].outgoing.progress[tx_id].file_info.size()<<"]"<<" tx_id: "<<txq[node].outgoing.progress[tx_id].tx_id<<" chunk:["<<reqdata.hole_start<<":"<<reqdata.hole_end<<"] now:["<<txq[node].outgoing.progress[tx_id].file_info[check].chunk_start<<":"<<txq[node].outgoing.progress[tx_id].file_info[check].chunk_end<< std::endl;
747  }
748 
749  }
750 
751  // Save meta to disk
752  write_meta(txq[node].outgoing.progress[tx_id]);
753  txq[node].outgoing.id = reqdata.tx_id;
754  }
755 
756  outgoing_tx_lock.unlock();
757  // current_updatetime = currentmjd() + 5./86400.;
758  break;
759  }
760  //Request missing metadata
761  case PACKET_REQMETA:
762  {
763  packet_struct_reqmeta reqmeta;
764 
765  extract_reqmeta(recvbuf, reqmeta);
766 
767  outgoing_tx_lock.lock();
768 
769 
770  // Send requested META packets
771  int32_t node = set_remote_node_id(reqmeta.node_id, reqmeta.node_name);
772  if (node >= 0)
773  {
774  // See if we know what the remote node_id is for this
775  int32_t remote_node = lookup_remote_node_id(node);
776  if (remote_node >= 0)
777  {
778  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
779  {
780  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node].outgoing.progress, reqmeta.tx_id[i]);
781  if (tx_id > 0)
782  {
783  tx_progress tx = txq[node].outgoing.progress[tx_id];
784  std::vector<PACKET_BYTE> packet;
785  make_metadata_packet(packet, remote_node, tx.tx_id, (char *)tx.file_name.c_str(), tx.file_size, (char *)tx.agent_name.c_str());
786  queuesendto("tx", use_channel, packet);
787  }
788  }
789  }
790  }
791 
792  outgoing_tx_lock.unlock();
793  break;
794  }
795  case PACKET_CANCEL:
796  {
797  packet_struct_complete cancel;
798 
799  extract_complete(recvbuf, cancel);
800 
801  int32_t node = check_node_id_1(cancel.node_id);
802  if (node >= 0)
803  {
804  incoming_tx_lock.lock();
805 
806  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node].incoming.progress, cancel.tx_id);
807 
808  if (tx_id > 0)
809  {
810  // Remove the transaction
811  incoming_tx_del(node, tx_id);
812  // active_node = -1;
813  }
814 
815  next_incoming_tx(node);
816  incoming_tx_lock.unlock();
817  }
818  break;
819  }
820  case PACKET_COMPLETE:
821  {
822  packet_struct_complete complete;
823 
824  extract_complete(recvbuf, complete);
825 
826  int32_t node = check_node_id_1(complete.node_id);
827  if (node >= 0)
828  {
829  outgoing_tx_lock.lock();
830 
831  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node].outgoing.progress, complete.tx_id);
832 
833  if (tx_id > 0)
834  {
835  // See if we know what the remote node_id is for this
836  int32_t remote_node = lookup_remote_node_id(node);
837  if (remote_node >= 0)
838  {
839  // Remove transaction
840  outgoing_tx_del(node, tx_id);
841 
842  // Send a CANCEL packet
843  std::vector<PACKET_BYTE> packet;
844  make_cancel_packet(packet, remote_node, complete.tx_id);
845  queuesendto("tx", use_channel, packet);
846  }
847  }
848 
849  outgoing_tx_lock.unlock();
850  }
851  break;
852  }
853  case PACKET_QUEUE:
854  {
855  packet_struct_queue queue;
856 
857  extract_queue(recvbuf, queue);
858 
859  incoming_tx_lock.lock();
860 
861  // Is this a node we are handling?
862  int32_t node = check_node_id_1(queue.node_name);
863  if (node >= 0)
864  {
865  // Set remote node_id
866  txq[node].node_id = queue.node_id + 1;
867  // Set active_node
868 // active_node = node;
869  // Sort through incoming queue and remove anything not in sent queue
870  for (uint16_t tx_id=0; tx_id<TRANSFER_QUEUE_SIZE; ++tx_id)
871  {
872  bool valid = false;
873  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
874  {
875  if (txq[node].incoming.progress[tx_id].tx_id == queue.tx_id[i])
876  {
877  // Incoming transaction is in outgoing queue
878  active_node = node;
879  valid = true;
880  break;
881  }
882 // if (valid)
883 // {
884 // break;
885 // }
886  }
887 
888  if (tx_id && !valid)
889  {
890  active_node = node;
891  incoming_tx_del(node, tx_id);
892  }
893  }
894 
895  // Sort through sent queue and add anything not in incoming queue
896  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
897  {
898  if (queue.tx_id[i])
899  {
900  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node].incoming.progress, queue.tx_id[i]);
901 
902  if (tx_id == 0)
903  {
904  active_node = node;
905  incoming_tx_add(queue.node_name, queue.tx_id[i]);
906  }
907  }
908  }
909 
910  // Go through final incoming queue and request any missing meta data
912  {
914  std::vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
915  PACKET_TX_ID_TYPE iq = 0;
916  for (uint16_t tx_id=1; tx_id<TRANSFER_QUEUE_SIZE; ++tx_id)
917  {
918  if (txq[node].incoming.progress[tx_id].tx_id && !txq[node].incoming.progress[tx_id].havemeta)
919  {
920  next_reqmeta_time += sizeof(packet_struct_metashort) / (86400. * send_channel[use_channel].throughput);
921  tqueue[iq++] = tx_id;
922  }
923  if (iq == TRANSFER_QUEUE_LIMIT)
924  {
925  break;
926  }
927  }
928  if (iq)
929  {
930  std::vector<PACKET_BYTE> packet;
931  active_node = node;
932  make_reqmeta_packet(packet, node, txq[node].node_name, tqueue);
933  queuesendto("rx", use_channel, packet);
934  }
935  }
936 
937  next_incoming_tx(node);
938  // PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node].incoming.progress, choose_incoming_tx_id(node));
939 
940  // if (tx_id < TRANSFER_QUEUE_SIZE && tx_id > 0)
941  // {
942  // // See if we know what the remote node_id is for this
943  // int32_t remote_node = lookup_remote_node_id(node);
944  // if (remote_node >= 0)
945  // {
946  // // Check if file has been completely received
947  // if(txq[node].incoming.progress[tx_id].file_size == txq[node].incoming.progress[tx_id].total_bytes && txq[node].incoming.progress[tx_id].havemeta)
948  // {
949  // tx_progress tx_in = txq[node].incoming.progress[tx_id];
950 
951  // // inform other end that file has been received
952  // std::vector<PACKET_BYTE> packet;
953  // make_complete_packet(packet, remote_node, tx_in.tx_id);
954  // queuesendto("rx", use_channel, packet);
955 
956  // // Move file to its final location
957  // if (!txq[node].incoming.progress[tx_id].complete)
958  // {
959  // if (txq[node].incoming.progress[tx_id].fp != nullptr)
960  // {
961  // fclose(txq[node].incoming.progress[tx_id].fp);
962  // txq[node].incoming.progress[tx_id].fp = nullptr;
963  // }
964  // std::string final_filepath = tx_in.temppath + ".file";
965  // rename(final_filepath.c_str(), tx_in.filepath.c_str());
966  // txq[node].incoming.progress[tx_id].complete = true;
967  // }
968  // }
969  // else
970  // {
971  // // Ask for missing data
972  // std::vector<file_progress> missing;
973  // missing = find_chunks_missing(txq[node].incoming.progress[tx_id]);
974  // for (uint32_t j=0; j<missing.size(); ++j)
975  // {
976  // std::vector<PACKET_BYTE> packet;
977  // make_reqdata_packet(packet, remote_node, txq[node].incoming.progress[tx_id].tx_id, missing[j].chunk_start, missing[j].chunk_end);
978  // queuesendto("rx", use_channel, packet);
979  // }
980  // }
981  // }
982  // }
983  }
984  incoming_tx_lock.unlock();
985  }
986  }
987  }
988  }
989 }
static socket_channel recvchan
Definition: agent_file.cpp:120
int32_t outgoing_tx_del(int32_t node, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file.cpp:1968
static std::mutex incoming_tx_lock
Definition: agent_file.cpp:128
Definition: transferlib.h:205
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:209
void extract_reqdata(vector< PACKET_BYTE > &packet, packet_struct_reqdata &reqdata)
Definition: transferlib.cpp:150
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int32_t set_remote_node_id(PACKET_NODE_ID_TYPE node_id, std::string node_name)
Definition: agent_file.cpp:2241
Definition: transferlib.h:291
int32_t write_meta(tx_progress &tx)
Definition: agent_file.cpp:1316
double queuesendto(std::string type, uint16_t channel, std::vector< PACKET_BYTE > packet)
Definition: agent_file.cpp:1196
PACKET_TX_ID_TYPE check_tx_id(std::vector< tx_progress > tx_entry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file.cpp:2192
int i
Definition: rw_test.cpp:37
void extract_complete(vector< PACKET_BYTE > &packet, packet_struct_complete &complete)
Definition: transferlib.cpp:71
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:208
Definition: transferlib.h:332
Definition: transferlib.h:338
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:263
static double next_reqmeta_time
Definition: agent_file.cpp:134
PACKET_CHUNK_SIZE_TYPE byte_count
Definition: transferlib.h:279
int32_t next_incoming_tx(PACKET_NODE_ID_TYPE node)
Definition: agent_file.cpp:2255
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
static Agent * agent
Definition: agent_file.cpp:94
Definition: transferlib.h:245
#define TRANSFER_QUEUE_SIZE
Definition: agent_file.cpp:58
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:219
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
static int32_t active_node
Definition: agent_file.cpp:162
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
void make_complete_packet(vector< PACKET_BYTE > &packet, packet_struct_complete complete)
Definition: transferlib.cpp:54
Definition: transferlib.h:217
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:264
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
int32_t check_node_id_1(std::string node_name)
Definition: agent_file.cpp:2204
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:294
void extract_data(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE &node_id, PACKET_TX_ID_TYPE &tx_id, PACKET_CHUNK_SIZE_TYPE &byte_count, PACKET_FILE_SIZE_TYPE &chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:248
void make_cancel_packet(vector< PACKET_BYTE > &packet, packet_struct_cancel cancel)
Definition: transferlib.cpp:82
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
static std::mutex outgoing_tx_lock
Definition: agent_file.cpp:129
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:207
string filepath
Definition: transferlib.h:352
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:278
Definition: transferlib.h:261
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string agent_name
Definition: transferlib.h:350
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:277
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define SEEK_SET
Definition: zconf.h:475
int32_t incoming_tx_update(packet_struct_metashort meta)
Definition: agent_file.cpp:2059
static sendchannelstruc send_channel[2]
Definition: agent_file.cpp:108
string file_name
Definition: transferlib.h:351
Definition: transferlib.h:275
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:221
#define PACKET_MAX_LENGTH
Definition: transferlib.h:75
PACKET_FILE_SIZE_TYPE hole_start
Definition: transferlib.h:265
PACKET_FILE_SIZE_TYPE hole_end
Definition: transferlib.h:266
int32_t lookup_remote_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file.cpp:2228
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:220
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:280
int32_t myrecvfrom(std::string type, socket_channel channel, std::vector< PACKET_BYTE > &buf, uint32_t length)
Definition: agent_file.cpp:1241
int32_t incoming_tx_add(tx_progress tx_in)
Definition: agent_file.cpp:2008
PACKET_BYTE chunk[1500]
Definition: transferlib.h:281
static bool debug_flag
Definition: agent_file.cpp:69
static string node
Definition: agent_monitor.cpp:126
void make_metadata_packet(vector< PACKET_BYTE > &packet, packet_struct_metalong meta)
Definition: transferlib.cpp:163
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
void extract_queue(vector< PACKET_BYTE > &packet, packet_struct_queue &queue)
Definition: transferlib.cpp:292
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
void make_reqmeta_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > reqmeta)
Definition: transferlib.cpp:110
int32_t incoming_tx_del(int32_t node, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file.cpp:2102
void extract_reqmeta(vector< PACKET_BYTE > &packet, packet_struct_reqmeta &reqmeta)
Definition: transferlib.cpp:124
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:293
static double last_data_receive_time
Definition: agent_file.cpp:131
void transmit_loop ( )
noexcept
1171 {
1172  std::mutex transmit_queue_lock;
1173  std::unique_lock<std::mutex> locker(transmit_queue_lock);
1174 
1175  while (agent->running())
1176  {
1177  if (agent->running() == (uint16_t)Agent::State::IDLE)
1178  {
1179  COSMOS_SLEEP(1);
1180  continue;
1181  }
1182 
1183 
1184  transmit_queue_check.wait(locker);
1185 
1186  while (!transmit_queue.empty())
1187  {
1188  // Get next packet from transceiver FIFO
1189  transmit_queue_entry entry = transmit_queue.front();
1190  transmit_queue.pop();
1191  mysendto(entry.type, send_channel[entry.channel], entry.packet);
1192  }
1193  }
1194 }
std::vector< PACKET_BYTE > packet
Definition: agent_file.cpp:114
static Agent * agent
Definition: agent_file.cpp:94
uint32_t channel
Definition: agent_file.cpp:113
std::string type
Definition: agent_file.cpp:112
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
static std::condition_variable transmit_queue_check
Definition: agent_file.cpp:118
static sendchannelstruc send_channel[2]
Definition: agent_file.cpp:108
int32_t mysendto(std::string type, sendchannelstruc &channel, std::vector< PACKET_BYTE > &buf)
Definition: agent_file.cpp:1216
Definition: agent_file.cpp:110
static std::queue< transmit_queue_entry > transmit_queue
Definition: agent_file.cpp:117
int32_t request_debug ( string &  request,
string &  response,
Agent agent 
)
2306 {
2307 
2308  std::string requestString = std::string(request);
2309  StringParser sp(requestString, ' ');
2310 
2311  debug_flag = sp.getFieldNumberAsDouble(2); // should be getFieldNumberAsBoolean
2312 
2313  std::cout << "debug_flag: " << debug_flag << std::endl;
2314  return 0;
2315 }
Definition: stringlib.h:89
static bool debug_flag
Definition: agent_file.cpp:69
int32_t request_use_channel ( string &  request,
string &  response,
Agent agent 
)
1779 {
1780  uint16_t channel=0;
1781  uint32_t throughput=0;
1782 
1783  sscanf(request.c_str(), "%*s %hu %u\n", &channel, &throughput);
1784  if (channel < send_channels)
1785  {
1786  use_channel = channel;
1787  if (throughput)
1788  {
1789  send_channel[channel].throughput = throughput;
1790  }
1791  } else {
1792  response = "Channel " + std::to_string(channel) + " too large";
1793  }
1794  return 0;
1795 
1796 }
string to_string(char *value)
Definition: stringlib.cpp:220
static uint16_t use_channel
Definition: agent_file.cpp:97
uint32_t throughput
Definition: agent_file.cpp:104
static uint16_t send_channels
Definition: agent_file.cpp:96
static sendchannelstruc send_channel[2]
Definition: agent_file.cpp:108
int32_t request_remove_file ( string &  request,
string &  response,
Agent agent 
)
1799 {
1800  char type;
1801  uint32_t tx_id;
1802 
1803  sscanf(request.c_str(), "%*s %c %u\n", &type, &tx_id);
1804  switch (type)
1805  {
1806  case 'i':
1807  {
1808  break;
1809  }
1810  case 'o':
1811  {
1812  break;
1813  }
1814  }
1815 
1816  return 0;
1817 }
int32_t request_ls ( string &  request,
string &  response,
Agent agent 
)
1539 {
1540 
1541  //the request string == "ls directoryname"
1542  //get the directory name
1543 // char directoryname[COSMOS_MAX_NAME+1];
1544 // memmove(directoryname, request.substr(3), COSMOS_MAX_NAME);
1545  std::string directoryname = request.substr(3);
1546 
1547  DIR* dir;
1548  struct dirent* ent;
1549 
1550  std::string all_file_names;
1551 
1552  if((dir = opendir(directoryname.c_str())) != NULL)
1553  {
1554  while (( ent = readdir(dir)) != NULL)
1555  {
1556  all_file_names += ent->d_name;
1557  all_file_names += "\n";
1558  }
1559  closedir(dir);
1560 
1561  response = (all_file_names.c_str());
1562  }
1563  else
1564  {
1565  response = "unable to open directory " + directoryname;
1566  }
1567  return 0;
1568 }
int closedir(DIR *dir)
Definition: dirent.c:74
Definition: dirent.c:24
Definition: dirent.h:21
DIR * opendir(const char *name)
Definition: dirent.c:32
struct dirent * readdir(DIR *dir)
Definition: dirent.c:97
char * d_name
Definition: dirent.h:23
int32_t request_list_incoming ( string &  request,
string &  response,
Agent agent 
)
1571 {
1572  response.clear();
1573  for (uint16_t node = 0; node<txq.size(); ++node)
1574  {
1575  response += std::to_string(node) + ' ' + txq[(node)].node_name + ' ' + std::to_string(txq[(node)].incoming.size) + "\n";
1576  for(tx_progress tx : txq[(node)].incoming.progress)
1577  {
1578  if (tx.tx_id)
1579  {
1580  response += to_label("tx_id", tx.tx_id) + ' ';
1581  response += to_label("node", tx.node_name) + ' ';
1582  response += to_label("agent", tx.agent_name) + ' ';
1583  response += to_label("name", tx.file_name) + ' ';
1584  response += to_label("bytes", tx.total_bytes) + ' ';
1585  response += "/" + to_unsigned(tx.file_size) + ' ';
1586  response += to_label("havemeta", tx.havemeta) + ' ';
1587  response += to_label("sendmeta", tx.sendmeta) + ' ';
1588  response += to_label("sentmeta", tx.sentmeta) + ' ';
1589  response += to_label("senddata", tx.senddata) + ' ';
1590  response += to_label("sentdata", tx.sentdata) + ' ';
1591  response += to_label("complete", tx.complete);
1592  response += "\n";
1593  }
1594  }
1595  }
1596 
1597  return 0;
1598 }
Definition: transferlib.h:338
string to_string(char *value)
Definition: stringlib.cpp:220
string to_unsigned(uint64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:265
string node_name
Definition: agent_001.cpp:46
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
static string node
Definition: agent_monitor.cpp:126
string to_label(string label, double value, uint16_t precision, bool mjd)
Definition: stringlib.cpp:376
int32_t request_list_incoming_json ( string &  request,
string &  response,
Agent agent 
)
1601 {
1602  (response = json_list_incoming().c_str());
1603  return 0;
1604 }
string json_list_incoming()
Definition: agent_file.cpp:1612
int32_t request_list_outgoing ( string &  request,
string &  response,
Agent agent 
)
1716 {
1717  response.clear();
1718  for (uint16_t node=0; node<txq.size(); ++node)
1719  {
1720  response += std::to_string(node) + ' ' + txq[(node)].node_name + ' ' + std::to_string(txq[(node)].outgoing.size) + "\n";
1721  for(tx_progress tx : txq[(node)].outgoing.progress)
1722  {
1723  if (tx.tx_id)
1724  {
1725  response += to_label("tx_id", tx.tx_id) + ' ';
1726  response += to_label("node", tx.node_name) + ' ';
1727  response += to_label("agent", tx.agent_name) + ' ';
1728  response += to_label("name", tx.file_name) + ' ';
1729  response += to_label("bytes", tx.total_bytes) + ' ';
1730  response += "/" + to_unsigned(tx.file_size) + ' ';
1731  response += to_label("havemeta", tx.havemeta) + ' ';
1732  response += to_label("sendmeta", tx.sendmeta) + ' ';
1733  response += to_label("sentmeta", tx.sentmeta) + ' ';
1734  response += to_label("senddata", tx.senddata) + ' ';
1735  response += to_label("sentdata", tx.sentdata) + ' ';
1736  response += to_label("complete", tx.complete);
1737  response += "\n";
1738  }
1739  }
1740  }
1741 
1742  return 0;
1743 }
Definition: transferlib.h:338
string to_string(char *value)
Definition: stringlib.cpp:220
string to_unsigned(uint64_t value, uint16_t digits, bool zerofill)
Definition: stringlib.cpp:265
string node_name
Definition: agent_001.cpp:46
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
static string node
Definition: agent_monitor.cpp:126
string to_label(string label, double value, uint16_t precision, bool mjd)
Definition: stringlib.cpp:376
int32_t request_list_outgoing_json ( string &  request,
string &  response,
Agent agent 
)
1607 {
1608  (response = json_list_outgoing().c_str());
1609  return 0;
1610 }
string json_list_outgoing()
Definition: agent_file.cpp:1647
int32_t outgoing_tx_add ( tx_progress  tx_out)
1849 {
1850  int32_t node = check_node_id_1(tx_out.node_name);
1851  if (node <0)
1852  {
1853  return TRANSFER_ERROR_NODE;
1854  }
1855 
1856  // Only add if we have room
1857  if (txq[node].outgoing.size == TRANSFER_QUEUE_LIMIT)
1858  {
1859  return TRANSFER_ERROR_QUEUEFULL;
1860  }
1861 
1862  tx_out.fp = nullptr;
1863  tx_out.total_bytes = 0;
1864  tx_out.filepath = data_base_path(tx_out.node_name, "outgoing", tx_out.agent_name, tx_out.file_name);
1865  //get the file size
1866  tx_out.file_size = get_file_size(tx_out.filepath);
1867  tx_out.temppath = data_base_path(tx_out.node_name, "temp", "file", "out_"+std::to_string(tx_out.tx_id));
1868  tx_out.savetime = 0.;
1869 
1870  // save and queue metadata packet
1871  // tx_out.sendcomplete = false;
1872  // tx_out.reqmeta = false;
1873  tx_out.havemeta = true;
1874 
1875  // Good to go. Add it to queue.
1876  outgoing_tx_lock.lock();
1877  txq[node].outgoing.progress[tx_out.tx_id] = tx_out;
1878  ++txq[node].outgoing.size;
1879  outgoing_tx_lock.unlock();
1880 
1881  if (debug_flag)
1882  {
1883  printf("Add outgoing: %u %s %s %s\n", tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.file_name.c_str());
1884  }
1885 
1886  return 0;
1887 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
string to_string(char *value)
Definition: stringlib.cpp:220
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
int32_t check_node_id_1(std::string node_name)
Definition: agent_file.cpp:2204
static std::mutex outgoing_tx_lock
Definition: agent_file.cpp:129
#define TRANSFER_ERROR_QUEUEFULL
Definition: cosmos-errno.h:221
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string agent_name
Definition: transferlib.h:350
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
string file_name
Definition: transferlib.h:351
static bool debug_flag
Definition: agent_file.cpp:69
static string node
Definition: agent_monitor.cpp:126
int32_t get_file_size(string filename)
Get size of file.
Definition: transferlib.cpp:402
int32_t outgoing_tx_add ( std::string  node_name,
std::string  agent_name,
std::string  file_name 
)
1890 {
1891  // BEGIN GATHERING THE METADATA
1892  tx_progress tx_out;
1893 
1894  int32_t node = check_node_id_1(node_name);
1895  if (node <0)
1896  {
1897  return TRANSFER_ERROR_NODE;
1898  }
1899 
1900  // Only add if we have room
1901  if (txq[node].outgoing.size == TRANSFER_QUEUE_LIMIT)
1902  {
1903  return TRANSFER_ERROR_QUEUEFULL;
1904  }
1905 
1906  // Locate next empty space
1907  //get the file size
1908  outgoing_tx_lock.lock();
1909  tx_out.tx_id = 0;
1910  PACKET_TX_ID_TYPE id = txq[node].outgoing.next_id;
1911  do
1912  {
1913  // 0 is special case
1914  if (id == 0)
1915  {
1916  ++id;
1917  }
1918 
1919  if (txq[node].outgoing.progress[id].tx_id == 0)
1920  {
1921  tx_out.tx_id = id;
1922  txq[node].outgoing.next_id = id + 1;
1923  break;
1924  }
1925  // If no empty found, increment, allowing to wrap if necessary
1926  } while (++id != txq[node].outgoing.next_id);
1927  outgoing_tx_lock.unlock();
1928 
1929  if (tx_out.tx_id > 0)
1930  {
1931  tx_out.node_name = node_name;
1932  tx_out.agent_name = agent_name;
1933  tx_out.file_name = file_name;
1934  tx_out.temppath = data_base_path(node_name, "temp", "file", "out_"+std::to_string(tx_out.tx_id));
1935 
1936  std::ifstream filename;
1937 
1938  // set the file path
1939  std::string filepath = data_base_path(tx_out.node_name, "outgoing", tx_out.agent_name, tx_out.file_name);
1940 
1941  //get the file size
1942  tx_out.file_size = get_file_size(filepath);
1943 
1944  if(tx_out.file_size < 0)
1945  {
1946  return DATA_ERROR_SIZE_MISMATCH;
1947  }
1948 
1949  // see if file can be opened
1950  filename.open(filepath, std::ios::in|std::ios::binary);
1951  if(!filename.is_open())
1952  {
1953  return -errno;
1954  }
1955  filename.close();
1956 
1957  write_meta(tx_out);
1958 
1959  int32_t iretn = outgoing_tx_add(tx_out);
1960  return iretn;
1961  }
1962  else
1963  {
1964  return TRANSFER_ERROR_MATCH;
1965  }
1966 }
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
int32_t write_meta(tx_progress &tx)
Definition: agent_file.cpp:1316
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
Definition: transferlib.h:338
string to_string(char *value)
Definition: stringlib.cpp:220
int iretn
Definition: rw_test.cpp:37
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
string node_name
Definition: agent_001.cpp:46
int32_t outgoing_tx_add(tx_progress tx_out)
Definition: agent_file.cpp:1848
int32_t check_node_id_1(std::string node_name)
Definition: agent_file.cpp:2204
static std::mutex outgoing_tx_lock
Definition: agent_file.cpp:129
#define TRANSFER_ERROR_QUEUEFULL
Definition: cosmos-errno.h:221
string node_name
Definition: transferlib.h:349
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
#define DATA_ERROR_SIZE_MISMATCH
Definition: cosmos-errno.h:150
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string agent_name
Definition: transferlib.h:350
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
string agent_name
Definition: agent_001.cpp:47
string file_name
Definition: transferlib.h:351
static string node
Definition: agent_monitor.cpp:126
int32_t get_file_size(string filename)
Get size of file.
Definition: transferlib.cpp:402
int32_t outgoing_tx_del ( int32_t  node,
PACKET_TX_ID_TYPE  tx_id 
)
1969 {
1970  if (node <0 || (uint32_t)node > txq.size())
1971  {
1972  return TRANSFER_ERROR_INDEX;
1973  }
1974 
1975  if (txq[node].outgoing.progress[tx_id].tx_id == 0)
1976  {
1977  return TRANSFER_ERROR_MATCH;
1978  }
1979 
1980  tx_progress tx_out = txq[node].outgoing.progress[tx_id];
1981 
1982  // erase the transaction
1983  // outgoing_tx.erase(outgoing_tx.begin()+tx_id);
1984  txq[node].outgoing.progress[tx_id].tx_id = 0;
1985  --txq[node].outgoing.size;
1986 
1987  // Set current tx id back to 0
1988  txq[node].outgoing.id = 0;
1989 
1990  // Remove the file
1991  if(remove(tx_out.filepath.c_str()))
1992  {
1993  unable_to_remove(tx_out.filepath);
1994  }
1995 
1996  // Remove the META file
1997  std::string meta_filepath = tx_out.temppath + ".meta";
1998  remove(meta_filepath.c_str());
1999 
2000  if (debug_flag)
2001  {
2002  printf("Del outgoing: %u %s %s %s\n", tx_out.tx_id, tx_out.node_name.c_str(), tx_out.agent_name.c_str(), tx_out.file_name.c_str());
2003  }
2004 
2005  return 0;
2006 }
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
Definition: transferlib.h:338
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
void unable_to_remove(string filename)
Definition: transferlib.cpp:480
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
#define TRANSFER_ERROR_INDEX
Definition: cosmos-errno.h:222
string agent_name
Definition: transferlib.h:350
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
string file_name
Definition: transferlib.h:351
static bool debug_flag
Definition: agent_file.cpp:69
static string node
Definition: agent_monitor.cpp:126
int32_t incoming_tx_add ( tx_progress  tx_in)
2009 {
2010  int32_t node = check_node_id_1(tx_in.node_name);
2011  if (node <0)
2012  {
2013  return TRANSFER_ERROR_NODE;
2014  }
2015 
2016  if (tx_in.file_name.size())
2017  {
2018  tx_in.filepath = data_base_path(tx_in.node_name, "incoming", tx_in.agent_name, tx_in.file_name);
2019  }
2020  else
2021  {
2022  tx_in.filepath = "";
2023  }
2024  std::string tx_name = "in_"+std::to_string(tx_in.tx_id);
2025  tx_in.temppath = data_base_path(tx_in.node_name, "temp", "file", tx_name);
2026  tx_in.savetime = 0.;
2027  tx_in.fp = nullptr;
2028 
2029  // Put it in list
2030  txq[node].incoming.progress[tx_in.tx_id] = tx_in;
2031  ++txq[node].incoming.size;
2032 
2033  if (debug_flag)
2034  {
2035  printf("Add incoming: %u %s\n", tx_in.tx_id, tx_in.node_name.c_str());
2036  }
2037 
2038  return 0;
2039 }
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
FILE * fp
Definition: transferlib.h:359
string to_string(char *value)
Definition: stringlib.cpp:220
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
double savetime
Definition: transferlib.h:354
int32_t check_node_id_1(std::string node_name)
Definition: agent_file.cpp:2204
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
string agent_name
Definition: transferlib.h:350
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
string file_name
Definition: transferlib.h:351
static bool debug_flag
Definition: agent_file.cpp:69
static string node
Definition: agent_monitor.cpp:126
int32_t incoming_tx_add ( std::string  node_name,
PACKET_TX_ID_TYPE  tx_id 
)
2042 {
2043  tx_progress tx_in;
2044 
2045  tx_in.tx_id = tx_id;
2046  tx_in.node_name = node_name;
2047  tx_in.file_name = "";
2048  tx_in.agent_name = "";
2049  tx_in.havemeta = false;
2050  tx_in.file_size = 0;
2051  tx_in.total_bytes = 0;
2052  tx_in.complete = false;
2053 
2054  int32_t iretn = incoming_tx_add(tx_in);
2055 
2056  return iretn;
2057 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
Definition: transferlib.h:338
bool havemeta
Definition: transferlib.h:342
int iretn
Definition: rw_test.cpp:37
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
string node_name
Definition: agent_001.cpp:46
string node_name
Definition: transferlib.h:349
string agent_name
Definition: transferlib.h:350
string file_name
Definition: transferlib.h:351
int32_t incoming_tx_add(tx_progress tx_in)
Definition: agent_file.cpp:2008
bool complete
Definition: transferlib.h:348
int32_t incoming_tx_update ( packet_struct_metashort  meta)
2060 {
2061  int32_t node = check_node_id_1(meta.node_id);
2062  if (node <0)
2063  {
2064  return TRANSFER_ERROR_NODE;
2065  }
2066 
2067  // See if it's already in the queue
2068  if (txq[node].incoming.progress[meta.tx_id].tx_id != meta.tx_id)
2069  {
2070  return TRANSFER_ERROR_MATCH;
2071  }
2072 
2073  if (!txq[node].incoming.progress[meta.tx_id].havemeta)
2074  {
2075  // Core META information
2076  txq[node].incoming.progress[meta.tx_id].node_name = txq[node].node_name;
2077  txq[node].incoming.progress[meta.tx_id].agent_name = meta.agent_name;
2078  txq[node].incoming.progress[meta.tx_id].file_name = meta.file_name;
2079  txq[node].incoming.progress[meta.tx_id].file_size = meta.file_size;
2080  txq[node].incoming.progress[meta.tx_id].filepath = data_base_path(txq[node].incoming.progress[meta.tx_id].node_name, "incoming", txq[node].incoming.progress[meta.tx_id].agent_name, txq[node].incoming.progress[meta.tx_id].file_name);
2081  std::string tx_name = "in_"+std::to_string(txq[node].incoming.progress[meta.tx_id].tx_id);
2082  txq[node].incoming.progress[meta.tx_id].temppath = data_base_path(txq[node].incoming.progress[meta.tx_id].node_name, "temp", "file", tx_name);
2083 
2084  // Derivative META information
2085  txq[node].incoming.progress[meta.tx_id].savetime = 0.;
2086  txq[node].incoming.progress[meta.tx_id].havemeta = true;
2087  txq[node].incoming.progress[meta.tx_id].total_bytes = 0;
2088  txq[node].incoming.progress[meta.tx_id].fp = nullptr;
2089 
2090  // Save it to disk
2091  write_meta(txq[node].incoming.progress[meta.tx_id]);
2092  }
2093 
2094  if (debug_flag)
2095  {
2096  printf("Update incoming: %u %s %s %s\n", txq[node].incoming.progress[meta.tx_id].tx_id, txq[node].incoming.progress[meta.tx_id].node_name.c_str(), txq[node].incoming.progress[meta.tx_id].agent_name.c_str(), txq[node].incoming.progress[meta.tx_id].file_name.c_str());
2097  }
2098 
2099  return meta.tx_id;
2100 }
int32_t write_meta(tx_progress &tx)
Definition: agent_file.cpp:1316
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:248
string to_string(char *value)
Definition: stringlib.cpp:220
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:247
string data_base_path(string node, string location, string agent, string filename)
Create data file path.
Definition: datalib.cpp:767
int32_t check_node_id_1(std::string node_name)
Definition: agent_file.cpp:2204
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
char file_name[128]
Definition: transferlib.h:250
char agent_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:249
static bool debug_flag
Definition: agent_file.cpp:69
static string node
Definition: agent_monitor.cpp:126
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:251
int32_t incoming_tx_del ( int32_t  node,
PACKET_TX_ID_TYPE  tx_id 
)
2103 {
2105  if (node <0)
2106  {
2107  return TRANSFER_ERROR_NODE;
2108  }
2109 
2110  if (txq[node].incoming.progress[tx_id].tx_id == 0)
2111  {
2112  return TRANSFER_ERROR_MATCH;
2113  }
2114 
2115  tx_progress tx_in = txq[node].incoming.progress[tx_id];
2116 
2117  txq[node].incoming.progress[tx_id].tx_id = 0;
2118  txq[node].incoming.progress[tx_id].havemeta = false;
2119  --txq[node].incoming.size;
2120 
2121  // Close the DATA file
2122  if (tx_in.fp != nullptr)
2123  {
2124  fclose(tx_in.fp);
2125  tx_in.fp = nullptr;
2126  }
2127 
2128  std::string filepath;
2129  //Remove the DATA file
2130  filepath = tx_in.temppath + ".file";
2131  remove(filepath.c_str());
2132 
2133  // Remove the META file
2134  filepath = tx_in.temppath + ".meta";
2135  remove(filepath.c_str());
2136 
2137  // Make sure we are not using this for incoming_tx_id
2138  if (tx_in.tx_id == txq[node].incoming.id)
2139  {
2140  txq[node].incoming.id = 0;
2141  }
2142 
2143  if (debug_flag)
2144  {
2145  printf("Del incoming: %u %s\n", tx_in.tx_id, tx_in.node_name.c_str());
2146  }
2147 
2148  return 0;
2149 
2150 }
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
#define TRANSFER_ERROR_NODE
Definition: cosmos-errno.h:223
Definition: transferlib.h:338
FILE * fp
Definition: transferlib.h:359
int32_t check_node_id_1(std::string node_name)
Definition: agent_file.cpp:2204
string node_name
Definition: transferlib.h:349
#define TRANSFER_ERROR_MATCH
Definition: cosmos-errno.h:220
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
static bool debug_flag
Definition: agent_file.cpp:69
static string node
Definition: agent_monitor.cpp:126
vector< file_progress > find_chunks_missing ( tx_progress tx)
1485 {
1486  std::vector<file_progress> missing;
1487  file_progress tp;
1488 
1489  if (tx.file_info.size() == 0)
1490  {
1491  tp.chunk_start = 0;
1492  tp.chunk_end = tx.file_size - 1;
1493  missing.push_back(tp);
1494  }
1495  else
1496  {
1498  sort(tx.file_info.begin(), tx.file_info.end(), lower_chunk);
1499 
1500  // Check missing before first chunk
1501  if (tx.file_info[0].chunk_start)
1502  {
1503  tp.chunk_start = 0;
1504  tp.chunk_end = tx.file_info[0].chunk_start - 1;
1505  missing.push_back(tp);
1506  }
1507 
1508  // Check missing between chunks
1509  for (uint32_t i=1; i<tx.file_info.size(); ++i)
1510  {
1511  if (tx.file_info[i-1].chunk_end+1 != tx.file_info[i].chunk_start)
1512  {
1513  tp.chunk_start = tx.file_info[i-1].chunk_end + 1;
1514  tp.chunk_end = tx.file_info[i].chunk_start - 1;
1515  missing.push_back(tp);
1516  }
1517  }
1518 
1519  // Check missing after last chunk
1520  if (tx.file_info[tx.file_info.size()-1].chunk_end + 1 != tx.file_size)
1521  {
1522  tp.chunk_start = tx.file_info[tx.file_info.size()-1].chunk_end + 1;
1523  tp.chunk_end = tx.file_size - 1;
1524  missing.push_back(tp);
1525  }
1526  }
1527 
1528  // calculate bytes so far
1529  tx.total_bytes = 0;
1530  for (file_progress prog : tx.file_info)
1531  {
1532  tx.total_bytes += (prog.chunk_end - prog.chunk_start) + 1;
1533  }
1534 
1535  return (missing);
1536 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:332
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
deque< file_progress > file_info
Definition: transferlib.h:358
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file.cpp:1445
bool lower_chunk(file_progress i, file_progress j)
Definition: agent_file.cpp:1440
PACKET_FILE_SIZE_TYPE merge_chunks_overlap ( tx_progress tx)
1446 {
1447  switch (tx.file_info.size())
1448  {
1449  case 0:
1450  {
1451  tx.total_bytes = 0;
1452  break;
1453  }
1454  case 1:
1455  {
1456  tx.total_bytes = (tx.file_info[0].chunk_end - tx.file_info[0].chunk_start) + 1;
1457  break;
1458  }
1459  default:
1460  {
1461  tx.total_bytes = 0;
1462  sort(tx.file_info.begin(), tx.file_info.end(), lower_chunk);
1463  for (uint32_t i=0; i<tx.file_info.size(); ++i)
1464  {
1465  for (uint32_t j=i+1; j<tx.file_info.size(); ++j)
1466  {
1467  while (j < tx.file_info.size() && tx.file_info[j].chunk_start <= tx.file_info[i].chunk_end+1)
1468  {
1469  if (tx.file_info[j].chunk_end > tx.file_info[i].chunk_end)
1470  {
1471  tx.file_info[i].chunk_end = tx.file_info[j].chunk_end;
1472  }
1473  tx.file_info.erase(tx.file_info.begin()+j);
1474  }
1475  }
1476  tx.total_bytes += (tx.file_info[i].chunk_end - tx.file_info[i].chunk_start) + 1;
1477  }
1478  break;
1479  }
1480  }
1481  return tx.total_bytes;
1482 }
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
int i
Definition: rw_test.cpp:37
deque< file_progress > file_info
Definition: transferlib.h:358
bool lower_chunk(file_progress i, file_progress j)
Definition: agent_file.cpp:1440
double queuesendto ( std::string  type,
uint16_t  channel,
std::vector< PACKET_BYTE packet 
)
1197 {
1198  transmit_queue_entry tentry;
1199 
1200  tentry.type = type;
1201  tentry.channel = channel;
1202  tentry.packet = packet;
1203  transmit_queue.push(tentry);
1204  transmit_queue_check.notify_one();
1205  double time_step = packet.size() / (86400. * send_channel[channel].throughput);
1206  if (time_step > 0)
1207  {
1208  return time_step;
1209  }
1210  else
1211  {
1212  return 0.;
1213  }
1214 }
std::vector< PACKET_BYTE > packet
Definition: agent_file.cpp:114
uint32_t channel
Definition: agent_file.cpp:113
std::string type
Definition: agent_file.cpp:112
static std::condition_variable transmit_queue_check
Definition: agent_file.cpp:118
uint32_t throughput
Definition: agent_file.cpp:104
static sendchannelstruc send_channel[2]
Definition: agent_file.cpp:108
Definition: agent_file.cpp:110
static std::queue< transmit_queue_entry > transmit_queue
Definition: agent_file.cpp:117
int32_t mysendto ( std::string  type,
sendchannelstruc channel,
std::vector< PACKET_BYTE > &  buf 
)
1217 {
1218  int32_t iretn;
1219  double cmjd;
1220 
1221  if ((cmjd = currentmjd(0.)) < channel.nmjd)
1222  {
1223  COSMOS_USLEEP((uint32_t)(86400000000. * (channel.nmjd - cmjd)));
1224  }
1225 
1226  iretn = sendto(channel.sendchan.cudp, (const char*)&buf[0], buf.size(), 0, (struct sockaddr*) &channel.sendchan.caddr, sizeof(struct sockaddr_in));
1227 
1228  if (iretn >= 0)
1229  {
1230  channel.nmjd = currentmjd() + ((28+iretn) / (float)channel.throughput)/86400.;
1231  debug_packet(buf, type+" out");
1232  }
1233  else
1234  {
1235  iretn = -errno;
1236  }
1237 
1238  return iretn;
1239 }
static double cmjd
Definition: agent_monitor.cpp:121
double nmjd
Definition: agent_file.cpp:105
int iretn
Definition: rw_test.cpp:37
int32_t cudp
Definition: socketlib.h:120
uint32_t throughput
Definition: agent_file.cpp:104
struct sockaddr_in caddr
Definition: socketlib.h:122
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
socket_channel sendchan
Definition: agent_file.cpp:101
char buf[128]
Definition: rw_test.cpp:40
void debug_packet(std::vector< PACKET_BYTE > buf, std::string type)
Definition: agent_file.cpp:1258
int32_t myrecvfrom ( std::string  type,
socket_channel  channel,
std::vector< PACKET_BYTE > &  buf,
uint32_t  length 
)
1242 {
1243  int32_t nbytes;
1244 
1245  buf.resize(length);
1246  if (( nbytes = recvfrom(channel.cudp, (char *)&buf[0], length, 0, static_cast<struct sockaddr *>(nullptr), static_cast<socklen_t *>(nullptr))) > 0)
1247  {
1248  buf.resize(nbytes);
1249  debug_packet(buf, type+" in");
1250  }
1251  else
1252  {
1253  nbytes = -errno;
1254  }
1255  return nbytes;
1256 }
int32_t cudp
Definition: socketlib.h:120
png_uint_32 length
Definition: png.c:2173
char buf[128]
Definition: rw_test.cpp:40
void debug_packet(std::vector< PACKET_BYTE > buf, std::string type)
Definition: agent_file.cpp:1258
void debug_packet ( std::vector< PACKET_BYTE buf,
std::string  type 
)
1259 {
1260  if (debug_flag)
1261  {
1262  printf("[%.15g %s (%" PRIu32 ")] ", currentmjd(), type.c_str(), buf.size());
1263  switch (buf[0] & 0x0f)
1264  {
1265  case PACKET_METADATA:
1266  {
1267  std::string file_name(&buf[PACKET_METASHORT_OFFSET_FILE_NAME], &buf[PACKET_METASHORT_OFFSET_FILE_NAME+TRANSFER_MAX_FILENAME]);
1268  printf("[METADATA] %u %u %s ", buf[PACKET_METASHORT_OFFSET_NODE_ID], buf[PACKET_METASHORT_OFFSET_TX_ID], file_name.c_str());
1269  break;
1270  }
1271  case PACKET_DATA:
1272  {
1274  break;
1275  }
1276  case PACKET_REQDATA:
1277  {
1279  break;
1280  }
1281  case PACKET_REQMETA:
1282  {
1283  printf("[REQMETA] %u %s ", buf[PACKET_REQMETA_OFFSET_NODE_ID], &buf[PACKET_REQMETA_OFFSET_NODE_NAME]);
1284  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
1286  {
1287  printf("%u ", buf[PACKET_REQMETA_OFFSET_TX_ID+i]);
1288  }
1289  break;
1290  }
1291  case PACKET_COMPLETE:
1292  {
1293  printf("[COMPLETE] %u %u ", buf[PACKET_COMPLETE_OFFSET_NODE_ID], buf[PACKET_COMPLETE_OFFSET_TX_ID]);
1294  break;
1295  }
1296  case PACKET_CANCEL:
1297  {
1298  printf("[CANCEL] %u %u ", buf[PACKET_CANCEL_OFFSET_NODE_ID], buf[PACKET_CANCEL_OFFSET_TX_ID]);
1299  break;
1300  }
1301  case PACKET_QUEUE:
1302  {
1303  printf("[QUEUE] %u %s ", buf[PACKET_QUEUE_OFFSET_NODE_ID], &buf[PACKET_QUEUE_OFFSET_NODE_NAME]);
1304  for (uint16_t i=0; i<TRANSFER_QUEUE_LIMIT; ++i)
1306  {
1307  printf("%u ", buf[PACKET_QUEUE_OFFSET_TX_ID+i]);
1308  }
1309  }
1310  }
1311  printf("\n");
1312  fflush(stdout);
1313  }
1314 }
#define PACKET_CANCEL_OFFSET_NODE_ID
Definition: transferlib.h:307
#define PACKET_METASHORT_OFFSET_TX_ID
Definition: transferlib.h:255
#define TRANSFER_MAX_FILENAME
Definition: transferlib.h:78
#define PACKET_QUEUE_OFFSET_TX_ID
Definition: transferlib.h:214
#define PACKET_REQDATA_OFFSET_NODE_ID
Definition: transferlib.h:269
int i
Definition: rw_test.cpp:37
#define PACKET_REQDATA_OFFSET_HOLE_END
Definition: transferlib.h:272
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
#define PACKET_DATA_OFFSET_TX_ID
Definition: transferlib.h:285
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
#define PACKET_REQMETA_OFFSET_NODE_NAME
Definition: transferlib.h:225
#define PACKET_COMPLETE_OFFSET_TX_ID
Definition: transferlib.h:298
#define PACKET_METASHORT_OFFSET_NODE_ID
Definition: transferlib.h:254
#define PACKET_CANCEL_OFFSET_TX_ID
Definition: transferlib.h:308
#define PACKET_DATA_OFFSET_BYTE_COUNT
Definition: transferlib.h:286
#define PACKET_REQDATA_OFFSET_TX_ID
Definition: transferlib.h:270
#define PACKET_REQMETA_OFFSET_TX_ID
Definition: transferlib.h:226
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
#define PACKET_DATA_OFFSET_CHUNK_START
Definition: transferlib.h:287
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
#define PACKET_COMPLETE_OFFSET_NODE_ID
Definition: transferlib.h:297
#define PACKET_QUEUE_OFFSET_NODE_NAME
Definition: transferlib.h:213
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PACKET_REQDATA_OFFSET_HOLE_START
Definition: transferlib.h:271
#define PACKET_METASHORT_OFFSET_FILE_NAME
Definition: transferlib.h:257
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
#define PACKET_QUEUE_OFFSET_NODE_ID
Definition: transferlib.h:212
#define PACKET_REQMETA_OFFSET_NODE_ID
Definition: transferlib.h:224
char buf[128]
Definition: rw_test.cpp:40
static bool debug_flag
Definition: agent_file.cpp:69
#define PACKET_DATA_OFFSET_NODE_ID
Definition: transferlib.h:284
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
int32_t write_meta ( tx_progress tx)
1317 {
1318  std::vector<PACKET_BYTE> packet;
1319  std::ofstream file_name;
1320 
1321  if (currentmjd(0.) - tx.savetime > 5./86400.)
1322  {
1323  // std::cout<<"write_meta: "<< 86400.*(currentmjd(0.) - tx.savetime)<<" file: "<<tx.temppath + ".meta"<< std::endl;
1324  tx.savetime = currentmjd(0.);
1325  make_metadata_packet(packet, tx.tx_id, (char *)tx.file_name.c_str(), tx.file_size, (char *)tx.node_name.c_str(), (char *)tx.agent_name.c_str());
1326  file_name.open(tx.temppath + ".meta", std::ios::out|std::ios::binary);
1327  if(!file_name.is_open())
1328  {
1329  return (-errno);
1330  }
1331 
1332  uint16_t crc;
1333  file_name.write((char *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1334  crc = slip_calc_crc((uint8_t *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1335  file_name.write((char *)&crc, 2);
1336  for (file_progress progress_info : tx.file_info)
1337  {
1338  file_name.write((const char *)&progress_info, sizeof(progress_info));
1339  crc = slip_calc_crc((uint8_t *)&progress_info, sizeof(progress_info));
1340  file_name.write((char *)&crc, 2);
1341  }
1342  file_name.close();
1343  }
1344 
1345  return 0;
1346 }
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
Definition: transferlib.h:332
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
png_uint_32 crc
Definition: png.c:2173
double savetime
Definition: transferlib.h:354
deque< file_progress > file_info
Definition: transferlib.h:358
string node_name
Definition: transferlib.h:349
string agent_name
Definition: transferlib.h:350
uint16_t slip_calc_crc(uint8_t *buf, uint16_t size)
Calculate CRC-16-CCITT.
Definition: sliplib.cpp:322
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define PACKET_METALONG_OFFSET_TOTAL
Definition: transferlib.h:243
string file_name
Definition: transferlib.h:351
void make_metadata_packet(vector< PACKET_BYTE > &packet, packet_struct_metalong meta)
Definition: transferlib.cpp:163
int32_t read_meta ( tx_progress tx)
1349 {
1350  std::vector<PACKET_BYTE> packet(PACKET_METALONG_OFFSET_TOTAL,0);
1351  std::ifstream file_name;
1353 
1354  struct stat statbuf;
1355  if (!stat((tx.temppath + ".meta").c_str(), &statbuf) && statbuf.st_size >= COSMOS_SIZEOF(file_progress))
1356  {
1357  file_name.open(tx.temppath + ".meta", std::ios::out|std::ios::binary);
1358  if(!file_name.is_open())
1359  {
1360  return (-errno);
1361  }
1362  }
1363  else
1364  {
1365  // remove((tx.temppath + ".meta").c_str());
1366  return DATA_ERROR_SIZE_MISMATCH;
1367  }
1368 
1369  tx.fp = nullptr;
1370  tx.savetime = 0.;
1371 
1372 
1373  // load metadata
1374  tx.havemeta = true;
1375 
1376  file_name.read((char *)&packet[0], PACKET_METALONG_OFFSET_TOTAL);
1377  if (file_name.eof())
1378  {
1379  return DATA_ERROR_SIZE_MISMATCH;
1380  }
1381  uint16_t crc;
1382  file_name.read((char *)&crc, 2);
1383  if (file_name.eof())
1384  {
1385  return DATA_ERROR_SIZE_MISMATCH;
1386  }
1387  if (crc != slip_calc_crc((uint8_t *)&packet[0], PACKET_METALONG_OFFSET_TOTAL))
1388  {
1389  file_name.close();
1390  return DATA_ERROR_CRC;
1391  }
1392  extract_metadata(packet, meta);
1393  tx.tx_id = meta.tx_id;
1394  tx.node_name = meta.node_name;
1395  tx.agent_name = meta.agent_name;
1396  tx.file_name = meta.file_name;
1397  tx.file_size = meta.file_size;
1398 
1399  // load file progress
1400  file_progress progress_info;
1401  do
1402  {
1403  file_name.read((char *)&progress_info, sizeof(progress_info));
1404  if (file_name.eof())
1405  {
1406  break;
1407  }
1408  uint16_t crc;
1409  file_name.read((char *)&crc, 2);
1410  if (file_name.eof())
1411  {
1412  return DATA_ERROR_SIZE_MISMATCH;
1413  }
1414  if (crc != slip_calc_crc((uint8_t *)&progress_info, sizeof(progress_info)))
1415  {
1416  file_name.close();
1417  return DATA_ERROR_CRC;
1418  }
1419 
1420  tx.file_info.push_back(progress_info);
1421  } while(!file_name.eof());
1422  file_name.close();
1423  if (debug_flag)
1424  {
1425  printf("read_meta: %s tx_id: %u chunks: %" PRIu32 "\n", (tx.temppath + ".meta").c_str(), tx.tx_id, tx.file_info.size());
1426  }
1427 
1428  // fix any overlaps and count total bytes
1430 
1431  // calculate bytes so far
1432  // tx.total_bytes = 0;
1433  // for (file_progress prog : tx.file_info)
1434  // {
1435  // tx.total_bytes += (prog.chunk_end - prog.chunk_start) + 1;
1436  // }
1437  return 0;
1438 }
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:231
string temppath
Definition: transferlib.h:353
char file_name[128]
Definition: transferlib.h:234
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
Definition: transferlib.h:229
#define COSMOS_SIZEOF(element)
Definition: configCosmos.h:139
Definition: transferlib.h:332
bool havemeta
Definition: transferlib.h:342
FILE * fp
Definition: transferlib.h:359
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
png_uint_32 crc
Definition: png.c:2173
double savetime
Definition: transferlib.h:354
deque< file_progress > file_info
Definition: transferlib.h:358
#define DATA_ERROR_CRC
Definition: cosmos-errno.h:151
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file.cpp:1445
string node_name
Definition: transferlib.h:349
char agent_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:233
#define DATA_ERROR_SIZE_MISMATCH
Definition: cosmos-errno.h:150
string agent_name
Definition: transferlib.h:350
uint16_t slip_calc_crc(uint8_t *buf, uint16_t size)
Calculate CRC-16-CCITT.
Definition: sliplib.cpp:322
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:235
#define PACKET_METALONG_OFFSET_TOTAL
Definition: transferlib.h:243
string file_name
Definition: transferlib.h:351
static bool debug_flag
Definition: agent_file.cpp:69
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:232
bool tx_progress_compare_by_size ( const tx_progress a,
const tx_progress b 
)
2158 {
2159  return a.file_size < b.file_size;
2160 }
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
bool filestruc_compare_by_size ( const filestruc a,
const filestruc b 
)
2153 {
2154  return a.size < b.size;
2155 }
off_t size
Definition: datalib.h:120
PACKET_TX_ID_TYPE check_tx_id ( std::vector< tx_progress tx_entry,
PACKET_TX_ID_TYPE  tx_id 
)
2193 {
2194  if (tx_id != 0 && tx_entry[tx_id].tx_id == tx_id)
2195  {
2196  return tx_id;
2197  }
2198  else
2199  {
2200  return 0;
2201  }
2202 }
Definition: agent_file.cpp:141
int32_t check_node_id_1 ( std::string  node_name)
2205 {
2206  int32_t id = -1;
2207  for (uint16_t i=0; i<txq.size(); ++i)
2208  {
2209  if (txq[i].node_name == node_name)
2210  {
2211  id = i;
2212  break;
2213  }
2214  }
2215  return id;
2216 }
int i
Definition: rw_test.cpp:37
string node_name
Definition: agent_001.cpp:46
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
int32_t check_node_id_1 ( PACKET_NODE_ID_TYPE  node_id)
2219 {
2220  int32_t id = -1;
2221  if (node_id >= 0 && node_id < txq.size())
2222  {
2223  id = node_id;
2224  }
2225  return id;
2226 }
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
uint8_t lookup_remote_node_id ( PACKET_NODE_ID_TYPE  node_id)
2229 {
2230  int32_t id = -1;
2231  if (node_id >=0 && node_id < txq.size())
2232  {
2233  if (txq[node_id].node_id > 0)
2234  {
2235  id = txq[node_id].node_id - 1;
2236  }
2237  }
2238  return id;
2239 }
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
int32_t set_remote_node_id ( PACKET_NODE_ID_TYPE  node_id,
std::string  node_name 
)
2242 {
2243  int32_t id = -1;
2244  for (uint16_t i=0; i<txq.size(); ++i)
2245  {
2246  if (txq[i].node_name == node_name)
2247  {
2248  txq[i].node_id = node_id+1;
2249  id = i;
2250  }
2251  }
2252  return id;
2253 }
int i
Definition: rw_test.cpp:37
string node_name
Definition: agent_001.cpp:46
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
PACKET_TX_ID_TYPE choose_incoming_tx_id ( int32_t  node)
2163 {
2164  PACKET_TX_ID_TYPE tx_id = 0;
2165 
2166  if (node >= 0 && (uint32_t)node < txq.size())
2167  {
2168  // Choose file with least data left to send
2169  PACKET_FILE_SIZE_TYPE nsize = INT32_MAX;
2170  for (std::vector<tx_progress>::size_type i=0; i < txq[node].incoming.progress.size(); ++i)
2171  {
2172  // calculate bytes so far
2173  merge_chunks_overlap(txq[node].incoming.progress[i]);
2174  // txq[node].incoming.progress[i].total_bytes = 0;
2175  // for (file_progress prog : txq[node].incoming.progress[i].file_info)
2176  // {
2177  // txq[node].incoming.progress[i].total_bytes += (prog.chunk_end - prog.chunk_start) + 1;
2178  // }
2179 
2180  // Choose transactions for which we: have meta and bytes remaining is minimized
2181  if (txq[node].incoming.progress[i].tx_id && txq[node].incoming.progress[i].havemeta && (txq[node].incoming.progress[i].file_size - txq[node].incoming.progress[i].total_bytes) < nsize)
2182  {
2183  nsize = txq[node].incoming.progress[i].file_size - txq[node].incoming.progress[i].total_bytes;
2184  tx_id = txq[node].incoming.progress[i].tx_id;
2185  }
2186  }
2187  }
2188 
2189  return tx_id;
2190 }
int i
Definition: rw_test.cpp:37
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
PACKET_FILE_SIZE_TYPE merge_chunks_overlap(tx_progress &tx)
Definition: agent_file.cpp:1445
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
static string node
Definition: agent_monitor.cpp:126
int32_t next_incoming_tx ( PACKET_NODE_ID_TYPE  node)
2256 {
2257  PACKET_TX_ID_TYPE tx_id = check_tx_id(txq[node].incoming.progress, choose_incoming_tx_id(node));
2258 
2259  if (tx_id < TRANSFER_QUEUE_SIZE && tx_id > 0)
2260  {
2261  // See if we know what the remote node_id is for this
2262  int32_t remote_node = lookup_remote_node_id(node);
2263  if (remote_node >= 0)
2264  {
2265  // Check if file has been completely received
2266  if(txq[node].incoming.progress[tx_id].file_size == txq[node].incoming.progress[tx_id].total_bytes && txq[node].incoming.progress[tx_id].havemeta)
2267  {
2268  tx_progress tx_in = txq[node].incoming.progress[tx_id];
2269 
2270  // inform other end that file has been received
2271  std::vector<PACKET_BYTE> packet;
2272  make_complete_packet(packet, remote_node, tx_in.tx_id);
2273  queuesendto("rx", use_channel, packet);
2274 
2275  // Move file to its final location
2276  if (!txq[node].incoming.progress[tx_id].complete)
2277  {
2278  if (txq[node].incoming.progress[tx_id].fp != nullptr)
2279  {
2280  fclose(txq[node].incoming.progress[tx_id].fp);
2281  txq[node].incoming.progress[tx_id].fp = nullptr;
2282  }
2283  std::string final_filepath = tx_in.temppath + ".file";
2284  rename(final_filepath.c_str(), tx_in.filepath.c_str());
2285  txq[node].incoming.progress[tx_id].complete = true;
2286  }
2287  }
2288  else
2289  {
2290  // Ask for missing data
2291  std::vector<file_progress> missing;
2292  missing = find_chunks_missing(txq[node].incoming.progress[tx_id]);
2293  for (uint32_t j=0; j<missing.size(); ++j)
2294  {
2295  std::vector<PACKET_BYTE> packet;
2296  make_reqdata_packet(packet, remote_node, txq[node].incoming.progress[tx_id].tx_id, missing[j].chunk_start, missing[j].chunk_end);
2297  queuesendto("rx", use_channel, packet);
2298  }
2299  }
2300  }
2301  }
2302  return tx_id;
2303 }
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
double queuesendto(std::string type, uint16_t channel, std::vector< PACKET_BYTE > packet)
Definition: agent_file.cpp:1196
PACKET_TX_ID_TYPE check_tx_id(std::vector< tx_progress > tx_entry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file.cpp:2192
Definition: transferlib.h:338
void make_complete_packet(vector< PACKET_BYTE > &packet, packet_struct_complete complete)
Definition: transferlib.cpp:54
static uint16_t use_channel
Definition: agent_file.cpp:97
PACKET_TX_ID_TYPE choose_incoming_tx_id(int32_t node)
Definition: agent_file.cpp:2162
string filepath
Definition: transferlib.h:352
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
void make_reqdata_packet(vector< PACKET_BYTE > &packet, packet_struct_reqdata reqdata)
Definition: transferlib.cpp:131
std::vector< file_progress > find_chunks_missing(tx_progress &tx)
Definition: agent_file.cpp:1484
int32_t lookup_remote_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file.cpp:2228
static string node
Definition: agent_monitor.cpp:126
string json_list_incoming ( )
1612  {
1613  JSONObject jobj;
1614  JSONArray incoming;
1615 
1616  incoming.resize(txq.size());
1617  for (uint16_t node=0; node<txq.size(); ++node)
1618  {
1619 
1620  JSONObject node_obj("node", txq[node].node_name);
1621  node_obj.addElement("count", txq[node].incoming.size);
1622 
1623  JSONArray files;
1624  files.resize(txq[node].incoming.size);
1625  int i =0;
1626  for(tx_progress tx : txq[node].incoming.progress)
1627  {
1628  if (tx.tx_id)
1629  {
1630  JSONObject f("tx_id", tx.tx_id);
1631  f.addElement("agent", tx.agent_name);
1632  f.addElement("name", tx.file_name);
1633  f.addElement("bytes", tx.total_bytes);
1634  f.addElement("size", tx.file_size);
1635  files.at(i) = (JSONValue(f));
1636  i++;
1637  }
1638  }
1639  node_obj.addElement("files", files);
1640  incoming.at(node) = JSONValue(node_obj);
1641 
1642  }
1643  jobj.addElement("incoming", incoming);
1644  return jobj.to_json_string();
1645 }
Definition: jsonvalue.h:13
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:338
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
string json_list_outgoing ( )
1647  {
1648  JSONObject jobj;
1649  JSONArray outgoing;
1650 
1651  outgoing.resize(txq.size());
1652  for (uint16_t node=0; node<txq.size(); ++node)
1653  {
1654 
1655  JSONObject node_obj("node", txq[node].node_name);
1656  node_obj.addElement("count", txq[node].outgoing.size);
1657 
1658  JSONArray files;
1659  files.resize(txq[node].outgoing.size);
1660  int i =0;
1661  for(tx_progress tx : txq[node].outgoing.progress)
1662  {
1663  if (tx.tx_id)
1664  {
1665  JSONObject f("tx_id", tx.tx_id);
1666  f.addElement("agent", tx.agent_name);
1667  f.addElement("name", tx.file_name);
1668  f.addElement("bytes", tx.total_bytes);
1669  f.addElement("size", tx.file_size);
1670  files.at(i) = (JSONValue(f));
1671  i++;
1672  }
1673  }
1674  node_obj.addElement("files", files);
1675  outgoing.at(node) = JSONValue(node_obj);
1676 
1677  }
1678  jobj.addElement("outgoing", outgoing);
1679  return jobj.to_json_string();
1680 }
Definition: jsonvalue.h:13
int i
Definition: rw_test.cpp:37
Definition: transferlib.h:338
string node_name
Definition: agent_001.cpp:46
struct vector< JSONValue > JSONArray
Definition: jsonvalue.h:10
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
Definition: jsonobject.h:5
void addElement(string key, JSONValue value)
Definition: jsonobject.cpp:10
static string node
Definition: agent_monitor.cpp:126
string to_json_string()
Definition: jsonobject.cpp:91
int main ( int  argc,
char *  argv[] 
)
207 {
208  int32_t iretn;
209  // store command line arguments
210  switch (argc)
211  {
212  case 3:
213  {
214  send_channel[1].destination_ip = argv[2];
217  send_channel[1].nmjd = currentmjd(0.);
218  ++send_channels;
219  }
220  case 2:
221  {
222  send_channel[0].destination_ip = argv[1];
225  send_channel[0].nmjd = currentmjd(0.);
226  ++send_channels;
227  break;
228  }
229 // default:
230 // {
231 // printf("Usage:\t agent_file destination_ip_address[0] {destination_ip_address[1]\n");
232 // exit(-1);
233 // }
234  }
235 
236  // set this program up as a server
237  // port number = 0 in this case, automatic assignment of port
238  printf("- Setting up server...");
239  fflush(stdout);
240 
241  char hostname[60];
242  gethostname(hostname, sizeof (hostname));
243  agentname += hostname;
244  agent = new Agent("", agentname, 5.);
245  if ((iretn = agent->wait()) < 0)
246  {
247  fprintf(agent->get_debug_fd(), "%16.10f %s Failed to start Agent %s on Node %s Dated %s : %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str(), cosmos_error_string(iretn).c_str());
248  exit(iretn);
249  }
250  else
251  {
252  fprintf(agent->get_debug_fd(), "%16.10f %s Started Agent %s on Node %s Dated %s\n",currentmjd(), mjd2iso8601(currentmjd()).c_str(), agent->getAgent().c_str(), agent->getNode().c_str(), utc2iso8601(data_ctime(argv[0])).c_str());
253  }
254 
255  printf("\t\tSuccess.\n");
256  fflush(stdout); // Ensure this gets printed before blocking call
257 
258  //open sockets for receiving and sending
259  printf("- Opening recv socket...");
260  fflush(stdout);
261 
263  {
264  std::cout << "iretn = " << iretn << std::endl;
265  printf("- Could not successfully open recv socket... exiting \n");
266  exit (-errno);
267  }
268  printf("\tSuccess.\n");
269 
270  printf("- Opening send socket...");
271  fflush(stdout);
272  for (uint16_t i=0; i<send_channels; ++i)
273  {
275  {
276  std::cout << "iretn = " << iretn << std::endl;
277  printf("- Could not successfully open send socket %s at size %u ... exiting \n", send_channel[i].destination_ip.c_str(), send_channel[i].packet_size);
278  exit (-errno);
279  }
280  }
281  printf("\tSuccess.\n");
282 
283  // Restore in progress transfers from previous run
284  for (std::string node_name : data_list_nodes())
285  {
286  int32_t node = check_node_id_1(node_name);
287 
288  if (node < 0)
289  {
290  node = txq.size();
291  tx_queue tx;
292  tx.node_name = node_name;
293  tx.node_id = 0;
294  tx.incoming.id = 0;
295  tx.incoming.next_id = 1;
297  tx.incoming.size = 0;
298  tx.outgoing.id = 0;
299  tx.outgoing.next_id = 1;
301  tx.outgoing.size = 0;
302  txq.push_back(tx);
303  }
304 
305  for(filestruc file : data_list_files(node_name, "temp", "file"))
306  {
307  // Add entry for each meta file
308  if (file.type == "meta")
309  {
310  // Incoming
311  if (!file.name.compare(0,3,"in_"))
312  {
313  tx_progress tx_in;
314  tx_in.temppath = file.path.substr(0,file.path.find(".meta"));
315  if (read_meta(tx_in) >= 0)
316  {
317  incoming_tx_add(tx_in);
318  }
319  }
320 
321  // Outgoing
322  if (!file.name.compare(0,4,"out_"))
323  {
324  tx_progress tx_out;
325  tx_out.temppath = file.path.substr(0,file.path.find(".meta"));
326  if (read_meta(tx_out) >= 0)
327  {
328  iretn = outgoing_tx_add(tx_out);
329  }
330  }
331  }
332  }
333  }
334 
335  // add agent_file requests
336  if ((iretn=agent->add_request("use_channel",request_use_channel,"{0|1} [throughput]", "choose slow or fast channel")))
337  exit (iretn);
338  if ((iretn=agent->add_request("remove_file",request_remove_file,"in|out tx_id", "removes file from indicated queue")))
339  exit (iretn);
340  // if ((iretn=agent->add_request("send_file",request_send_file,"", "creates and sends metadata/data packets")))
341  // exit (iretn);
342  if ((iretn=agent->add_request("ls",request_ls,"", "lists contents of directory")))
343  exit (iretn);
344  if ((iretn=agent->add_request("list_incoming",request_list_incoming,"", "lists contents incoming queue")))
345  exit (iretn);
346  if ((iretn=agent->add_request("list_outgoing",request_list_outgoing,"", "lists contents outgoing queue")))
347  exit (iretn);
348 
349  if ((iretn=agent->add_request("list_incoming_json",request_list_incoming_json,"", "lists contents incoming queue")))
350  exit (iretn);
351  if ((iretn=agent->add_request("list_outgoing_json",request_list_outgoing_json,"", "lists contents outgoing queue")))
352  exit (iretn);
353  if ((iretn=agent->add_request("debug",request_debug,"{0|1}","Toggle Debug information")))
354  exit (iretn);
355 
356  std::thread send_loop_thread(send_loop);
357  std::thread recv_loop_thread(recv_loop);
358  std::thread transmit_loop_thread(transmit_loop);
359 
360  double nextdiskcheck = currentmjd(0.);
361  ElapsedTime etloop;
362  etloop.start();
363 
364  // start the agent
365  while(agent->running())
366  {
367  if (agent->running() == (uint16_t)Agent::State::IDLE)
368  {
369  COSMOS_SLEEP(1);
370  continue;
371  }
372 
373  double sleepsec = 86400. * (nextdiskcheck - currentmjd());
374  if (sleepsec > 0.)
375  {
376  COSMOS_USLEEP((uint32_t)(sleepsec*1e6));
377  }
378 
379  // Check for new files to transmit if queue is not full and check is not delayed
380 
381  if (currentmjd() > nextdiskcheck)
382  {
383  nextdiskcheck = currentmjd(0.) + 10./86400.;
384  for (uint16_t node=0; node<txq.size(); ++node)
385  {
386  if (txq[node].outgoing.size < TRANSFER_QUEUE_LIMIT)
387  {
388  std::vector<filestruc> file_names;
389  for (filestruc file : data_list_files(txq[node].node_name, "outgoing", ""))
390  {
391  if (file.type == "directory")
392  {
393  iretn = data_list_files(txq[node].node_name, "outgoing", file.name, file_names);
394  }
395  }
396 
397  // Sort list by size, then go through list of files found, adding to queue.
398  sort(file_names.begin(), file_names.end(), filestruc_compare_by_size);
399  for(filestruc file : file_names)
400  {
401  if (txq[node].outgoing.size >= TRANSFER_QUEUE_LIMIT)
402  {
403  break;
404  }
405 
406  if (file.type == "directory")
407  {
408  continue;
409  }
410 
411  bool addtoqueue = true;
412  outgoing_tx_lock.lock();
413  for(tx_progress progress : txq[node].outgoing.progress)
414  {
415  if (progress.tx_id && file.path == progress.filepath)
416  {
417  addtoqueue = false;
418  break;
419  }
420  }
421 
422  outgoing_tx_lock.unlock();
423 
424  if (addtoqueue)
425  {
426  iretn = outgoing_tx_add(file.node, file.agent, file.name);
427  if (iretn >= 0)
428  {
429  nextdiskcheck = currentmjd();
430  }
431  if (debug_flag)
432  {
433  printf("[%f] outgoing_tx_add: %s [%d]\n", etloop.split(), file.path.c_str(), iretn);
434  }
435  }
436  }
437  }
438  }
439  }
440  } // End WHILE Loop
441 
442  send_loop_thread.join();
443  recv_loop_thread.join();
444  transmit_queue_check.notify_one();
445  transmit_loop_thread.join();
446 
447  agent->shutdown();
448 
449  exit (0);
450 }
double nmjd
Definition: agent_file.cpp:105
static socket_channel recvchan
Definition: agent_file.cpp:120
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
Agent socket using Unicast UDP.
int i
Definition: rw_test.cpp:37
string getNode()
Listen for heartbeat.
Definition: agentclass.cpp:2607
Definition: transferlib.h:338
void transmit_loop()
Definition: agent_file.cpp:1170
static Agent * agent
Definition: agent_file.cpp:94
int iretn
Definition: rw_test.cpp:37
#define TRANSFER_QUEUE_SIZE
Definition: agent_file.cpp:58
int32_t wait(State state=State::RUN, double waitsec=10.)
Definition: agentclass.cpp:398
vector< filestruc > data_list_files(string directory)
Get list of files in a directory, directly.
Definition: datalib.cpp:461
tx_entry outgoing
Definition: agent_file.cpp:156
int32_t request_debug(string &request, string &response, Agent *agent)
Definition: agent_file.cpp:2305
vector< string > data_list_nodes()
Get list of Nodes, directly.
Definition: datalib.cpp:583
void recv_loop()
Definition: agent_file.cpp:452
string node_name
Definition: agent_001.cpp:46
#define SOCKET_TALK
Talk followed by optional listen (sendto address)
Definition: socketlib.h:82
int32_t outgoing_tx_add(tx_progress tx_out)
Definition: agent_file.cpp:1848
PACKET_NODE_ID_TYPE node_id
Definition: agent_file.cpp:154
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we&#39;re supposed to be running.
Definition: agentclass.cpp:391
#define PACKET_SIZE_LO
Definition: agent_file.cpp:61
int32_t check_node_id_1(std::string node_name)
Definition: agent_file.cpp:2204
std::string destination_ip
Definition: agent_file.cpp:102
Definition: datalib.h:113
void start()
ElapsedTime::start.
Definition: elapsedtime.cpp:203
void send_loop()
Definition: agent_file.cpp:991
#define THROUGHPUT_LO
Definition: agent_file.cpp:63
static std::condition_variable transmit_queue_check
Definition: agent_file.cpp:118
uint32_t throughput
Definition: agent_file.cpp:104
#define AGENTRECVPORT
Default RECV port.
Definition: agentclass.h:200
static std::mutex outgoing_tx_lock
Definition: agent_file.cpp:129
string getAgent()
Definition: agentclass.cpp:2609
int32_t add_request(string token, external_request_function function, string synopsis="", string description="")
Add internal request to Agent request list with description and synopsis.
Definition: agentclass.cpp:312
PACKET_TX_ID_TYPE id
Definition: agent_file.cpp:147
int32_t request_list_outgoing(string &request, string &response, Agent *agent)
Definition: agent_file.cpp:1715
PACKET_TX_ID_TYPE size
Definition: agent_file.cpp:146
Definition: agentclass.h:139
bool filestruc_compare_by_size(const filestruc &a, const filestruc &b)
Definition: agent_file.cpp:2152
PACKET_TX_ID_TYPE next_id
Definition: agent_file.cpp:148
#define SOCKET_BLOCKING
Blocking Agent.
Definition: socketlib.h:78
#define THROUGHPUT_HI
Definition: agent_file.cpp:65
int32_t shutdown()
Shutdown agent gracefully.
Definition: agentclass.cpp:366
#define AGENTRCVTIMEO
Default AGENT socket RCVTIMEO (100 msec)
Definition: agentclass.h:208
int32_t request_remove_file(string &request, string &response, Agent *agent)
Definition: agent_file.cpp:1798
Definition: agent_file.cpp:151
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
int32_t request_list_incoming(string &request, string &response, Agent *agent)
Definition: agent_file.cpp:1570
std::string node_name
Definition: agent_file.cpp:153
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
double data_ctime(string path)
Definition: datalib.cpp:1910
Definition: elapsedtime.h:62
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static uint16_t send_channels
Definition: agent_file.cpp:96
std::vector< tx_progress > progress
Definition: agent_file.cpp:145
string utc2iso8601(double utc)
ISO 8601 version of time.
Definition: timelib.cpp:1286
static sendchannelstruc send_channel[2]
Definition: agent_file.cpp:108
#define SOCKET_LISTEN
Listen followed by optional talk (recvfrom INADDRANY)
Definition: socketlib.h:84
PACKET_CHUNK_SIZE_TYPE packet_size
Definition: agent_file.cpp:103
int32_t incoming_tx_add(tx_progress tx_in)
Definition: agent_file.cpp:2008
static std::string agentname
Definition: agent_file.cpp:90
int32_t request_list_incoming_json(string &request, string &response, Agent *agent)
Definition: agent_file.cpp:1600
static bool debug_flag
Definition: agent_file.cpp:69
int32_t socket_open(socket_channel *channel, NetworkType ntype, const char *address, uint16_t port, uint16_t role, bool blocking, uint32_t usectimeo, uint32_t rcvbuf, uint32_t sndbuf)
Open UDP socket.
Definition: socketlib.cpp:51
static string node
Definition: agent_monitor.cpp:126
int32_t read_meta(tx_progress &tx)
Definition: agent_file.cpp:1348
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
string mjd2iso8601(double mjd)
Definition: timelib.cpp:1316
int32_t request_list_outgoing_json(string &request, string &response, Agent *agent)
Definition: agent_file.cpp:1606
int32_t request_ls(string &request, string &response, Agent *agent)
Definition: agent_file.cpp:1538
tx_entry incoming
Definition: agent_file.cpp:155
#define PACKET_SIZE_HI
Definition: agent_file.cpp:64
int32_t request_use_channel(string &request, string &response, Agent *agent)
Definition: agent_file.cpp:1778
static vector< socket_channel > sendchan
Definition: agent_forward.cpp:39
bool lower_chunk ( file_progress  i,
file_progress  j 
)
1441 {
1442  return (i.chunk_start<j.chunk_start);
1443 }
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334

Variable Documentation

bool debug_flag = false
static
std::string agentname = "file_"
static

the (global) name of the agent

beatstruc cbeat
static

the (global) name of the heartbeat structure

Agent* agent
static

the (global) name of the cosmos data structure

uint16_t send_channels =0
static

the (global) number of agent sending channels

uint16_t use_channel = 0
static
sendchannelstruc send_channel[2]
static
std::queue<transmit_queue_entry> transmit_queue
static
std::condition_variable transmit_queue_check
static
socket_channel recvchan
static
std::mutex incoming_tx_lock
static
std::mutex outgoing_tx_lock
static
double last_data_receive_time = 0.
static
double next_reqmeta_time = 0.
static
double next_queue_time = 0.
static
std::vector<tx_queue> txq
static
int32_t active_node = -1
static