504 vector<PACKET_BYTE> recvbuf;
505 string partial_filepath;
508 std::vector<channelstruc>::size_type
use_channel = 0;
528 string packet_node_name;
531 switch (recvbuf[0] & 0x0f)
547 if (node_id <= 0 || node_name.empty())
552 txq[node_id].incoming.rcvdqueue =
false;
553 txq[node_id].incoming.rcvdmeta =
false;
554 txq[node_id].incoming.rcvddata =
false;
588 switch (recvbuf[0] & 0x0f)
611 imessage.resize(message.
length);
612 memcpy(&imessage[0], message.
bytes, message.
length);
623 txq[node_id].outgoing.sendqueue =
true;
624 txq[node_id].outgoing.sentqueue =
false;
633 txq[node_id].outgoing.sendqueue =
true;
634 txq[node_id].outgoing.sentqueue =
false;
645 txq[node_id].incoming.sentqueue =
true;
646 txq[node_id].incoming.sendqueue =
false;
647 txq[(node_id)].incoming.rcvdqueue =
true;
655 if (queue.
tx_id[
i] &&
txq[node_id].incoming.progress[tx_id].tx_id == queue.
tx_id[
i])
663 if (
txq[node_id].incoming.progress[tx_id].tx_id && !valid)
695 txq[node_id].outgoing.sentqueue =
false;
698 if (
txq[node_id].node_id > 0)
705 txq[node_id].outgoing.progress[tx_id].sendmeta =
true;
706 txq[node_id].outgoing.progress[tx_id].sentmeta =
false;
752 txq[node_id].outgoing.progress[tx_id].sendmeta =
false;
753 txq[node_id].outgoing.progress[tx_id].sentmeta =
true;
763 if (!
txq[node_id].outgoing.progress[tx_id].file_info.size())
766 txq[node_id].outgoing.progress[tx_id].file_info.push_back(tp);
767 txq[node_id].outgoing.progress[tx_id].total_bytes += byte_count;
772 for (uint32_t j=0; j<
txq[node_id].outgoing.progress[tx_id].file_info.size(); ++j)
775 if (tp.
chunk_start ==
txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_start && tp.
chunk_end ==
txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end)
780 if (tp.
chunk_start <
txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_start)
783 if (tp.
chunk_end + 1 <
txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_start)
785 txq[node_id].outgoing.progress[tx_id].file_info.insert(
txq[node_id].outgoing.progress[tx_id].file_info.begin()+j, tp);
786 txq[node_id].outgoing.progress[tx_id].total_bytes += byte_count;
792 tp.
chunk_end =
txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_start - 1;
793 txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_start = tp.
chunk_start;
795 txq[node_id].outgoing.progress[tx_id].total_bytes += byte_count;
802 if (tp.
chunk_start <=
txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end + 1)
804 if (tp.
chunk_end >
txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end)
806 byte_count = tp.
chunk_end -
txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end;
807 tp.
chunk_start =
txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end + 1;
808 txq[node_id].outgoing.progress[tx_id].file_info[j].chunk_end = tp.
chunk_end;
809 txq[node_id].outgoing.progress[tx_id].total_bytes += byte_count;
819 if (check ==
txq[node_id].outgoing.progress[tx_id].file_info.size())
821 txq[node_id].outgoing.progress[tx_id].file_info.push_back(tp);
822 txq[node_id].outgoing.progress[tx_id].total_bytes += byte_count;
829 txq[node_id].outgoing.sendqueue =
false;
830 txq[node_id].outgoing.sentqueue =
true;
831 txq[node_id].outgoing.progress[tx_id].senddata =
true;
832 txq[node_id].outgoing.progress[tx_id].sentdata =
false;
833 txq[node_id].outgoing.progress[tx_id].sendmeta =
false;
834 txq[node_id].outgoing.progress[tx_id].sentmeta =
true;
857 txq[node_id].incoming.rcvddata =
true;
858 txq[node_id].incoming.progress[tx_id].datatime =
currentmjd();
869 bool duplicate =
false;
870 bool updated =
false;
873 if (!
txq[node_id].incoming.progress[tx_id].file_info.size())
876 txq[node_id].incoming.progress[tx_id].file_info.push_back(tp);
877 txq[node_id].incoming.progress[tx_id].total_bytes += data.
byte_count;
883 for (uint32_t j=0; j<
txq[node_id].incoming.progress[tx_id].file_info.size(); ++j)
886 if (tp.
chunk_start >=
txq[node_id].incoming.progress[tx_id].file_info[j].chunk_start && tp.
chunk_end <=
txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end)
892 if (tp.
chunk_start <
txq[node_id].incoming.progress[tx_id].file_info[j].chunk_start)
895 if (tp.
chunk_end + 1 <
txq[node_id].incoming.progress[tx_id].file_info[j].chunk_start)
897 txq[node_id].incoming.progress[tx_id].file_info.insert(
txq[node_id].incoming.progress[tx_id].file_info.begin()+j, tp);
898 txq[node_id].incoming.progress[tx_id].total_bytes += data.
byte_count;
905 tp.
chunk_end =
txq[node_id].incoming.progress[tx_id].file_info[j].chunk_start - 1;
906 txq[node_id].incoming.progress[tx_id].file_info[j].chunk_start = tp.
chunk_start;
908 txq[node_id].incoming.progress[tx_id].total_bytes += data.
byte_count;
916 if (tp.
chunk_start <=
txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end + 1)
918 if (tp.
chunk_end >
txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end)
921 tp.
chunk_start =
txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end + 1;
922 txq[node_id].incoming.progress[tx_id].file_info[j].chunk_end = tp.
chunk_end;
923 txq[node_id].incoming.progress[tx_id].total_bytes += data.
byte_count;
934 if (!duplicate && check ==
txq[node_id].incoming.progress[tx_id].file_info.size())
936 txq[node_id].incoming.progress[tx_id].file_info.push_back(tp);
937 txq[node_id].incoming.progress[tx_id].total_bytes += data.
byte_count;
947 if (
txq[node_id].incoming.progress[tx_id].fp ==
nullptr)
949 partial_filepath =
txq[node_id].incoming.progress[tx_id].temppath +
".file";
952 txq[node_id].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(),
"r+");
956 txq[node_id].incoming.progress[tx_id].fp = fopen(partial_filepath.c_str(),
"w");
960 if (
txq[node_id].incoming.progress[tx_id].fp ==
nullptr)
974 fflush(
txq[node_id].incoming.progress[tx_id].fp);
1034 txq[node_id].incoming.progress[data.
tx_id].sendmeta =
true;
1053 txq[node_id].outgoing.progress[tx_id].complete =
true;
1054 txq[node_id].outgoing.progress[tx_id].senddata =
false;
1055 txq[node_id].outgoing.progress[tx_id].sendmeta =
false;
1056 txq[node_id].outgoing.sendqueue =
true;
1057 txq[node_id].outgoing.sentqueue =
false;
1098 if (
txq[(
node)].incoming.progress[tx_id].tx_id &&
txq[(
node)].incoming.progress[tx_id].havemeta)
string node
Definition: agent_file2.cpp:100
PACKET_BYTE length
Definition: transferlib.h:174
int32_t incoming_tx_add(tx_progress &tx_in)
Definition: agent_file4.cpp:2811
uint16_t debug_level
Flag for level of debugging, keep it public so that it can be controlled from the outside...
Definition: agentclass.h:362
Definition: transferlib.h:205
PACKET_TX_ID_TYPE check_tx_id(tx_entry &txentry, PACKET_TX_ID_TYPE tx_id)
Definition: agent_file4.cpp:3246
int32_t write_meta(tx_progress &tx, double interval=5.)
Definition: agent_file4.cpp:1843
FILE * get_debug_fd(double mjd=0.)
Definition: agentclass.cpp:2645
int32_t incoming_tx_update(packet_struct_metashort meta)
Definition: agent_file4.cpp:2930
PACKET_TX_ID_TYPE tx_id[((225-(COSMOS_SIZEOF(PACKET_TYPE)+COSMOS_SIZEOF(PACKET_NODE_ID_TYPE)+COSMOS_SIZEOF(PACKET_TX_ID_TYPE)+COSMOS_MAX_NAME))/(COSMOS_SIZEOF(PACKET_TX_ID_TYPE)))]
Definition: transferlib.h:209
void extract_reqdata(vector< PACKET_BYTE > &packet, packet_struct_reqdata &reqdata)
Definition: transferlib.cpp:150
static vector< tx_queue > txq
Definition: agent_file4.cpp:182
PACKET_FILE_SIZE_TYPE chunk_end
Definition: transferlib.h:335
Definition: transferlib.h:291
double limjd
Definition: agent_file3.cpp:103
int32_t incoming_tx_del(uint8_t node, uint16_t tx_id=256)
Definition: agent_file4.cpp:3000
string nodeName
Definition: agentclass.h:367
int i
Definition: rw_test.cpp:37
void extract_complete(vector< PACKET_BYTE > &packet, packet_struct_complete &complete)
Definition: transferlib.cpp:71
char node_name[COSMOS_MAX_NAME+1]
Definition: transferlib.h:208
Definition: transferlib.h:332
FILE * data_open(string path, const char *mode)
Open file from path.
Definition: datalib.cpp:1019
string chanip
Definition: agent_file2.cpp:102
PACKET_CHUNK_SIZE_TYPE byte_count
Definition: transferlib.h:279
void extract_metadata(vector< PACKET_BYTE > &packet, packet_struct_metalong &meta)
Definition: transferlib.cpp:204
Definition: transferlib.h:155
int32_t myrecvfrom(string type, socket_channel &channel, vector< PACKET_BYTE > &buf, uint32_t length, double dtimeout=1.)
Definition: agent_file4.cpp:1555
static const unsigned char PACKET_QUEUE
Definition: transferlib.h:109
static Agent * agent
Definition: agent_file4.cpp:94
char address[17]
Definition: socketlib.h:134
static std::mutex incoming_tx_lock
Definition: agent_file4.cpp:141
static const unsigned char PACKET_METADATA
Definition: transferlib.h:103
ElapsedTime dt
Definition: agent_file4.cpp:186
PACKET_BYTE bytes[225-2]
Definition: transferlib.h:187
Definition: agent_file2.cpp:98
string node_name
Definition: agent_001.cpp:46
static uint16_t use_channel
Definition: agent_file.cpp:97
PACKET_BYTE length
Definition: transferlib.h:186
string cosmos_error_string(int32_t cosmos_errno)
Definition: cosmos-errno.cpp:45
uint16_t running()
Check if we're supposed to be running.
Definition: agentclass.cpp:391
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:264
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:334
string lookup_node_id_name(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:536
static ElapsedTime tet
Definition: agent_file4.cpp:137
static std::mutex debug_fd_lock
Definition: agent_file4.cpp:143
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:294
void extract_message(vector< PACKET_BYTE > &packet, packet_struct_message &message)
Definition: transferlib.cpp:341
PACKET_BYTE bytes[225-2]
Definition: transferlib.h:175
Definition: transferlib.h:183
void extract_data(vector< PACKET_BYTE > &packet, PACKET_NODE_ID_TYPE &node_id, PACKET_TX_ID_TYPE &tx_id, PACKET_CHUNK_SIZE_TYPE &byte_count, PACKET_FILE_SIZE_TYPE &chunk_start, PACKET_BYTE *chunk)
Definition: transferlib.cpp:248
int32_t PACKET_FILE_SIZE_TYPE
Definition: transferlib.h:146
int32_t check_node_id(PACKET_NODE_ID_TYPE node_id)
Definition: transferlib.cpp:494
double lap()
Lap Time.
Definition: elapsedtime.cpp:145
uint8_t message[300]
Definition: kpc9612p_send.cpp:36
double lomjd
Definition: agent_file3.cpp:104
Definition: socketlib.h:115
PACKET_TX_ID_TYPE tx_id
Definition: transferlib.h:278
struct sockaddr_in caddr
Definition: socketlib.h:122
Definition: transferlib.h:195
Definition: transferlib.h:261
void extract_command(vector< PACKET_BYTE > &packet, packet_struct_command &command)
Definition: transferlib.cpp:364
#define TRANSFER_QUEUE_LIMIT
Definition: transferlib.h:80
string command
Definition: add_radio.cpp:27
#define PROGRESS_QUEUE_SIZE
Definition: agent_file4.cpp:62
static const unsigned char PACKET_REQDATA
Definition: transferlib.h:105
void extract_heartbeat(vector< PACKET_BYTE > &packet, packet_struct_heartbeat &heartbeat)
Definition: transferlib.cpp:316
static const unsigned char PACKET_MESSAGE
Definition: transferlib.h:112
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:277
uint8_t PACKET_TX_ID_TYPE
Definition: transferlib.h:144
#define PACKET_HEADER_OFFSET_NODE_NAME
Definition: transferlib.h:153
void log_write(string node, string agent, double utc, string extra, string type, string record, string location)
Write log entry - full.
Definition: datalib.cpp:75
double currentmjd(double offset)
Current UTC in Modified Julian Days.
Definition: timelib.cpp:65
#define SEEK_SET
Definition: zconf.h:475
static uint32_t type_error_count
Definition: agent_file4.cpp:153
Definition: transferlib.h:171
static const unsigned char PACKET_COMMAND
Definition: transferlib.h:113
static std::mutex outgoing_tx_lock
Definition: agent_file4.cpp:142
Definition: transferlib.h:275
static vector< channelstruc > out_comm_channel
Definition: agent_file4.cpp:117
#define PACKET_MAX_LENGTH
Definition: transferlib.h:75
socket_channel chansock
Definition: agent_file2.cpp:101
PACKET_FILE_SIZE_TYPE hole_start
Definition: transferlib.h:265
PACKET_FILE_SIZE_TYPE hole_end
Definition: transferlib.h:266
static const unsigned char PACKET_REQMETA
Definition: transferlib.h:106
PACKET_FILE_SIZE_TYPE chunk_start
Definition: transferlib.h:280
FILE * fp
Definition: rw_test.cpp:38
PACKET_BYTE chunk[1500]
Definition: transferlib.h:281
static double last_data_receive_time
Definition: agent_file4.cpp:145
double nmjd
Definition: agent_file2.cpp:105
static const unsigned char PACKET_HEARTBEAT
Definition: transferlib.h:111
static string node
Definition: agent_monitor.cpp:126
double split()
ElapsedTime::split, gets the current elapsed time since the start()
Definition: elapsedtime.cpp:234
PACKET_NODE_ID_TYPE node_id
Definition: transferlib.h:173
static const unsigned char PACKET_DATA
Definition: transferlib.h:104
void extract_queue(vector< PACKET_BYTE > &packet, packet_struct_queue &queue)
Definition: transferlib.cpp:292
static const unsigned char PACKET_CANCEL
Definition: transferlib.h:108
double fmjd
Definition: agent_file3.cpp:106
void extract_reqmeta(vector< PACKET_BYTE > &packet, packet_struct_reqmeta &reqmeta)
Definition: transferlib.cpp:124
void profile_check(int32_t line_number, uint16_t thread=0)
Definition: agent_file4.cpp:3629
static const unsigned char PACKET_COMPLETE
Definition: transferlib.h:107
static const unsigned char PACKET_REQQUEUE
Definition: transferlib.h:110
#define PACKET_HEADER_OFFSET_NODE_ID
Definition: transferlib.h:152
bool data_exists(string &path)
Check existence of path.
Definition: datalib.cpp:1003
void extract_reqqueue(vector< PACKET_BYTE > &packet, packet_struct_reqqueue &reqqueue)
Definition: transferlib.cpp:271