mirror of
https://github.com/luanti-org/luanti.git
synced 2025-07-22 17:18:39 +00:00
Network cleanup (#6310)
* Move Connection threads to dedicated files + various cleanups * ConnectionReceiveThread::processPacket now uses function pointer table to route MT packet types * Various code style fixes * Code style with clang-format * Various SharedBuffer copy removal * SharedBuffer cannot be copied anymore using Buffer * Fix many SharedBuffer copy (thanks to delete operator)
This commit is contained in:
parent
f6a33a1a7a
commit
3cea7a349a
11 changed files with 1670 additions and 1669 deletions
|
@ -37,13 +37,19 @@ class NetworkPacket;
|
|||
namespace con
|
||||
{
|
||||
|
||||
class ConnectionReceiveThread;
|
||||
class ConnectionSendThread;
|
||||
|
||||
typedef enum MTProtocols {
|
||||
MTP_PRIMARY,
|
||||
MTP_UDP,
|
||||
MTP_MINETEST_RELIABLE_UDP
|
||||
} MTProtocols;
|
||||
|
||||
#define MAX_UDP_PEERS 65535
|
||||
|
||||
#define SEQNUM_MAX 65535
|
||||
|
||||
inline bool seqnum_higher(u16 totest, u16 base)
|
||||
{
|
||||
if (totest > base)
|
||||
|
@ -73,6 +79,12 @@ inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
|
|||
return ((seqnum < window_end) || (seqnum >= window_start));
|
||||
}
|
||||
|
||||
static inline float CALC_DTIME(u64 lasttime, u64 curtime)
|
||||
{
|
||||
float value = ( curtime - lasttime) / 1000.0;
|
||||
return MYMAX(MYMIN(value,0.1),0.0);
|
||||
}
|
||||
|
||||
struct BufferedPacket
|
||||
{
|
||||
BufferedPacket(u8 *a_data, u32 a_size):
|
||||
|
@ -90,44 +102,31 @@ struct BufferedPacket
|
|||
};
|
||||
|
||||
// This adds the base headers to the data and makes a packet out of it
|
||||
BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
|
||||
BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
|
||||
u32 protocol_id, u16 sender_peer_id, u8 channel);
|
||||
BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
|
||||
u32 protocol_id, u16 sender_peer_id, u8 channel);
|
||||
|
||||
// Add the TYPE_ORIGINAL header to the data
|
||||
SharedBuffer<u8> makeOriginalPacket(
|
||||
SharedBuffer<u8> data);
|
||||
|
||||
// Split data in chunks and add TYPE_SPLIT headers to them
|
||||
std::list<SharedBuffer<u8> > makeSplitPacket(
|
||||
SharedBuffer<u8> data,
|
||||
u32 chunksize_max,
|
||||
u16 seqnum);
|
||||
|
||||
// Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
|
||||
// Increments split_seqnum if a split packet is made
|
||||
std::list<SharedBuffer<u8> > makeAutoSplitPacket(
|
||||
SharedBuffer<u8> data,
|
||||
u32 chunksize_max,
|
||||
u16 &split_seqnum);
|
||||
void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
|
||||
u16 &split_seqnum, std::list<SharedBuffer<u8>> *list);
|
||||
|
||||
// Add the TYPE_RELIABLE header to the data
|
||||
SharedBuffer<u8> makeReliablePacket(
|
||||
const SharedBuffer<u8> &data,
|
||||
u16 seqnum);
|
||||
SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum);
|
||||
|
||||
struct IncomingSplitPacket
|
||||
{
|
||||
IncomingSplitPacket() = default;
|
||||
IncomingSplitPacket(u32 cc, bool r):
|
||||
chunk_count(cc), reliable(r) {}
|
||||
|
||||
IncomingSplitPacket() = delete;
|
||||
|
||||
// Key is chunk number, value is data without headers
|
||||
std::map<u16, SharedBuffer<u8> > chunks;
|
||||
std::map<u16, SharedBuffer<u8>> chunks;
|
||||
u32 chunk_count;
|
||||
float time = 0.0f; // Seconds from adding
|
||||
bool reliable = false; // If true, isn't deleted on timeout
|
||||
|
||||
bool allReceived()
|
||||
bool allReceived() const
|
||||
{
|
||||
return (chunks.size() == chunk_count);
|
||||
}
|
||||
|
@ -171,7 +170,7 @@ controltype and data description:
|
|||
packet to get a reply
|
||||
CONTROLTYPE_DISCO
|
||||
*/
|
||||
#define TYPE_CONTROL 0
|
||||
//#define TYPE_CONTROL 0
|
||||
#define CONTROLTYPE_ACK 0
|
||||
#define CONTROLTYPE_SET_PEER_ID 1
|
||||
#define CONTROLTYPE_PING 2
|
||||
|
@ -185,7 +184,7 @@ checking at all.
|
|||
Header (1 byte):
|
||||
[0] u8 type
|
||||
*/
|
||||
#define TYPE_ORIGINAL 1
|
||||
//#define TYPE_ORIGINAL 1
|
||||
#define ORIGINAL_HEADER_SIZE 1
|
||||
/*
|
||||
SPLIT: These are sequences of packets forming one bigger piece of
|
||||
|
@ -202,7 +201,7 @@ data.
|
|||
[3] u16 chunk_count
|
||||
[5] u16 chunk_num
|
||||
*/
|
||||
#define TYPE_SPLIT 2
|
||||
//#define TYPE_SPLIT 2
|
||||
/*
|
||||
RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
|
||||
and they shall be delivered in the same order as sent. This is done
|
||||
|
@ -214,10 +213,17 @@ with a buffer in the receiving and transmitting end.
|
|||
[1] u16 seqnum
|
||||
|
||||
*/
|
||||
#define TYPE_RELIABLE 3
|
||||
//#define TYPE_RELIABLE 3
|
||||
#define RELIABLE_HEADER_SIZE 3
|
||||
#define SEQNUM_INITIAL 65500
|
||||
|
||||
enum PacketType: u8 {
|
||||
PACKET_TYPE_CONTROL = 0,
|
||||
PACKET_TYPE_ORIGINAL = 1,
|
||||
PACKET_TYPE_SPLIT = 2,
|
||||
PACKET_TYPE_RELIABLE = 3,
|
||||
PACKET_TYPE_MAX
|
||||
};
|
||||
/*
|
||||
A buffer which stores reliable packets and sorts them internally
|
||||
for fast access to the smallest one.
|
||||
|
@ -270,7 +276,7 @@ public:
|
|||
Returns a reference counted buffer of length != 0 when a full split
|
||||
packet is constructed. If not, returns one of length 0.
|
||||
*/
|
||||
SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
|
||||
SharedBuffer<u8> insert(const BufferedPacket &p, bool reliable);
|
||||
|
||||
void removeUnreliableTimedOuts(float dtime, float timeout);
|
||||
|
||||
|
@ -318,8 +324,8 @@ struct ConnectionCommand
|
|||
enum ConnectionCommandType type = CONNCMD_NONE;
|
||||
Address address;
|
||||
u16 peer_id = PEER_ID_INEXISTENT;
|
||||
u8 channelnum;
|
||||
Buffer<u8> data;
|
||||
u8 channelnum = 0;
|
||||
SharedBuffer<u8> data;
|
||||
bool reliable = false;
|
||||
bool raw = false;
|
||||
|
||||
|
@ -551,13 +557,12 @@ class Peer {
|
|||
|
||||
virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
|
||||
virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
|
||||
virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
|
||||
BufferedPacket toadd,
|
||||
bool reliable)
|
||||
{
|
||||
fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
|
||||
return SharedBuffer<u8>(0);
|
||||
};
|
||||
virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
|
||||
bool reliable)
|
||||
{
|
||||
fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
|
||||
return SharedBuffer<u8>(0);
|
||||
};
|
||||
|
||||
virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
|
||||
|
||||
|
@ -649,10 +654,8 @@ public:
|
|||
u16 getNextSplitSequenceNumber(u8 channel);
|
||||
void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
|
||||
|
||||
SharedBuffer<u8> addSpiltPacket(u8 channel,
|
||||
BufferedPacket toadd,
|
||||
bool reliable);
|
||||
|
||||
SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
|
||||
bool reliable);
|
||||
|
||||
protected:
|
||||
/*
|
||||
|
@ -750,103 +753,6 @@ struct ConnectionEvent
|
|||
}
|
||||
};
|
||||
|
||||
class ConnectionSendThread : public Thread {
|
||||
|
||||
public:
|
||||
friend class UDPPeer;
|
||||
|
||||
ConnectionSendThread(unsigned int max_packet_size, float timeout);
|
||||
|
||||
void *run();
|
||||
|
||||
void Trigger();
|
||||
|
||||
void setParent(Connection* parent) {
|
||||
assert(parent != NULL); // Pre-condition
|
||||
m_connection = parent;
|
||||
}
|
||||
|
||||
void setPeerTimeout(float peer_timeout)
|
||||
{ m_timeout = peer_timeout; }
|
||||
|
||||
private:
|
||||
void runTimeouts (float dtime);
|
||||
void rawSend (const BufferedPacket &packet);
|
||||
bool rawSendAsPacket(u16 peer_id, u8 channelnum,
|
||||
SharedBuffer<u8> data, bool reliable);
|
||||
|
||||
void processReliableCommand (ConnectionCommand &c);
|
||||
void processNonReliableCommand (ConnectionCommand &c);
|
||||
void serve (Address bind_address);
|
||||
void connect (Address address);
|
||||
void disconnect ();
|
||||
void disconnect_peer(u16 peer_id);
|
||||
void send (u16 peer_id, u8 channelnum,
|
||||
SharedBuffer<u8> data);
|
||||
void sendReliable (ConnectionCommand &c);
|
||||
void sendToAll (u8 channelnum,
|
||||
SharedBuffer<u8> data);
|
||||
void sendToAllReliable(ConnectionCommand &c);
|
||||
|
||||
void sendPackets (float dtime);
|
||||
|
||||
void sendAsPacket (u16 peer_id, u8 channelnum,
|
||||
SharedBuffer<u8> data,bool ack=false);
|
||||
|
||||
void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
|
||||
|
||||
bool packetsQueued();
|
||||
|
||||
Connection *m_connection = nullptr;
|
||||
unsigned int m_max_packet_size;
|
||||
float m_timeout;
|
||||
std::queue<OutgoingPacket> m_outgoing_queue;
|
||||
Semaphore m_send_sleep_semaphore;
|
||||
|
||||
unsigned int m_iteration_packets_avaialble;
|
||||
unsigned int m_max_commands_per_iteration = 1;
|
||||
unsigned int m_max_data_packets_per_iteration;
|
||||
unsigned int m_max_packets_requeued = 256;
|
||||
};
|
||||
|
||||
class ConnectionReceiveThread : public Thread {
|
||||
public:
|
||||
ConnectionReceiveThread(unsigned int max_packet_size);
|
||||
|
||||
void *run();
|
||||
|
||||
void setParent(Connection *parent) {
|
||||
assert(parent); // Pre-condition
|
||||
m_connection = parent;
|
||||
}
|
||||
|
||||
private:
|
||||
void receive();
|
||||
|
||||
// Returns next data from a buffer if possible
|
||||
// If found, returns true; if not, false.
|
||||
// If found, sets peer_id and dst
|
||||
bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
|
||||
|
||||
bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
|
||||
SharedBuffer<u8> &dst);
|
||||
|
||||
/*
|
||||
Processes a packet with the basic header stripped out.
|
||||
Parameters:
|
||||
packetdata: Data in packet (with no base headers)
|
||||
peer_id: peer id of the sender of the packet in question
|
||||
channelnum: channel on which the packet was sent
|
||||
reliable: true if recursing into a reliable packet
|
||||
*/
|
||||
SharedBuffer<u8> processPacket(Channel *channel,
|
||||
SharedBuffer<u8> packetdata, u16 peer_id,
|
||||
u8 channelnum, bool reliable);
|
||||
|
||||
|
||||
Connection *m_connection = nullptr;
|
||||
};
|
||||
|
||||
class PeerHandler;
|
||||
|
||||
class Connection
|
||||
|
@ -863,7 +769,7 @@ public:
|
|||
ConnectionEvent waitEvent(u32 timeout_ms);
|
||||
void putCommand(ConnectionCommand &c);
|
||||
|
||||
void SetTimeoutMs(int timeout) { m_bc_receive_timeout = timeout; }
|
||||
void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
|
||||
void Serve(Address bind_addr);
|
||||
void Connect(Address address);
|
||||
bool Connected();
|
||||
|
@ -879,7 +785,6 @@ public:
|
|||
void DisconnectPeer(u16 peer_id);
|
||||
|
||||
protected:
|
||||
PeerHelper getPeer(u16 peer_id);
|
||||
PeerHelper getPeerNoEx(u16 peer_id);
|
||||
u16 lookupPeer(Address& sender);
|
||||
|
||||
|
@ -892,7 +797,6 @@ protected:
|
|||
void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
|
||||
|
||||
void PrintInfo(std::ostream &out);
|
||||
void PrintInfo();
|
||||
|
||||
std::list<u16> getPeerIDs()
|
||||
{
|
||||
|
@ -905,8 +809,7 @@ protected:
|
|||
|
||||
void putEvent(ConnectionEvent &e);
|
||||
|
||||
void TriggerSend()
|
||||
{ m_sendThread.Trigger(); }
|
||||
void TriggerSend();
|
||||
private:
|
||||
std::list<Peer*> getPeers();
|
||||
|
||||
|
@ -919,14 +822,14 @@ private:
|
|||
std::list<u16> m_peer_ids;
|
||||
std::mutex m_peers_mutex;
|
||||
|
||||
ConnectionSendThread m_sendThread;
|
||||
ConnectionReceiveThread m_receiveThread;
|
||||
std::unique_ptr<ConnectionSendThread> m_sendThread;
|
||||
std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
|
||||
|
||||
std::mutex m_info_mutex;
|
||||
|
||||
// Backwards compatibility
|
||||
PeerHandler *m_bc_peerhandler;
|
||||
int m_bc_receive_timeout = 0;
|
||||
u32 m_bc_receive_timeout = 0;
|
||||
|
||||
bool m_shutting_down = false;
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue