Overte C++ Documentation
AvatarMixerWorkerPool.h
1 //
2 // AvatarMixerWorkerPool.h
3 // assignment-client/src/avatar
4 //
5 // Created by Brad Hefta-Gaub on 2/14/2017.
6 // Copyright 2017 High Fidelity, Inc.
7 //
8 // Distributed under the Apache License, Version 2.0.
9 // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
10 //
11 
12 #ifndef hifi_AvatarMixerWorkerPool_h
13 #define hifi_AvatarMixerWorkerPool_h
14 
15 #include <condition_variable>
16 #include <mutex>
17 #include <vector>
18 
19 #include <QThread>
20 
21 #include <TBBHelpers.h>
22 #include <NodeList.h>
23 #include <shared/QtHelpers.h>
24 
25 #include "AvatarMixerWorker.h"
26 
27 
28 class AvatarMixerWorkerPool;
29 
30 class AvatarMixerWorkerThread : public QThread, public AvatarMixerWorker {
31  Q_OBJECT
32  using ConstIter = NodeList::const_iterator;
33  using Mutex = std::mutex;
34  using Lock = std::unique_lock<Mutex>;
35 
36 public:
37  AvatarMixerWorkerThread(AvatarMixerWorkerPool& pool, WorkerSharedData* workerSharedData) :
38  AvatarMixerWorker(workerSharedData), _pool(pool) {};
39 
40  void run() override final;
41 
42 private:
43  friend class AvatarMixerWorkerPool;
44 
45  void wait();
46  void notify(bool stopping);
47  bool try_pop(SharedNodePointer& node);
48 
49  AvatarMixerWorkerPool& _pool;
50  void (AvatarMixerWorker::*_function)(const SharedNodePointer& node) { nullptr };
51  bool _stop { false };
52 };
53 
54 // Worker pool for avatar mixers
55 // AvatarMixerWorkerPool is not thread-safe! It should be instantiated and used from a single thread.
56 class AvatarMixerWorkerPool {
57  using Queue = tbb::concurrent_queue<SharedNodePointer>;
58  using Mutex = std::mutex;
59  using Lock = std::unique_lock<Mutex>;
60  using ConditionVariable = std::condition_variable;
61 
62 public:
63  using ConstIter = NodeList::const_iterator;
64 
65  AvatarMixerWorkerPool(WorkerSharedData* workerSharedData, int numThreads = QThread::idealThreadCount()) :
66  _workerSharedData(workerSharedData) { setNumThreads(numThreads); }
67  ~AvatarMixerWorkerPool() { resize(0); }
68 
69  // Jobs the worker pool can do...
70  void processIncomingPackets(ConstIter begin, ConstIter end);
71  void broadcastAvatarData(ConstIter begin, ConstIter end,
72  p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode, float throttlingRatio);
73 
74  // iterate over all workers
75  void each(std::function<void(AvatarMixerWorker& worker)> functor);
76 
77 #ifdef DEBUG_EVENT_QUEUE
78  void queueStats(QJsonObject& stats);
79 #endif
80 
81  void setNumThreads(int numThreads);
82  int numThreads() const { return _numThreads; }
83 
84  void setPriorityReservedFraction(float fraction) { _priorityReservedFraction = fraction; }
85  float getPriorityReservedFraction() const { return _priorityReservedFraction; }
86 
87 private:
88  void run(ConstIter begin, ConstIter end);
89  void resize(int numThreads);
90 
91  std::vector<std::unique_ptr<AvatarMixerWorkerThread>> _workers;
92 
93  friend void AvatarMixerWorkerThread::wait();
94  friend void AvatarMixerWorkerThread::notify(bool stopping);
95  friend bool AvatarMixerWorkerThread::try_pop(SharedNodePointer& node);
96 
97  // synchronization state
98  Mutex _mutex;
99  ConditionVariable _workerCondition;
100  ConditionVariable _poolCondition;
101  void (AvatarMixerWorker::*_function)(const SharedNodePointer& node);
102  std::function<void(AvatarMixerWorker&)> _configure;
103 
104  // Set from Domain Settings:
105  float _priorityReservedFraction { 0.4f };
106  int _numThreads { 0 };
107 
108  int _numStarted { 0 }; // guarded by _mutex
109  int _numFinished { 0 }; // guarded by _mutex
110  int _numStopped { 0 }; // guarded by _mutex
111 
112  // frame state
113  Queue _queue;
114  ConstIter _begin;
115  ConstIter _end;
116 
117  WorkerSharedData* _workerSharedData;
118 };
119 
120 #endif // hifi_AvatarMixerWorkerPool_h