13 #ifndef hifi_SendQueue_h
14 #define hifi_SendQueue_h
17 #include <condition_variable>
22 #include <unordered_map>
24 #include <QtCore/QObject>
25 #include <QtCore/QReadWriteLock>
27 #include <PortableHighResolutionClock.h>
29 #include "../SockAddr.h"
31 #include "Constants.h"
32 #include "PacketQueue.h"
33 #include "SequenceNumber.h"
44 class SendQueue :
public QObject {
/// Static factory returning a uniquely-owned SendQueue.
/// Takes the same argument list as the SendQueue constructor declared in this
/// class: socket, destination address, current sequence number, current message
/// number, and the hasReceivedHandshakeACK flag.
54 static std::unique_ptr<SendQueue> create(Socket* socket, SockAddr destination,
55 SequenceNumber currentSequenceNumber, MessageNumber currentMessageNumber,
56 bool hasReceivedHandshakeACK);
/// Hands a single packet to the send queue. The unique_ptr is taken by value, so
/// the caller has to move it in and gives up ownership of the packet.
60 void queuePacket(std::unique_ptr<Packet> packet);
/// Same as queuePacket(), but for a whole PacketList; ownership is transferred
/// in the same way.
61 void queuePacketList(std::unique_ptr<PacketList> packetList);
/// Builds a SequenceNumber from the value currently held in the atomic
/// sequence-number member.
63 SequenceNumber getCurrentSequenceNumber() const {
    const uint32_t rawSequenceNumber = _atomicCurrentSequenceNumber.load();
    return SequenceNumber(rawSequenceNumber);
}
/// Reports the current message number as tracked by the packet queue member.
64 MessageNumber getCurrentMessageNumber() const {
    MessageNumber currentMessageNumber = _packets.getCurrentMessageNumber();
    return currentMessageNumber;
}
/// Overwrites the stored flow window size with the given value.
66 void setFlowWindowSize(int flowWindowSize) {
    _flowWindowSize.store(flowWindowSize);
}
/// Returns the packet send period currently stored in the atomic member.
68 int getPacketSendPeriod() const {
    const int currentPeriod = _packetSendPeriod.load();
    return currentPeriod;
}
/// Overwrites the stored packet send period with newPeriod.
69 void setPacketSendPeriod(int newPeriod) {
    _packetSendPeriod.store(newPeriod);
}
/// Overwrites the stored estimated timeout with the given value.
71 void setEstimatedTimeout(int estimatedTimeout) {
    _estimatedTimeout.store(estimatedTimeout);
}
/// Informs the queue about an ACK carrying the given sequence number.
/// NOTE(review): whether the ACKed number is inclusive or exclusive is decided in
/// the implementation file — confirm there.
76 void ack(SequenceNumber ack);
/// NOTE(review): presumably requests an early resend based on the given ACK
/// sequence number — confirm against the implementation.
77 void fastRetransmit(SequenceNumber ack);
/// Takes a new address by value. NOTE(review): presumably replaces _destination —
/// confirm against the implementation.
79 void updateDestinationAddress(SockAddr newAddress);
/// packetSent() and packetRetransmitted() share one parameter list: wire size,
/// payload size, the packet's sequence number and a p_high_resolution_clock
/// time point.
/// NOTE(review): these read like Qt signals, but no `signals:` marker is visible
/// in this part of the header — confirm before relying on that.
82 void packetSent(
int wireSize,
int payloadSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint);
83 void packetRetransmitted(
int wireSize,
int payloadSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint);
/// Qt macro: disables the copy and move constructors and assignment operators
/// for SendQueue.
93 Q_DISABLE_COPY_MOVE(SendQueue)
/// Constructor. It takes the same arguments as the static create() factory.
/// NOTE(review): the access specifier for this section is not visible in this
/// part of the header — confirm whether construction is restricted to create().
94 SendQueue(Socket* socket, SockAddr dest, SequenceNumber currentSequenceNumber,
95 MessageNumber currentMessageNumber,
bool hasReceivedHandshakeACK);
/// Sends one packet, passed by const reference (no ownership transfer).
/// NOTE(review): the meaning of the int result is not visible here — confirm.
99 int sendPacket(
const Packet& packet);
/// Takes ownership of newPacket (unique_ptr by value) together with the sequence
/// number to use for it. NOTE(review): the bool presumably reports success — confirm.
100 bool sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber);
/// NOTE(review): the meaning of the int result is not visible here — confirm.
102 int maybeSendNewPacket();
/// NOTE(review): the bool presumably reports whether a resend happened — confirm.
103 bool maybeResendPacket();
/// Inactivity check; the caller states whether it just attempted to send a packet.
105 bool isInactive(
bool attemptedToSendPacket);
/// Const query on the flow window state; does not modify the queue.
108 bool isFlowWindowFull()
const;
/// Non-const: NOTE(review): presumably advances the sequence counter — confirm.
111 SequenceNumber getNextSequenceNumber();
// Packet container; getCurrentMessageNumber() forwards to it.
113 PacketQueue _packets;
// Raw socket pointer, nullptr by default. NOTE(review): presumably non-owning — confirm.
115 Socket* _socket {
nullptr };
// Destination address of this queue.
116 SockAddr _destination;
// Sequence number of the last ACK, held as a raw uint32_t inside an atomic.
118 std::atomic<uint32_t> _lastACKSequenceNumber { 0 };
// Plain sequence counter plus an atomic uint32_t counterpart;
// getCurrentSequenceNumber() reads the atomic one.
120 SequenceNumber _currentSequenceNumber { 0 };
121 std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 };
// Backing store for getPacketSendPeriod()/setPacketSendPeriod(). Units are not visible here.
123 std::atomic<int> _packetSendPeriod { 0 };
// NOTE(review): State is not declared in the visible part of this header.
124 std::atomic<State> _state { State::NotStarted };
// Written by setEstimatedTimeout(). NOTE(review): presumably microseconds, given the
// MINIMUM/MAXIMUM_ESTIMATED_TIMEOUT constants below — confirm.
126 std::atomic<int> _estimatedTimeout { 0 };
// Written by setFlowWindowSize().
128 std::atomic<int> _flowWindowSize { 0 };
// NOTE(review): no NAK container is visible in this part of the header — confirm
// what this mutex guards.
130 mutable std::mutex _naksLock;
// NOTE(review): presumably guards _sentPackets — confirm.
133 mutable QReadWriteLock _sentLock;
// A uint8_t paired with an owned packet. NOTE(review): the uint8_t is presumably a
// resend counter — confirm.
134 using PacketResendPair = std::pair<uint8_t, std::unique_ptr<Packet>>;
// Sent packets, keyed by sequence number.
135 std::unordered_map<SequenceNumber, PacketResendPair> _sentPackets;
// Handshake state: mutex, atomic flag (false by default) and condition variable.
137 std::mutex _handshakeMutex;
138 std::atomic<bool> _hasReceivedHandshakeACK {
false };
139 std::condition_variable _handshakeACKCondition;
// condition_variable_any: can wait on lock types other than std::unique_lock<std::mutex>.
141 std::condition_variable_any _emptyCondition;
// NOTE(review): this uses std::chrono::high_resolution_clock, while packetSent() and
// packetRetransmitted() take p_high_resolution_clock::time_point — confirm the
// difference is intended.
143 std::chrono::high_resolution_clock::time_point _lastPacketSentAt;
// Bounds for the estimated timeout, in microseconds; defined out of line.
145 static const std::chrono::microseconds MAXIMUM_ESTIMATED_TIMEOUT;
146 static const std::chrono::microseconds MINIMUM_ESTIMATED_TIMEOUT;