481 vector<PACKET_BYTE> recvbuf;
482 string partial_filepath;
485 std::vector<channelstruc>::size_type
use_channel = 0;
506 uint8_t local_node = 0;
508 switch (recvbuf[0] & 0x0f)
520 txq[local_node].remote_id = remote_node;
530 node_name =
txq[local_node].node_name;
535 if (local_node == 0 || node_name.empty())
571 switch (recvbuf[0] & 0x0f)
607 if (queue.
tx_id[
i] &&
txq[local_node].incoming.progress[tx_id].tx_id == queue.
tx_id[
i])
615 if (
txq[local_node].incoming.progress[tx_id].tx_id && !valid)
651 if (
txq[local_node].remote_id > 0)
658 txq[local_node].outgoing.progress[tx_id].sendmeta =
true;
659 txq[local_node].outgoing.progress[tx_id].sentmeta =
false;
712 if (!
txq[local_node].outgoing.progress[tx_id].file_info.size())
715 txq[local_node].outgoing.progress[tx_id].file_info.push_back(tp);
716 txq[local_node].outgoing.progress[tx_id].total_bytes += byte_count;
721 for (uint32_t j=0; j<
txq[local_node].outgoing.progress[tx_id].file_info.size(); ++j)
724 if (tp.
chunk_start ==
txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_start && tp.
chunk_end ==
txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end)
729 if (tp.
chunk_start <
txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_start)
732 if (tp.
chunk_end + 1 <
txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_start)
734 txq[local_node].outgoing.progress[tx_id].file_info.insert(
txq[local_node].outgoing.progress[tx_id].file_info.begin()+j, tp);
735 txq[local_node].outgoing.progress[tx_id].total_bytes += byte_count;
741 tp.
chunk_end =
txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_start - 1;
742 txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_start = tp.
chunk_start;
744 txq[local_node].outgoing.progress[tx_id].total_bytes += byte_count;
751 if (tp.
chunk_start <=
txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end + 1)
753 if (tp.
chunk_end >
txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end)
755 byte_count = tp.
chunk_end -
txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end;
756 tp.
chunk_start =
txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end + 1;
757 txq[local_node].outgoing.progress[tx_id].file_info[j].chunk_end = tp.
chunk_end;
758 txq[local_node].outgoing.progress[tx_id].total_bytes += byte_count;
768 if (check ==
txq[local_node].outgoing.progress[tx_id].file_info.size())
770 txq[local_node].outgoing.progress[tx_id].file_info.push_back(tp);
771 txq[local_node].outgoing.progress[tx_id].total_bytes += byte_count;
778 txq[local_node].outgoing.progress[tx_id].senddata =
true;
779 txq[local_node].outgoing.progress[tx_id].sentdata =
false;
780 txq[local_node].outgoing.progress[tx_id].sentmeta =
true;
781 txq[local_node].outgoing.progress[tx_id].complete =
false;
817 bool duplicate =
false;
818 bool updated =
false;
821 if (!
txq[local_node].incoming.progress[tx_id].file_info.size())
824 txq[local_node].incoming.progress[tx_id].file_info.push_back(tp);
825 txq[local_node].incoming.progress[tx_id].total_bytes += data.
byte_count;
831 for (uint32_t j=0; j<
txq[local_node].incoming.progress[tx_id].file_info.size(); ++j)
834 if (tp.
chunk_start >=
txq[local_node].incoming.progress[tx_id].file_info[j].chunk_start && tp.
chunk_end <=
txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end)
840 if (tp.
chunk_start <
txq[local_node].incoming.progress[tx_id].file_info[j].chunk_start)
843 if (tp.
chunk_end + 1 <
txq[local_node].incoming.progress[tx_id].file_info[j].chunk_start)
845 txq[local_node].incoming.progress[tx_id].file_info.insert(
txq[local_node].incoming.progress[tx_id].file_info.begin()+j, tp);
846 txq[local_node].incoming.progress[tx_id].total_bytes += data.
byte_count;
853 tp.
chunk_end =
txq[local_node].incoming.progress[tx_id].file_info[j].chunk_start - 1;
854 txq[local_node].incoming.progress[tx_id].file_info[j].chunk_start = tp.
chunk_start;
856 txq[local_node].incoming.progress[tx_id].total_bytes += data.
byte_count;
864 if (tp.
chunk_start <=
txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end + 1)
866 if (tp.
chunk_end >
txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end)
869 tp.
chunk_start =
txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end + 1;
870 txq[local_node].incoming.progress[tx_id].file_info[j].chunk_end = tp.
chunk_end;
871 txq[local_node].incoming.progress[tx_id].total_bytes += data.
byte_count;
882 if (!duplicate && check ==
txq[local_node].incoming.progress[tx_id].file_info.size())
884 txq[local_node].incoming.progress[tx_id].file_info.push_back(tp);
885 txq[local_node].incoming.progress[tx_id].total_bytes += data.
byte_count;
895 if (
txq[local_node].incoming.progress[tx_id].fp ==
nullptr)
897 partial_filepath =
txq[local_node].incoming.progress[tx_id].temppath +
".file";
900 txq[local_node].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(),
"r+");
904 txq[local_node].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(),
"w");
908 if (
txq[local_node].incoming.progress[tx_id].fp ==
nullptr)
922 fflush(
txq[local_node].incoming.progress[tx_id].fp);
938 if(
txq[local_node].incoming.progress[tx_id].file_size ==
txq[local_node].incoming.progress[tx_id].total_bytes &&
txq[local_node].incoming.progress[tx_id].havemeta)
941 if (
txq[local_node].remote_id > 0)
953 if (!
txq[local_node].incoming.progress[tx_id].complete)
955 if (
txq[local_node].incoming.progress[tx_id].fp !=
nullptr)
957 fclose(
txq[local_node].incoming.progress[tx_id].fp);
958 txq[local_node].incoming.progress[tx_id].fp =
nullptr;
960 string final_filepath = tx_in.
temppath +
".file";
961 int iret = rename(final_filepath.c_str(), tx_in.
filepath.c_str());
973 txq[local_node].incoming.progress[tx_id].complete =
true;
974 txq[local_node].incoming.progress[tx_id].senddata =
false;
975 txq[local_node].incoming.progress[tx_id].sentdata =
true;
983 txq[local_node].incoming.progress[data.
tx_id].sendmeta =
true;
1006 txq[local_node].outgoing.progress[tx_id].complete =
true;
1047 if (
txq[(
node)].incoming.progress[tx_id].tx_id &&
txq[(
node)].incoming.progress[tx_id].havemeta)
string node
Definition: agent_file2.cpp:100
PACKET_FILE_SIZE_TYPE total_bytes
Definition: transferlib.h:357
int32_t incoming_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file3.cpp:2872
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
Definition: transferlib.h:205
string temppath
Definition: transferlib.h:353
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:209
static Agent * agent
Definition: agent_file3.cpp:94
void extract_reqdata(vector< PACKET_BYTE > &packet, packet_struct_reqdata &reqdata)
Definition: transferlib.cpp:150
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:340
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
int32_t incoming_tx_update(packet_struct_metashort meta)
Definition: agent_file3.cpp:2812
int32_t myrecvfrom(string type, socket_channel &channel, vector< PACKET_BYTE > &buf, uint32_t length, double dtimeout=1.)
Definition: agent_file3.cpp:1521
Definition: transferlib.h:291
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file3.cpp:1823
ElapsedTime dt
Definition: agent_file3.cpp:183
double limjd
Definition: agent_file3.cpp:103
int i
Definition: rw_test.cpp:37
void extract_complete(vector< PACKET_BYTE > &packet, packet_struct_complete &complete)
Definition: transferlib.cpp:71
uint8_t lookup_local_node_id(PACKET_NODE_ID_TYPE remote_id)
Definition: agent_file3.cpp:3162
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:208
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file3.cpp:2693
Definition: transferlib.h:332
static vector< tx_queue > txq
Definition: agent_file3.cpp:179
Definition: transferlib.h:338
string chanip
Definition: agent_file2.cpp:102
PACKET_CHUNK_SIZE_TYPE byte_count
Definition: transferlib.h:279
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
Definition: transferlib.h:155
static std::mutex incoming_tx_lock
Definition: agent_file3.cpp:140
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
PACKET_FILE_SIZE_TYPE file_size
Definition: transferlib.h:356
char address[17]
Definition: socketlib.h:134
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
Definition: agent_file2.cpp:98
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
static std::mutex debug_fd_lock
Definition: agent_file3.cpp:142
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:264
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:294
void extract_data(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE &node_id, PACKET_TX_ID_TYPE &tx_id, PACKET_CHUNK_SIZE_TYPE &byte_count, PACKET_FILE_SIZE_TYPE &chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:248
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
int32_t add_node_name(string node_name)
Definition: agent_file3.cpp:3110
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
string node_name
Definition: transferlib.h:349
double lomjd
Definition: agent_file3.cpp:104
string filepath
Definition: transferlib.h:352
Definition: socketlib.h:115
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:278
struct sockaddr_in caddr
Definition: socketlib.h:122
Definition: transferlib.h:195
Definition: transferlib.h:261
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
static uint32_t type_error_count
Definition: agent_file3.cpp:152
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
void extract_heartbeat(vector< PACKET_BYTE > &packet, packet_struct_heartbeat &heartbeat)
Definition: transferlib.cpp:316
static std::mutex outgoing_tx_lock
Definition: agent_file3.cpp:141
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:277
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
#define PACKET_HEADER_OFFSET_NODE_NAME
Definition: transferlib.h:153
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
static vector< channelstruc > out_comm_channel
Definition: agent_file3.cpp:119
#define SEEK_SET
Definition: zconf.h:475
#define PROGRESS_QUEUE_SIZE
Definition: agent_file3.cpp:62
Definition: transferlib.h:275
#define PACKET_MAX_LENGTH
Definition: transferlib.h:75
socket_channel chansock
Definition: agent_file2.cpp:101
PACKET_FILE_SIZE_TYPE hole_start
Definition: transferlib.h:265
PACKET_FILE_SIZE_TYPE hole_end
Definition: transferlib.h:266
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:280
static double last_data_receive_time
Definition: agent_file3.cpp:144
PACKET_BYTE chunk[1500]
Definition: transferlib.h:281
double nmjd
Definition: agent_file2.cpp:105
static const unsigned char PACKET_HEARTBEAT
Definition: transferlib.h:111
static string node
Definition: agent_monitor.cpp:126
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
void extract_queue(vector< PACKET_BYTE > &packet, packet_struct_queue &queue)
Definition: transferlib.cpp:292
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file3.cpp:3086
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
double fmjd
Definition: agent_file3.cpp:106
void extract_reqmeta(vector< PACKET_BYTE > &packet, packet_struct_reqmeta &reqmeta)
Definition: transferlib.cpp:124
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
static const unsigned char PACKET_REQQUEUE
Definition: transferlib.h:110
#define PACKET_HEADER_OFFSET_NODE_ID
Definition: transferlib.h:152
bool data_exists(string &path)
Check existence of path.
Definition: datalib.cpp:1003
void extract_reqqueue(vector< PACKET_BYTE > &packet, packet_struct_reqqueue &reqqueue)
Definition: transferlib.cpp:271