511     vector<PACKET_BYTE> recvbuf;
   512     std::string partial_filepath;
   544                 node_name = 
txq[static_cast <
size_t>(
node)].node_name;
   601             switch (recvbuf[0] & 0x0f)
   653                             bool duplicate = 
false;
   654                             bool updated = 
false;
   657                             if (!
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.size())
   660                                 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info.push_back(tp);
   661                                 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].total_bytes += data.
byte_count;
   667                                 for (uint32_t j=0; j<txq[static_cast <size_t>(
node)].incoming.progress[tx_id].file_info.size(); ++j)
   670                                     if (tp.
chunk_start >= 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start && tp.
chunk_end <= 
txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info[j].chunk_end)
   676                                     if (tp.
chunk_start < 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start)
   679                                         if (tp.
chunk_end + 1 < 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start)
   681                                             txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info.insert(
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.begin()+j, tp);
   682                                             txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].total_bytes += data.
byte_count;
   689                                             tp.
chunk_end = 
txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info[j].chunk_start - 1;
   690                                             txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start = tp.
chunk_start;
   692                                             txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].total_bytes += data.
byte_count;
   700                                         if (tp.
chunk_start <= 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end + 1)
   702                                             if (tp.
chunk_end > 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end)
   705                                                 tp.
chunk_start = 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end + 1;
   706                                                 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info[j].chunk_end = tp.
chunk_end;
   707                                                 txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes += data.
byte_count;
   718                                 if (!duplicate && check == 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.size())
   720                                     txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info.push_back(tp);
   721                                     txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].total_bytes += data.
byte_count;
   731                                 if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp == 
nullptr)
   733                                     partial_filepath = 
txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].temppath + 
".file";
   736                                         txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(), 
"r+");
   740                                         txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(), 
"w");
   744                                 if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp == 
nullptr)
   757                                     fwrite(data.
chunk, data.
byte_count, 1, 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp);
   758                                     fflush(
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp);
   760                                     write_meta(
txq[static_cast <size_t>(node)].incoming.progress[tx_id]);
   778                             if(
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_size == 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes && 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].havemeta)
   782                                 if (remote_node >= 0)
   791                                     vector<PACKET_BYTE> packet;
   793                                     queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node), 
"rx", packet);
   796                                     if (!
txq[static_cast <size_t>(node)].incoming.progress[tx_id].complete)
   798                                         if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp != 
nullptr)
   800                                             fclose(
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp);
   801                                             txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].fp = 
nullptr;
   803                                         std::string final_filepath = tx_in.
temppath + 
".file";
   804                                         int iret = rename(final_filepath.c_str(), tx_in.
filepath.c_str());
   806                                         write_meta(
txq[static_cast <size_t>(node)].incoming.progress[tx_id], 0.);
   815                                         txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].complete = 
true;
   859                         if (!
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.size())
   862                             txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info.push_back(tp);
   863                             txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].total_bytes += byte_count;
   868                             for (uint32_t j=0; j<txq[static_cast <size_t>(
node)].outgoing.progress[tx_id].file_info.size(); ++j)
   871                                 if (tp.
chunk_start == 
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start && tp.
chunk_end == 
txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info[j].chunk_end)
   876                                 if (tp.
chunk_start < 
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start)
   879                                     if (tp.
chunk_end + 1 < 
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start)
   881                                         txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info.insert(
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.begin()+j, tp);
   882                                         txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].total_bytes += byte_count;
   888                                         tp.
chunk_end = 
txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info[j].chunk_start - 1;
   889                                         txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start = tp.
chunk_start;
   891                                         txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].total_bytes += byte_count;
   898                                     if (tp.
chunk_start <= 
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end + 1)
   900                                         if (tp.
chunk_end > 
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end)
   902                                             byte_count = tp.
chunk_end - 
txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info[j].chunk_end;
   903                                             tp.
chunk_start = 
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end + 1;
   904                                             txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info[j].chunk_end = tp.
chunk_end;
   905                                             txq[static_cast <size_t>(node)].outgoing.progress[tx_id].total_bytes += byte_count;
   915                             if (check == 
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.size())
   917                                 txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info.push_back(tp);
   918                                 txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].total_bytes += byte_count;
   924                         write_meta(
txq[static_cast <size_t>(node)].outgoing.progress[tx_id]);
   925                         txq[static_cast <
size_t>(
node)].outgoing.id = reqdata.
tx_id;
   950                         if (remote_node >= 0)
   952                             txq[static_cast <
size_t>(
node)].outgoing.meta_id.clear();
   958                                     txq[static_cast <
size_t>(
node)].outgoing.meta_id.push_back(tx_id);
  1012                             if (remote_node >= 0)
  1016                                 txq[static_cast <size_t>(node)].outgoing.id = tx_id;
  1046                                 if (queue.
tx_id[
i] && 
txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id == queue.
tx_id[
i])
  1054                             if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id && !valid)
  1079                             vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
  1083                                 if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id && !
txq[static_cast <size_t>(node)].incoming.progress[tx_id].havemeta)
  1086                                     tqueue[iq++] = tx_id;
  1088                                 if (iq == TRANSFER_QUEUE_LIMIT)
  1095                                 vector<PACKET_BYTE> packet;
  1114     for (uint16_t node=0; node<
txq.size(); ++
node)
  1118             if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id && 
txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].havemeta)
  1120                 write_meta(
txq[static_cast <size_t>(node)].incoming.progress[tx_id], 0.);
 string node
Definition: agent_file2.cpp:100
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
Definition: transferlib.h:205
static std::mutex incoming_tx_lock
Definition: agent_file2.cpp:129
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:209
void extract_reqdata(vector< PACKET_BYTE > &packet, packet_struct_reqdata &reqdata)
Definition: transferlib.cpp:150
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
Definition: transferlib.h:291
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
int i
Definition: rw_test.cpp:37
void extract_complete(vector< PACKET_BYTE > &packet, packet_struct_complete &complete)
Definition: transferlib.cpp:71
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:208
Definition: transferlib.h:332
Definition: transferlib.h:338
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:263
string chanip
Definition: agent_file2.cpp:102
PACKET_CHUNK_SIZE_TYPE byte_count
Definition: transferlib.h:279
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
char address[17]
Definition: socketlib.h:134
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file2.cpp:1648
static Agent * agent
Definition: agent_file2.cpp:95
int32_t incoming_tx_update(packet_struct_metashort meta)
Definition: agent_file2.cpp:2479
void make_complete_packet(vector< PACKET_BYTE > &packet, packet_struct_complete complete)
Definition: transferlib.cpp:54
Definition: agent_file2.cpp:98
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running. 
Definition: agentclass.cpp:391
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:264
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
static uint16_t use_channel
Definition: agent_file2.cpp:96
int32_t myrecvfrom(std::string type, socket_channel &channel, vector< PACKET_BYTE > &buf, uint32_t length, double dtimeout=1.)
Definition: agent_file2.cpp:1456
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:294
void extract_data(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE &node_id, PACKET_TX_ID_TYPE &tx_id, PACKET_CHUNK_SIZE_TYPE &byte_count, PACKET_FILE_SIZE_TYPE &chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:248
double queuesendto(PACKET_NODE_ID_TYPE node_id, std::string type, vector< PACKET_BYTE > packet)
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:207
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
Definition: socketlib.h:115
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:278
struct sockaddr_in caddr
Definition: socketlib.h:122
Definition: transferlib.h:261
#define PACKET_HEADER_OFFSET_TOTAL
Definition: transferlib.h:151
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
#define PACKET_QUEUE_OFFSET_NODE_NAME
Definition: transferlib.h:213
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:277
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days. 
Definition: timelib.cpp:65
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file2.cpp:2692
#define SEEK_SET
Definition: zconf.h:475
int32_t incoming_tx_del(int32_t node, uint16_t tx_id=256)
Definition: agent_file2.cpp:2525
Definition: transferlib.h:275
#define PACKET_MAX_LENGTH
Definition: transferlib.h:75
socket_channel chansock
Definition: agent_file2.cpp:101
PACKET_FILE_SIZE_TYPE hole_start
Definition: transferlib.h:265
PACKET_FILE_SIZE_TYPE hole_end
Definition: transferlib.h:266
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:280
static double next_reqmeta_time
Definition: agent_file2.cpp:134
PACKET_BYTE chunk[1500]
Definition: transferlib.h:281
static uint32_t type_error_count
Definition: agent_file2.cpp:141
int32_t next_incoming_tx(PACKET_NODE_ID_TYPE node)
Definition: agent_file2.cpp:2767
double nmjd
Definition: agent_file2.cpp:105
static string node
Definition: agent_monitor.cpp:126
static std::mutex outgoing_tx_lock
Definition: agent_file2.cpp:130
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
void extract_queue(vector< PACKET_BYTE > &packet, packet_struct_queue &queue)
Definition: transferlib.cpp:292
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
int32_t check_node_id_2(std::string node_name)
Definition: agent_file2.cpp:2704
void make_reqmeta_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > reqmeta)
Definition: transferlib.cpp:110
int32_t check_remote_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file2.cpp:2740
int32_t set_remote_node_id(PACKET_NODE_ID_TYPE node_id, std::string node_name)
Definition: agent_file2.cpp:2753
void extract_reqmeta(vector< PACKET_BYTE > &packet, packet_struct_reqmeta &reqmeta)
Definition: transferlib.cpp:124
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:293
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file2.cpp:2377
bool data_exists(string &path)
Check existence of path. 
Definition: datalib.cpp:1003
static double last_data_receive_time
Definition: agent_file2.cpp:133
static bool debug_flag
Definition: agent_file2.cpp:72