511 vector<PACKET_BYTE> recvbuf;
512 std::string partial_filepath;
544 node_name =
txq[static_cast <
size_t>(
node)].node_name;
601 switch (recvbuf[0] & 0x0f)
653 bool duplicate =
false;
654 bool updated =
false;
657 if (!
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.size())
660 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info.push_back(tp);
661 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].total_bytes += data.
byte_count;
667 for (uint32_t j=0; j<txq[static_cast <size_t>(
node)].incoming.progress[tx_id].file_info.size(); ++j)
670 if (tp.
chunk_start >=
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start && tp.
chunk_end <=
txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info[j].chunk_end)
676 if (tp.
chunk_start <
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start)
679 if (tp.
chunk_end + 1 <
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start)
681 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info.insert(
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.begin()+j, tp);
682 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].total_bytes += data.
byte_count;
689 tp.
chunk_end =
txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info[j].chunk_start - 1;
690 txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_start = tp.
chunk_start;
692 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].total_bytes += data.
byte_count;
700 if (tp.
chunk_start <=
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end + 1)
702 if (tp.
chunk_end >
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end)
705 tp.
chunk_start =
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info[j].chunk_end + 1;
706 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info[j].chunk_end = tp.
chunk_end;
707 txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes += data.
byte_count;
718 if (!duplicate && check ==
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_info.size())
720 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].file_info.push_back(tp);
721 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].total_bytes += data.
byte_count;
731 if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp ==
nullptr)
733 partial_filepath =
txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].temppath +
".file";
736 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(),
"r+");
740 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(),
"w");
744 if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp ==
nullptr)
757 fwrite(data.
chunk, data.
byte_count, 1,
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp);
758 fflush(
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp);
760 write_meta(
txq[static_cast <size_t>(node)].incoming.progress[tx_id]);
778 if(
txq[static_cast <size_t>(node)].incoming.progress[tx_id].file_size ==
txq[static_cast <size_t>(node)].incoming.progress[tx_id].total_bytes &&
txq[static_cast <size_t>(node)].incoming.progress[tx_id].havemeta)
782 if (remote_node >= 0)
791 vector<PACKET_BYTE> packet;
793 queuesendto(static_cast <PACKET_NODE_ID_TYPE>(node),
"rx", packet);
796 if (!
txq[static_cast <size_t>(node)].incoming.progress[tx_id].complete)
798 if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp !=
nullptr)
800 fclose(
txq[static_cast <size_t>(node)].incoming.progress[tx_id].fp);
801 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].fp =
nullptr;
803 std::string final_filepath = tx_in.
temppath +
".file";
804 int iret = rename(final_filepath.c_str(), tx_in.
filepath.c_str());
806 write_meta(
txq[static_cast <size_t>(node)].incoming.progress[tx_id], 0.);
815 txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].complete =
true;
859 if (!
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.size())
862 txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info.push_back(tp);
863 txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].total_bytes += byte_count;
868 for (uint32_t j=0; j<txq[static_cast <size_t>(
node)].outgoing.progress[tx_id].file_info.size(); ++j)
871 if (tp.
chunk_start ==
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start && tp.
chunk_end ==
txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info[j].chunk_end)
876 if (tp.
chunk_start <
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start)
879 if (tp.
chunk_end + 1 <
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start)
881 txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info.insert(
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.begin()+j, tp);
882 txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].total_bytes += byte_count;
888 tp.
chunk_end =
txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info[j].chunk_start - 1;
889 txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_start = tp.
chunk_start;
891 txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].total_bytes += byte_count;
898 if (tp.
chunk_start <=
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end + 1)
900 if (tp.
chunk_end >
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end)
902 byte_count = tp.
chunk_end -
txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info[j].chunk_end;
903 tp.
chunk_start =
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info[j].chunk_end + 1;
904 txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info[j].chunk_end = tp.
chunk_end;
905 txq[static_cast <size_t>(node)].outgoing.progress[tx_id].total_bytes += byte_count;
915 if (check ==
txq[static_cast <size_t>(node)].outgoing.progress[tx_id].file_info.size())
917 txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].file_info.push_back(tp);
918 txq[static_cast <
size_t>(
node)].outgoing.progress[tx_id].total_bytes += byte_count;
924 write_meta(
txq[static_cast <size_t>(node)].outgoing.progress[tx_id]);
925 txq[static_cast <
size_t>(
node)].outgoing.id = reqdata.
tx_id;
950 if (remote_node >= 0)
952 txq[static_cast <
size_t>(
node)].outgoing.meta_id.clear();
958 txq[static_cast <
size_t>(
node)].outgoing.meta_id.push_back(tx_id);
1012 if (remote_node >= 0)
1016 txq[static_cast <size_t>(node)].outgoing.id = tx_id;
1046 if (queue.
tx_id[
i] &&
txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id == queue.
tx_id[
i])
1054 if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id && !valid)
1079 vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
1083 if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id && !
txq[static_cast <size_t>(node)].incoming.progress[tx_id].havemeta)
1086 tqueue[iq++] = tx_id;
1088 if (iq == TRANSFER_QUEUE_LIMIT)
1095 vector<PACKET_BYTE> packet;
1114 for (uint16_t node=0; node<
txq.size(); ++
node)
1118 if (
txq[static_cast <size_t>(node)].incoming.progress[tx_id].tx_id &&
txq[static_cast <
size_t>(
node)].incoming.progress[tx_id].havemeta)
1120 write_meta(
txq[static_cast <size_t>(node)].incoming.progress[tx_id], 0.);
string node
Definition: agent_file2.cpp:100
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
Definition: transferlib.h:205
static std::mutex incoming_tx_lock
Definition: agent_file2.cpp:129
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:209
void extract_reqdata(vector< PACKET_BYTE > &packet, packet_struct_reqdata &reqdata)
Definition: transferlib.cpp:150
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
Definition: transferlib.h:291
static std::mutex debug_fd_lock
Definition: agent_file2.cpp:131
int i
Definition: rw_test.cpp:37
void extract_complete(vector< PACKET_BYTE > &packet, packet_struct_complete &complete)
Definition: transferlib.cpp:71
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:208
Definition: transferlib.h:332
Definition: transferlib.h:338
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:263
string chanip
Definition: agent_file2.cpp:102
PACKET_CHUNK_SIZE_TYPE byte_count
Definition: transferlib.h:279
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
char address[17]
Definition: socketlib.h:134
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file2.cpp:1648
static Agent * agent
Definition: agent_file2.cpp:95
int32_t incoming_tx_update(packet_struct_metashort meta)
Definition: agent_file2.cpp:2479
void make_complete_packet(vector< PACKET_BYTE > &packet, packet_struct_complete complete)
Definition: transferlib.cpp:54
Definition: agent_file2.cpp:98
static vector< tx_queue > txq
Definition: agent_file2.cpp:167
string node_name
Definition: agent_001.cpp:46
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:264
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
static vector< channelstruc > comm_channel
Definition: agent_file2.cpp:109
static uint16_t use_channel
Definition: agent_file2.cpp:96
int32_t myrecvfrom(std::string type, socket_channel &channel, vector< PACKET_BYTE > &buf, uint32_t length, double dtimeout=1.)
Definition: agent_file2.cpp:1456
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:294
void extract_data(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE &node_id, PACKET_TX_ID_TYPE &tx_id, PACKET_CHUNK_SIZE_TYPE &byte_count, PACKET_FILE_SIZE_TYPE &chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:248
double queuesendto(PACKET_NODE_ID_TYPE node_id, std::string type, vector< PACKET_BYTE > packet)
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:207
string node_name
Definition: transferlib.h:349
string filepath
Definition: transferlib.h:352
Definition: socketlib.h:115
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:278
struct sockaddr_in caddr
Definition: socketlib.h:122
Definition: transferlib.h:261
#define PACKET_HEADER_OFFSET_TOTAL
Definition: transferlib.h:151
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
#define PACKET_QUEUE_OFFSET_NODE_NAME
Definition: transferlib.h:213
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:277
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file2.cpp:2692
#define SEEK_SET
Definition: zconf.h:475
int32_t incoming_tx_del(int32_t node, uint16_t tx_id=256)
Definition: agent_file2.cpp:2525
Definition: transferlib.h:275
#define PACKET_MAX_LENGTH
Definition: transferlib.h:75
socket_channel chansock
Definition: agent_file2.cpp:101
PACKET_FILE_SIZE_TYPE hole_start
Definition: transferlib.h:265
PACKET_FILE_SIZE_TYPE hole_end
Definition: transferlib.h:266
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:280
static double next_reqmeta_time
Definition: agent_file2.cpp:134
PACKET_BYTE chunk[1500]
Definition: transferlib.h:281
static uint32_t type_error_count
Definition: agent_file2.cpp:141
int32_t next_incoming_tx(PACKET_NODE_ID_TYPE node)
Definition: agent_file2.cpp:2767
double nmjd
Definition: agent_file2.cpp:105
static string node
Definition: agent_monitor.cpp:126
static std::mutex outgoing_tx_lock
Definition: agent_file2.cpp:130
#define PROGRESS_QUEUE_SIZE
Definition: agent_file2.cpp:62
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
void extract_queue(vector< PACKET_BYTE > &packet, packet_struct_queue &queue)
Definition: transferlib.cpp:292
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
int32_t check_node_id_2(std::string node_name)
Definition: agent_file2.cpp:2704
void make_reqmeta_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > reqmeta)
Definition: transferlib.cpp:110
int32_t check_remote_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file2.cpp:2740
int32_t set_remote_node_id(PACKET_NODE_ID_TYPE node_id, std::string node_name)
Definition: agent_file2.cpp:2753
void extract_reqmeta(vector< PACKET_BYTE > &packet, packet_struct_reqmeta &reqmeta)
Definition: transferlib.cpp:124
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:293
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file2.cpp:2377
bool data_exists(string &path)
Check existence of path.
Definition: datalib.cpp:1003
static double last_data_receive_time
Definition: agent_file2.cpp:133
static bool debug_flag
Definition: agent_file2.cpp:72