Overte C++ Documentation
AudioMixerWorkerPool.h
1 //
2 // AudioMixerWorkerPool.h
3 // assignment-client/src/audio
4 //
5 // Created by Zach Pomerantz on 11/16/2016.
6 // Copyright 2016 High Fidelity, Inc.
7 //
8 // Distributed under the Apache License, Version 2.0.
9 // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
10 //
11 
12 #ifndef hifi_AudioMixerWorkerPool_h
13 #define hifi_AudioMixerWorkerPool_h
14 
15 #include <condition_variable>
16 #include <mutex>
17 #include <vector>
18 
19 #include <QThread>
20 #include <shared/QtHelpers.h>
21 #include <TBBHelpers.h>
22 
23 #include "AudioMixerWorker.h"
24 
25 class AudioMixerWorkerPool;
26 
27 class AudioMixerWorkerThread : public QThread, public AudioMixerWorker {
28  Q_OBJECT
29  using ConstIter = NodeList::const_iterator;
30  using Mutex = std::mutex;
31  using Lock = std::unique_lock<Mutex>;
32 
33 public:
34  AudioMixerWorkerThread(AudioMixerWorkerPool& pool, AudioMixerWorker::SharedData& sharedData)
35  : AudioMixerWorker(sharedData), _pool(pool) {}
36 
37  void run() override final;
38 
39 private:
40  friend class AudioMixerWorkerPool;
41 
42  void wait();
43  void notify(bool stopping);
44  bool try_pop(SharedNodePointer& node);
45 
46  AudioMixerWorkerPool& _pool;
47  void (AudioMixerWorker::*_function)(const SharedNodePointer& node) { nullptr };
48  bool _stop { false };
49 };
50 
51 // Worker pool for audio mixers
52 // AudioMixerWorkerPool is not thread-safe! It should be instantiated and used from a single thread.
53 class AudioMixerWorkerPool {
54  using Queue = tbb::concurrent_queue<SharedNodePointer>;
55  using Mutex = std::mutex;
56  using Lock = std::unique_lock<Mutex>;
57  using ConditionVariable = std::condition_variable;
58 
59 public:
60  using ConstIter = NodeList::const_iterator;
61 
62  AudioMixerWorkerPool(AudioMixerWorker::SharedData& sharedData, int numThreads = QThread::idealThreadCount())
63  : _workerSharedData(sharedData) { setNumThreads(numThreads); }
64  ~AudioMixerWorkerPool() { resize(0); }
65 
66  // process packets on worker threads
67  void processPackets(ConstIter begin, ConstIter end);
68 
69  // mix on worker threads
70  void mix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain);
71 
72  // iterate over all workers
73  void each(std::function<void(AudioMixerWorker& worker)> functor);
74 
75 #ifdef DEBUG_EVENT_QUEUE
76  void queueStats(QJsonObject& stats);
77 #endif
78 
79  void setNumThreads(int numThreads);
80  int numThreads() { return _numThreads; }
81 
82 private:
83  void run(ConstIter begin, ConstIter end);
84  void resize(int numThreads);
85 
86  std::vector<std::unique_ptr<AudioMixerWorkerThread>> _workers;
87 
88  friend void AudioMixerWorkerThread::wait();
89  friend void AudioMixerWorkerThread::notify(bool stopping);
90  friend bool AudioMixerWorkerThread::try_pop(SharedNodePointer& node);
91 
92  // synchronization state
93  Mutex _mutex;
94  ConditionVariable _workerCondition;
95  ConditionVariable _poolCondition;
96  void (AudioMixerWorker::*_function)(const SharedNodePointer& node);
97  std::function<void(AudioMixerWorker&)> _configure;
98  int _numThreads { 0 };
99  int _numStarted { 0 }; // guarded by _mutex
100  int _numFinished { 0 }; // guarded by _mutex
101  int _numStopped { 0 }; // guarded by _mutex
102 
103  // frame state
104  Queue _queue;
105  ConstIter _begin;
106  ConstIter _end;
107 
108  AudioMixerWorker::SharedData& _workerSharedData;
109 };
110 
111 #endif // hifi_AudioMixerWorkerPool_h