Overte C++ Documentation
SendQueue.h
1 //
2 // SendQueue.h
3 // libraries/networking/src/udt
4 //
5 // Created by Clement on 7/21/15.
6 // Copyright 2015 High Fidelity, Inc.
7 // Copyright 2021 Vircadia contributors.
8 //
9 // Distributed under the Apache License, Version 2.0.
10 // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
11 //
12 
13 #ifndef hifi_SendQueue_h
14 #define hifi_SendQueue_h
15 
16 #include <atomic>
17 #include <condition_variable>
18 #include <cstdint>
19 #include <list>
20 #include <memory>
21 #include <mutex>
22 #include <unordered_map>
23 
24 #include <QtCore/QObject>
25 #include <QtCore/QReadWriteLock>
26 
27 #include <PortableHighResolutionClock.h>
28 
29 #include "../SockAddr.h"
30 
31 #include "Constants.h"
32 #include "PacketQueue.h"
33 #include "SequenceNumber.h"
34 #include "LossList.h"
35 
36 namespace udt {
37 
38 class BasePacket;
39 class ControlPacket;
40 class Packet;
41 class PacketList;
42 class Socket;
43 
44 class SendQueue : public QObject {
45  Q_OBJECT
46 
47 public:
48  enum class State {
49  NotStarted,
50  Running,
51  Stopped
52  };
53 
54  static std::unique_ptr<SendQueue> create(Socket* socket, SockAddr destination,
55  SequenceNumber currentSequenceNumber, MessageNumber currentMessageNumber,
56  bool hasReceivedHandshakeACK);
57 
58  virtual ~SendQueue();
59 
60  void queuePacket(std::unique_ptr<Packet> packet);
61  void queuePacketList(std::unique_ptr<PacketList> packetList);
62 
63  SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
64  MessageNumber getCurrentMessageNumber() const { return _packets.getCurrentMessageNumber(); }
65 
66  void setFlowWindowSize(int flowWindowSize) { _flowWindowSize = flowWindowSize; }
67 
68  int getPacketSendPeriod() const { return _packetSendPeriod; }
69  void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; }
70 
71  void setEstimatedTimeout(int estimatedTimeout) { _estimatedTimeout = estimatedTimeout; }
72 
73 public slots:
74  void stop();
75 
76  void ack(SequenceNumber ack);
77  void fastRetransmit(SequenceNumber ack);
78  void handshakeACK();
79  void updateDestinationAddress(SockAddr newAddress);
80 
81 signals:
82  void packetSent(int wireSize, int payloadSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint);
83  void packetRetransmitted(int wireSize, int payloadSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint);
84 
85  void queueInactive();
86 
87  void timeout();
88 
89 private slots:
90  void run();
91 
92 private:
93  Q_DISABLE_COPY_MOVE(SendQueue)
94  SendQueue(Socket* socket, SockAddr dest, SequenceNumber currentSequenceNumber,
95  MessageNumber currentMessageNumber, bool hasReceivedHandshakeACK);
96 
97  void sendHandshake();
98 
99  int sendPacket(const Packet& packet);
100  bool sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber);
101 
102  int maybeSendNewPacket(); // Figures out what packet to send next
103  bool maybeResendPacket(); // Determines whether to resend a packet and which one
104 
105  bool isInactive(bool attemptedToSendPacket);
106  void deactivate(); // makes the queue inactive and cleans it up
107 
108  bool isFlowWindowFull() const;
109 
110  // Increments current sequence number and return it
111  SequenceNumber getNextSequenceNumber();
112 
113  PacketQueue _packets;
114 
115  Socket* _socket { nullptr }; // Socket to send packet on
116  SockAddr _destination; // Destination addr
117 
118  std::atomic<uint32_t> _lastACKSequenceNumber { 0 }; // Last ACKed sequence number
119 
120  SequenceNumber _currentSequenceNumber { 0 }; // Last sequence number sent out
121  std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 }; // Atomic for last sequence number sent out
122 
123  std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC
124  std::atomic<State> _state { State::NotStarted };
125 
126  std::atomic<int> _estimatedTimeout { 0 }; // Estimated timeout, set from CC
127 
128  std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
129 
130  mutable std::mutex _naksLock; // Protects the naks list.
131  LossList _naks; // Sequence numbers of packets to resend
132 
133  mutable QReadWriteLock _sentLock; // Protects the sent packet list
134  using PacketResendPair = std::pair<uint8_t, std::unique_ptr<Packet>>; // Number of resend + packet ptr
135  std::unordered_map<SequenceNumber, PacketResendPair> _sentPackets; // Packets waiting for ACK.
136 
137  std::mutex _handshakeMutex; // Protects the handshake ACK condition_variable
138  std::atomic<bool> _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
139  std::condition_variable _handshakeACKCondition;
140 
141  std::condition_variable_any _emptyCondition;
142 
143  std::chrono::high_resolution_clock::time_point _lastPacketSentAt;
144 
145  static const std::chrono::microseconds MAXIMUM_ESTIMATED_TIMEOUT;
146  static const std::chrono::microseconds MINIMUM_ESTIMATED_TIMEOUT;
147 };
148 
149 }
150 
151 #endif // hifi_SendQueue_h