454 std::vector<PACKET_BYTE> recvbuf;
455 std::string partial_filepath;
470 switch (recvbuf[0] & 0x0f)
513 bool duplicate =
false;
514 bool updated =
false;
517 if (!
txq[node].incoming.progress[tx_id].file_info.size())
520 txq[
node].incoming.progress[tx_id].file_info.push_back(tp);
527 for (uint32_t j=0; j<
txq[
node].incoming.progress[tx_id].file_info.size(); ++j)
530 if (tp.
chunk_start >=
txq[node].incoming.progress[tx_id].file_info[j].chunk_start && tp.
chunk_end <=
txq[node].incoming.progress[tx_id].file_info[j].chunk_end)
536 if (tp.
chunk_start <
txq[node].incoming.progress[tx_id].file_info[j].chunk_start)
539 if (tp.
chunk_end + 1 <
txq[node].incoming.progress[tx_id].file_info[j].chunk_start)
541 txq[
node].incoming.progress[tx_id].file_info.insert(
txq[node].incoming.progress[tx_id].file_info.begin()+j, tp);
549 tp.
chunk_end =
txq[
node].incoming.progress[tx_id].file_info[j].chunk_start - 1;
560 if (tp.
chunk_start <=
txq[node].incoming.progress[tx_id].file_info[j].chunk_end + 1)
562 if (tp.
chunk_end >
txq[node].incoming.progress[tx_id].file_info[j].chunk_end)
578 if (!duplicate && check ==
txq[node].incoming.progress[tx_id].file_info.size())
580 txq[
node].incoming.progress[tx_id].file_info.push_back(tp);
591 if (
txq[node].incoming.progress[tx_id].fp == NULL)
593 partial_filepath =
txq[
node].incoming.progress[tx_id].temppath +
".file";
594 txq[
node].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(),
"w");
597 if (
txq[node].incoming.progress[tx_id].fp == NULL)
599 perror(partial_filepath.c_str());
605 fflush(
txq[node].incoming.progress[tx_id].fp);
613 if(
txq[node].incoming.progress[tx_id].file_size ==
txq[node].incoming.progress[tx_id].total_bytes &&
txq[node].incoming.progress[tx_id].havemeta)
617 if (remote_node >= 0)
622 std::vector<PACKET_BYTE> packet;
627 if (!
txq[node].incoming.progress[tx_id].complete)
629 if (
txq[node].incoming.progress[tx_id].fp !=
nullptr)
631 fclose(
txq[node].incoming.progress[tx_id].fp);
632 txq[
node].incoming.progress[tx_id].fp =
nullptr;
634 std::string final_filepath = tx_in.
temppath +
".file";
635 int iret = rename(final_filepath.c_str(), tx_in.
filepath.c_str());
638 printf(
"Renamed: %d %s\n", iret, tx_in.
filepath.c_str());
641 txq[
node].incoming.progress[tx_id].complete =
true;
682 if (!
txq[node].outgoing.progress[tx_id].file_info.size())
685 txq[
node].outgoing.progress[tx_id].file_info.push_back(tp);
686 txq[
node].outgoing.progress[tx_id].total_bytes += byte_count;
692 for (uint32_t j=0; j<
txq[
node].outgoing.progress[tx_id].file_info.size(); ++j)
695 if (tp.
chunk_start ==
txq[node].outgoing.progress[tx_id].file_info[j].chunk_start && tp.
chunk_end ==
txq[node].outgoing.progress[tx_id].file_info[j].chunk_end)
700 if (tp.
chunk_start <
txq[node].outgoing.progress[tx_id].file_info[j].chunk_start)
703 if (tp.
chunk_end + 1 <
txq[node].outgoing.progress[tx_id].file_info[j].chunk_start)
705 txq[
node].outgoing.progress[tx_id].file_info.insert(
txq[node].outgoing.progress[tx_id].file_info.begin()+j, tp);
706 txq[
node].outgoing.progress[tx_id].total_bytes += byte_count;
713 tp.
chunk_end =
txq[
node].outgoing.progress[tx_id].file_info[j].chunk_start - 1;
716 txq[
node].outgoing.progress[tx_id].total_bytes += byte_count;
724 if (tp.
chunk_start <=
txq[node].outgoing.progress[tx_id].file_info[j].chunk_end + 1)
726 if (tp.
chunk_end >
txq[node].outgoing.progress[tx_id].file_info[j].chunk_end)
728 byte_count = tp.
chunk_end -
txq[
node].outgoing.progress[tx_id].file_info[j].chunk_end;
731 txq[
node].outgoing.progress[tx_id].total_bytes += byte_count;
742 if (check ==
txq[node].outgoing.progress[tx_id].file_info.size())
744 txq[
node].outgoing.progress[tx_id].file_info.push_back(tp);
745 txq[
node].outgoing.progress[tx_id].total_bytes += byte_count;
776 if (remote_node >= 0)
784 std::vector<PACKET_BYTE> packet;
837 if (remote_node >= 0)
843 std::vector<PACKET_BYTE> packet;
875 if (
txq[node].incoming.progress[tx_id].tx_id == queue.
tx_id[
i])
914 std::vector<PACKET_TX_ID_TYPE> tqueue (TRANSFER_QUEUE_LIMIT, 0);
918 if (
txq[node].incoming.progress[tx_id].tx_id && !
txq[node].incoming.progress[tx_id].havemeta)
921 tqueue[iq++] = tx_id;
923 if (iq == TRANSFER_QUEUE_LIMIT)
930 std::vector<PACKET_BYTE> packet;
static socket_channel recvchan
Definition: agent_file.cpp:120
int32_t outgoing_tx_del(int32_t node, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file.cpp:1968
static std::mutex incoming_tx_lock
Definition: agent_file.cpp:128
Definition: transferlib.h:205
string temppath
Definition: transferlib.h:353
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:209
void extract_reqdata(vector< PACKET_BYTE > &packet, packet_struct_reqdata &reqdata)
Definition: transferlib.cpp:150
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int32_t set_remote_node_id(PACKET_NODE_ID_TYPE node_id, std::string node_name)
Definition: agent_file.cpp:2241
Definition: transferlib.h:291
int32_t write_meta(tx_progress &tx)
Definition: agent_file.cpp:1316
double queuesendto(std::string type, uint16_t channel, std::vector< PACKET_BYTE > packet)
Definition: agent_file.cpp:1196
PACKET_TX_ID_TYPE check_tx_id(std::vector< tx_progress > tx_entry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file.cpp:2192
int i
Definition: rw_test.cpp:37
void extract_complete(vector< PACKET_BYTE > &packet, packet_struct_complete &complete)
Definition: transferlib.cpp:71
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:208
Definition: transferlib.h:332
Definition: transferlib.h:338
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:263
static double next_reqmeta_time
Definition: agent_file.cpp:134
PACKET_CHUNK_SIZE_TYPE byte_count
Definition: transferlib.h:279
int32_t next_incoming_tx(PACKET_NODE_ID_TYPE node)
Definition: agent_file.cpp:2255
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
static Agent * agent
Definition: agent_file.cpp:94
#define TRANSFER_QUEUE_SIZE
Definition: agent_file.cpp:58
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
static int32_t active_node
Definition: agent_file.cpp:162
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
void make_complete_packet(vector< PACKET_BYTE > &packet, packet_struct_complete complete)
Definition: transferlib.cpp:54
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:264
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
int32_t check_node_id_1(std::string node_name)
Definition: agent_file.cpp:2204
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:294
void extract_data(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE &node_id, PACKET_TX_ID_TYPE &tx_id, PACKET_CHUNK_SIZE_TYPE &byte_count, PACKET_FILE_SIZE_TYPE &chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:248
void make_cancel_packet(vector< PACKET_BYTE > &packet, packet_struct_cancel cancel)
Definition: transferlib.cpp:82
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
static std::mutex outgoing_tx_lock
Definition: agent_file.cpp:129
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:207
string filepath
Definition: transferlib.h:352
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:278
Definition: transferlib.h:261
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string agent_name
Definition: transferlib.h:350
static std::vector< tx_queue > txq
Definition: agent_file.cpp:159
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:277
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define SEEK_SET
Definition: zconf.h:475
int32_t incoming_tx_update(packet_struct_metashort meta)
Definition: agent_file.cpp:2059
static sendchannelstruc send_channel[2]
Definition: agent_file.cpp:108
string file_name
Definition: transferlib.h:351
Definition: transferlib.h:275
#define PACKET_MAX_LENGTH
Definition: transferlib.h:75
PACKET_FILE_SIZE_TYPE hole_start
Definition: transferlib.h:265
PACKET_FILE_SIZE_TYPE hole_end
Definition: transferlib.h:266
int32_t lookup_remote_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: agent_file.cpp:2228
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:280
int32_t myrecvfrom(std::string type, socket_channel channel, std::vector< PACKET_BYTE > &buf, uint32_t length)
Definition: agent_file.cpp:1241
int32_t incoming_tx_add(tx_progress tx_in)
Definition: agent_file.cpp:2008
PACKET_BYTE chunk[1500]
Definition: transferlib.h:281
static bool debug_flag
Definition: agent_file.cpp:69
static string node
Definition: agent_monitor.cpp:126
void make_metadata_packet(vector< PACKET_BYTE > &packet, packet_struct_metalong meta)
Definition: transferlib.cpp:163
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
void extract_queue(vector< PACKET_BYTE > &packet, packet_struct_queue &queue)
Definition: transferlib.cpp:292
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
void make_reqmeta_packet(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE node_id, string node_name, vector< PACKET_TX_ID_TYPE > reqmeta)
Definition: transferlib.cpp:110
int32_t incoming_tx_del(int32_t node, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file.cpp:2102
void extract_reqmeta(vector< PACKET_BYTE > &packet, packet_struct_reqmeta &reqmeta)
Definition: transferlib.cpp:124
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:293
static double last_data_receive_time
Definition: agent_file.cpp:131