Overte C++ Documentation
AvatarMixerSlavePool.h
1 //
2 // AvatarMixerSlavePool.h
3 // assignment-client/src/avatar
4 //
5 // Created by Brad Hefta-Gaub on 2/14/2017.
6 // Copyright 2017 High Fidelity, Inc.
7 //
8 // Distributed under the Apache License, Version 2.0.
9 // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
10 //
11 
12 #ifndef hifi_AvatarMixerSlavePool_h
13 #define hifi_AvatarMixerSlavePool_h
14 
15 #include <condition_variable>
16 #include <mutex>
17 #include <vector>
18 
19 #include <QThread>
20 
21 #include <TBBHelpers.h>
22 #include <NodeList.h>
23 #include <shared/QtHelpers.h>
24 
25 #include "AvatarMixerSlave.h"
26 
27 
28 class AvatarMixerSlavePool;
29 
30 class AvatarMixerSlaveThread : public QThread, public AvatarMixerSlave {
31  Q_OBJECT
32  using ConstIter = NodeList::const_iterator;
33  using Mutex = std::mutex;
34  using Lock = std::unique_lock<Mutex>;
35 
36 public:
37  AvatarMixerSlaveThread(AvatarMixerSlavePool& pool, SlaveSharedData* slaveSharedData) :
38  AvatarMixerSlave(slaveSharedData), _pool(pool) {};
39 
40  void run() override final;
41 
42 private:
43  friend class AvatarMixerSlavePool;
44 
45  void wait();
46  void notify(bool stopping);
47  bool try_pop(SharedNodePointer& node);
48 
49  AvatarMixerSlavePool& _pool;
50  void (AvatarMixerSlave::*_function)(const SharedNodePointer& node) { nullptr };
51  bool _stop { false };
52 };
53 
54 // Slave pool for avatar mixers
55 // AvatarMixerSlavePool is not thread-safe! It should be instantiated and used from a single thread.
56 class AvatarMixerSlavePool {
57  using Queue = tbb::concurrent_queue<SharedNodePointer>;
58  using Mutex = std::mutex;
59  using Lock = std::unique_lock<Mutex>;
60  using ConditionVariable = std::condition_variable;
61 
62 public:
63  using ConstIter = NodeList::const_iterator;
64 
65  AvatarMixerSlavePool(SlaveSharedData* slaveSharedData, int numThreads = QThread::idealThreadCount()) :
66  _slaveSharedData(slaveSharedData) { setNumThreads(numThreads); }
67  ~AvatarMixerSlavePool() { resize(0); }
68 
69  // Jobs the slave pool can do...
70  void processIncomingPackets(ConstIter begin, ConstIter end);
71  void broadcastAvatarData(ConstIter begin, ConstIter end,
72  p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode, float throttlingRatio);
73 
74  // iterate over all slaves
75  void each(std::function<void(AvatarMixerSlave& slave)> functor);
76 
77 #ifdef DEBUG_EVENT_QUEUE
78  void queueStats(QJsonObject& stats);
79 #endif
80 
81  void setNumThreads(int numThreads);
82  int numThreads() const { return _numThreads; }
83 
84  void setPriorityReservedFraction(float fraction) { _priorityReservedFraction = fraction; }
85  float getPriorityReservedFraction() const { return _priorityReservedFraction; }
86 
87 private:
88  void run(ConstIter begin, ConstIter end);
89  void resize(int numThreads);
90 
91  std::vector<std::unique_ptr<AvatarMixerSlaveThread>> _slaves;
92 
93  friend void AvatarMixerSlaveThread::wait();
94  friend void AvatarMixerSlaveThread::notify(bool stopping);
95  friend bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node);
96 
97  // synchronization state
98  Mutex _mutex;
99  ConditionVariable _slaveCondition;
100  ConditionVariable _poolCondition;
101  void (AvatarMixerSlave::*_function)(const SharedNodePointer& node);
102  std::function<void(AvatarMixerSlave&)> _configure;
103 
104  // Set from Domain Settings:
105  float _priorityReservedFraction { 0.4f };
106  int _numThreads { 0 };
107 
108  int _numStarted { 0 }; // guarded by _mutex
109  int _numFinished { 0 }; // guarded by _mutex
110  int _numStopped { 0 }; // guarded by _mutex
111 
112  // frame state
113  Queue _queue;
114  ConstIter _begin;
115  ConstIter _end;
116 
117  SlaveSharedData* _slaveSharedData;
118 };
119 
120 #endif // hifi_AvatarMixerSlavePool_h