Branch data Line data Source code
1 : : #include <algorithm>
2 : : #include <cstring>
3 : :
4 : : #include "statistics_manager.hpp"
5 : :
6 : : // static const char* TAG = "StatsMgr";
7 : :
8 : : /** @brief Fixed EMA alpha for RTT (32/256 = 12.5% = 1/8). Smoother than RSSI alpha to filter out transient spikes. */
9 : : static constexpr uint8_t RTT_EMA_ALPHA = 32;
10 : :
11 : 24 : StatisticsManager::StatisticsManager(IStorageManager& storage, IFreeRTOSHAL& hal_freertos)
12 : 24 : : storage_(storage)
13 : 24 : , hal_freertos_(hal_freertos)
14 : : {
15 : 24 : }
16 : :
17 : 48 : StatisticsManager::~StatisticsManager()
18 : : {
19 : : // deinit();
20 : 48 : }
21 : :
22 : 24 : esp_err_t StatisticsManager::init()
23 : : {
24 : 24 : mutex_ = hal_freertos_.mutex_create();
25 [ + - ]: 24 : if (mutex_ == nullptr) {
26 : : return ESP_ERR_NO_MEM;
27 : : }
28 : :
29 : : // Load existing statistics from storage if any
30 : 24 : etl::vector<PeerStatisticsPersist, MAX_PEERS> persisted;
31 [ + - ]: 24 : if (storage_.load_stats(persisted) == ESP_OK) {
32 [ + + ]: 25 : for (const auto& p : persisted) {
33 : 1 : PeerStatisticsEntry entry;
34 : 1 : entry.stats.node_id = p.node_id;
35 : 1 : entry.stats.rssi_avg = p.rssi_avg;
36 : 1 : entry.stats.packets_rx = p.packets_rx;
37 : 1 : entry.stats.packets_sent = p.packets_sent;
38 : 1 : entry.stats.packets_lost = p.packets_lost;
39 : 1 : entry.stats.rtt_avg_ms = p.rtt_avg_ms;
40 : 1 : entry.stats.retries = p.retries;
41 : 1 : entries_.push_back(entry);
42 : : }
43 : : }
44 : :
45 : 24 : return ESP_OK;
46 : 24 : }
47 : :
48 : 1 : esp_err_t StatisticsManager::deinit()
49 : : {
50 [ + - ]: 1 : if (mutex_ != nullptr) {
51 : 1 : std::optional<etl::vector<PeerStatisticsPersist, MAX_PEERS>> snapshot;
52 : :
53 [ + - ]: 1 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
54 : 1 : snapshot = build_persist_snapshot(); // capture before clearing
55 : 1 : entries_.clear();
56 : 1 : hal_freertos_.semaphore_give(mutex_);
57 : : }
58 : :
59 [ + - ]: 1 : if (snapshot.has_value()) {
60 : 1 : storage_.store_stats(snapshot.value());
61 : : }
62 : :
63 : 1 : hal_freertos_.semaphore_delete(mutex_);
64 [ + - ]: 1 : mutex_ = nullptr;
65 : 1 : }
66 : 1 : return ESP_OK;
67 : : }
68 : :
69 : 25 : void StatisticsManager::on_peer_added(NodeId node_id, uint32_t heartbeat_interval_ms)
70 : : {
71 [ + - ]: 25 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
72 : 25 : auto entry = find_entry(node_id);
73 [ + + ]: 25 : if (entry == nullptr) {
74 : 24 : PeerStatisticsEntry new_entry;
75 : 24 : new_entry.stats.node_id = node_id;
76 : 24 : new_entry.stats.rssi_alpha = compute_alpha(heartbeat_interval_ms);
77 : 24 : entries_.push_back(new_entry);
78 : : }
79 : : else {
80 : 1 : entry->stats.rssi_alpha = compute_alpha(heartbeat_interval_ms);
81 : : }
82 : 25 : hal_freertos_.semaphore_give(mutex_);
83 : : }
84 : 25 : }
85 : :
86 : 1 : void StatisticsManager::on_peer_removed(NodeId node_id)
87 : : {
88 [ + - ]: 1 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
89 [ + - ]: 1 : auto it = std::find_if(
90 [ - - - - : 1 : entries_.begin(), entries_.end(), [node_id](const auto& e) { return e.stats.node_id == node_id; });
- - - - -
- - - +
- ]
91 [ + - ]: 1 : if (it != entries_.end()) {
92 : 1 : entries_.erase(it);
93 : : }
94 : 1 : hal_freertos_.semaphore_give(mutex_);
95 : : }
96 : 1 : }
97 : :
98 : 2 : void StatisticsManager::sync_peers(const etl::ivector<PeerInfo>& known_peers)
99 : : {
100 [ + - ]: 2 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
101 : 2 : auto it = entries_.begin();
102 [ + + ]: 6 : while (it != entries_.end()) {
103 [ + + ]: 4 : bool found = std::any_of(known_peers.begin(), known_peers.end(), [&](const PeerInfo& p) {
104 [ - - - - : 5 : return p.node_id == it->stats.node_id;
- - - - -
- + + +
+ ]
105 : : });
106 [ + + ]: 4 : it = found ? std::next(it) : entries_.erase(it);
107 : : }
108 : 2 : hal_freertos_.semaphore_give(mutex_);
109 : : }
110 : 2 : }
111 : :
112 : 55 : void StatisticsManager::on_packet_received(NodeId node_id, int8_t rssi)
113 : : {
114 : 55 : std::optional<etl::vector<PeerStatisticsPersist, MAX_PEERS>> snapshot;
115 : :
116 [ + - ]: 55 : if (hal_freertos_.semaphore_take(mutex_, pdMS_TO_TICKS(5)) == pdTRUE) {
117 : 55 : auto entry = find_entry(node_id);
118 [ + + ]: 55 : if (entry != nullptr) {
119 : 54 : entry->stats.rssi_last = rssi;
120 [ + + ]: 54 : if (entry->stats.rssi_avg == -127) {
121 : 4 : entry->stats.rssi_avg = rssi;
122 : : }
123 : : else {
124 : 50 : entry->stats.rssi_avg = update_ema_i8(entry->stats.rssi_avg, rssi, entry->stats.rssi_alpha);
125 : : }
126 : 54 : entry->stats.packets_rx++;
127 : 54 : entry->dirty_rx++;
128 [ + + ]: 108 : snapshot = maybe_build_flush_snapshot(*entry);
129 : : }
130 : 55 : hal_freertos_.semaphore_give(mutex_);
131 : : }
132 : :
133 [ + + ]: 55 : if (snapshot.has_value()) {
134 [ + - ]: 1 : if (storage_.store_stats(snapshot.value()) == ESP_OK) {
135 [ + - ]: 1 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
136 : 1 : reset_dirty_counters();
137 : 1 : hal_freertos_.semaphore_give(mutex_);
138 : : }
139 : : }
140 : : }
141 : 55 : }
142 : :
143 : 34 : void StatisticsManager::on_ack_received(NodeId node_id, uint32_t rtt_ms)
144 : : {
145 : 34 : std::optional<etl::vector<PeerStatisticsPersist, MAX_PEERS>> snapshot;
146 : :
147 [ + - ]: 34 : if (hal_freertos_.semaphore_take(mutex_, pdMS_TO_TICKS(5)) == pdTRUE) {
148 : 34 : auto entry = find_entry(node_id);
149 [ + + ]: 34 : if (entry != nullptr) {
150 : 33 : entry->stats.rtt_last_ms = rtt_ms;
151 [ + + ]: 33 : if (entry->stats.rtt_avg_ms == 0) {
152 : 3 : entry->stats.rtt_avg_ms = rtt_ms;
153 : : }
154 : : else {
155 : 30 : entry->stats.rtt_avg_ms = update_ema_u32(entry->stats.rtt_avg_ms, rtt_ms, RTT_EMA_ALPHA);
156 : : }
157 : 33 : entry->dirty_rtt++;
158 [ + + ]: 66 : snapshot = maybe_build_flush_snapshot(*entry);
159 : : }
160 : 34 : hal_freertos_.semaphore_give(mutex_);
161 : : }
162 : :
163 [ + + ]: 34 : if (snapshot.has_value()) {
164 [ + - ]: 1 : if (storage_.store_stats(snapshot.value()) == ESP_OK) {
165 [ + - ]: 1 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
166 : 1 : reset_dirty_counters();
167 : 1 : hal_freertos_.semaphore_give(mutex_);
168 : : }
169 : : }
170 : : }
171 : 34 : }
172 : :
173 : 51 : void StatisticsManager::on_delivery_success(NodeId node_id)
174 : : {
175 : 51 : std::optional<etl::vector<PeerStatisticsPersist, MAX_PEERS>> snapshot;
176 : :
177 [ + - ]: 51 : if (hal_freertos_.semaphore_take(mutex_, pdMS_TO_TICKS(5)) == pdTRUE) {
178 : 51 : auto entry = find_entry(node_id);
179 [ + - ]: 51 : if (entry != nullptr) {
180 : 51 : entry->stats.packets_sent++;
181 : 51 : entry->dirty_tx++;
182 [ + + ]: 102 : snapshot = maybe_build_flush_snapshot(*entry);
183 : : }
184 : 51 : hal_freertos_.semaphore_give(mutex_);
185 : : }
186 : :
187 [ + + ]: 51 : if (snapshot.has_value()) {
188 [ + - ]: 1 : if (storage_.store_stats(snapshot.value()) == ESP_OK) {
189 [ + - ]: 1 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
190 : 1 : reset_dirty_counters();
191 : 1 : hal_freertos_.semaphore_give(mutex_);
192 : : }
193 : : }
194 : : }
195 : 51 : }
196 : :
197 : 14 : void StatisticsManager::on_delivery_failure(NodeId node_id)
198 : : {
199 : 14 : std::optional<etl::vector<PeerStatisticsPersist, MAX_PEERS>> snapshot;
200 : :
201 [ + - ]: 14 : if (hal_freertos_.semaphore_take(mutex_, pdMS_TO_TICKS(5)) == pdTRUE) {
202 : 14 : auto entry = find_entry(node_id);
203 [ + + ]: 14 : if (entry != nullptr) {
204 : 13 : entry->stats.delivery_failures++;
205 : 13 : entry->dirty_tx_fail++;
206 [ + + ]: 26 : snapshot = maybe_build_flush_snapshot(*entry);
207 : : }
208 : 14 : hal_freertos_.semaphore_give(mutex_);
209 : : }
210 : :
211 [ + + ]: 14 : if (snapshot.has_value()) {
212 [ + - ]: 1 : if (storage_.store_stats(snapshot.value()) == ESP_OK) {
213 [ + - ]: 1 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
214 : 1 : reset_dirty_counters();
215 : 1 : hal_freertos_.semaphore_give(mutex_);
216 : : }
217 : : }
218 : : }
219 : 14 : }
220 : :
221 : 12 : void StatisticsManager::on_driver_error(NodeId node_id)
222 : : {
223 : 12 : std::optional<etl::vector<PeerStatisticsPersist, MAX_PEERS>> snapshot;
224 : :
225 [ + - ]: 12 : if (hal_freertos_.semaphore_take(mutex_, pdMS_TO_TICKS(5)) == pdTRUE) {
226 : 12 : auto entry = find_entry(node_id);
227 [ + - ]: 12 : if (entry != nullptr) {
228 : 12 : entry->stats.driver_errors++;
229 : 12 : entry->dirty_driver_err++;
230 [ + + ]: 24 : snapshot = maybe_build_flush_snapshot(*entry);
231 : : }
232 : 12 : hal_freertos_.semaphore_give(mutex_);
233 : : }
234 : :
235 [ + + ]: 12 : if (snapshot.has_value()) {
236 [ + - ]: 1 : if (storage_.store_stats(snapshot.value()) == ESP_OK) {
237 [ + - ]: 1 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
238 : 1 : reset_dirty_counters();
239 : 1 : hal_freertos_.semaphore_give(mutex_);
240 : : }
241 : : }
242 : : }
243 : 12 : }
244 : :
245 : 10 : void StatisticsManager::on_packet_lost(NodeId node_id)
246 : : {
247 : 10 : std::optional<etl::vector<PeerStatisticsPersist, MAX_PEERS>> snapshot;
248 : :
249 [ + - ]: 10 : if (hal_freertos_.semaphore_take(mutex_, pdMS_TO_TICKS(5)) == pdTRUE) {
250 : 10 : auto entry = find_entry(node_id);
251 [ + - ]: 10 : if (entry != nullptr) {
252 : 10 : entry->stats.packets_lost++;
253 : 10 : entry->dirty_lost++;
254 [ + + ]: 20 : snapshot = maybe_build_flush_snapshot(*entry);
255 : : }
256 : 10 : hal_freertos_.semaphore_give(mutex_);
257 : : }
258 : :
259 [ + + ]: 10 : if (snapshot.has_value()) {
260 [ + - ]: 1 : if (storage_.store_stats(snapshot.value()) == ESP_OK) {
261 [ + - ]: 1 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
262 : 1 : reset_dirty_counters();
263 : 1 : hal_freertos_.semaphore_give(mutex_);
264 : : }
265 : : }
266 : : }
267 : 10 : }
268 : :
269 : 51 : void StatisticsManager::on_retry(NodeId node_id)
270 : : {
271 : 51 : std::optional<etl::vector<PeerStatisticsPersist, MAX_PEERS>> snapshot;
272 : :
273 [ + - ]: 51 : if (hal_freertos_.semaphore_take(mutex_, pdMS_TO_TICKS(5)) == pdTRUE) {
274 : 51 : auto entry = find_entry(node_id);
275 [ + - ]: 51 : if (entry != nullptr) {
276 : 51 : entry->stats.retries++;
277 : : // Retries are considered "soft" dirty, no separate flush threshold
278 : 51 : entry->dirty_tx++;
279 [ + + ]: 102 : snapshot = maybe_build_flush_snapshot(*entry);
280 : : }
281 : 51 : hal_freertos_.semaphore_give(mutex_);
282 : : }
283 : :
284 [ + + ]: 51 : if (snapshot.has_value()) {
285 [ + - ]: 1 : if (storage_.store_stats(snapshot.value()) == ESP_OK) {
286 [ + - ]: 1 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
287 : 1 : reset_dirty_counters();
288 : 1 : hal_freertos_.semaphore_give(mutex_);
289 : : }
290 : : }
291 : : }
292 : 51 : }
293 : :
294 : 14 : bool StatisticsManager::get(NodeId node_id, PeerStatistics& out) const
295 : : {
296 [ + - ]: 14 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
297 : 14 : auto entry = find_entry(node_id);
298 [ + + ]: 14 : if (entry != nullptr) {
299 : 13 : out = entry->stats;
300 : 13 : hal_freertos_.semaphore_give(mutex_);
301 : 13 : return true;
302 : : }
303 : 1 : hal_freertos_.semaphore_give(mutex_);
304 : : }
305 : : return false;
306 : : }
307 : :
308 : 4 : etl::vector<PeerStatistics, MAX_PEERS> StatisticsManager::get_all() const
309 : : {
310 : 4 : etl::vector<PeerStatistics, MAX_PEERS> result;
311 [ + - ]: 4 : if (hal_freertos_.semaphore_take(mutex_, portMAX_DELAY) == pdTRUE) {
312 [ + + ]: 7 : for (const auto& entry : entries_) {
313 : 3 : result.push_back(entry.stats);
314 : : }
315 : 4 : hal_freertos_.semaphore_give(mutex_);
316 : : }
317 : 4 : return result;
318 : : }
319 : :
320 : : // =========================================================================================
321 : : // Private methods
322 : : // =========================================================================================
323 : :
324 : 252 : StatisticsManager::PeerStatisticsEntry* StatisticsManager::find_entry(NodeId node_id)
325 : : {
326 [ + + ]: 256 : for (auto& entry : entries_) {
327 [ + + ]: 229 : if (entry.stats.node_id == node_id) {
328 : : return &entry;
329 : : }
330 : : }
331 : : return nullptr;
332 : : }
333 : :
334 : 14 : const StatisticsManager::PeerStatisticsEntry* StatisticsManager::find_entry(NodeId node_id) const
335 : : {
336 [ + + ]: 17 : for (const auto& entry : entries_) {
337 [ + + ]: 16 : if (entry.stats.node_id == node_id) {
338 : : return &entry;
339 : : }
340 : : }
341 : : return nullptr;
342 : : }
343 : :
344 : 25 : uint8_t StatisticsManager::compute_alpha(uint32_t heartbeat_interval_ms)
345 : : {
346 : : // Returns EMA alpha in fixed point (0-256), where 256 is 100% weight to new sample.
347 : : // Short intervals (1s) -> alpha ~10% (26/256) - Smooth
348 : : // Medium intervals (10s) -> alpha ~20% (51/256)
349 : : // Long intervals (30s) -> alpha ~25% (64/256)
350 : : // Very long/Unknown -> alpha ~40% (102/256) - Reactive
351 [ + - ]: 25 : if (heartbeat_interval_ms == 0)
352 : : return 51; // 20%
353 [ + + ]: 25 : if (heartbeat_interval_ms < 5000)
354 : : return 26; // 10%
355 [ + - ]: 1 : if (heartbeat_interval_ms < 30000)
356 : 1 : return 64; // 25%
357 : : return 102; // 40%
358 : : }
359 : :
360 : 50 : int8_t StatisticsManager::update_ema_i8(int8_t avg, int8_t sample, uint8_t alpha)
361 : : {
362 : : // EMA = (sample * alpha + avg * (256 - alpha)) / 256
363 : 50 : int32_t a = static_cast<int32_t>(alpha);
364 : 50 : int32_t val = (static_cast<int32_t>(sample) * a + static_cast<int32_t>(avg) * (256 - a)) >> 8;
365 : 50 : return static_cast<int8_t>(val);
366 : : }
367 : :
368 : 30 : uint32_t StatisticsManager::update_ema_u32(uint32_t avg, uint32_t sample, uint8_t alpha)
369 : : {
370 : 30 : uint32_t a = static_cast<uint32_t>(alpha);
371 : 30 : return (sample * a + avg * (256 - a)) >> 8;
372 : : }
373 : :
374 : 224 : bool StatisticsManager::has_crossed_flush_threshold(const PeerStatisticsEntry& entry) const
375 : : {
376 [ + + ]: 223 : return entry.dirty_rx >= flush_threshold_rx_ || entry.dirty_tx >= flush_threshold_tx_ ||
377 [ + + + + ]: 221 : entry.dirty_tx_fail >= flush_threshold_tx_fail_ || entry.dirty_driver_err >= flush_threshold_driver_err_ ||
378 [ + + + + : 443 : entry.dirty_lost >= flush_threshold_lost_ || entry.dirty_rtt >= flush_threshold_rtt_;
+ + ]
379 : : }
380 : :
381 : 8 : etl::vector<PeerStatisticsPersist, MAX_PEERS> StatisticsManager::build_persist_snapshot()
382 : : {
383 : 8 : etl::vector<PeerStatisticsPersist, MAX_PEERS> snapshot;
384 [ + + ]: 16 : for (const auto& entry : entries_) {
385 : 8 : PeerStatisticsPersist p;
386 : 8 : p.node_id = entry.stats.node_id;
387 : 8 : p.rssi_avg = entry.stats.rssi_avg;
388 : 8 : p.packets_rx = entry.stats.packets_rx;
389 : 8 : p.packets_sent = entry.stats.packets_sent;
390 : 8 : p.driver_errors = entry.stats.driver_errors;
391 : 8 : p.delivery_failures = entry.stats.delivery_failures;
392 : 8 : p.packets_lost = entry.stats.packets_lost;
393 : 8 : p.rtt_avg_ms = entry.stats.rtt_avg_ms;
394 : 8 : p.retries = entry.stats.retries;
395 : 8 : snapshot.push_back(p);
396 : : }
397 : 8 : return snapshot;
398 : : }
399 : :
400 : 7 : void StatisticsManager::reset_dirty_counters()
401 : : {
402 [ + + ]: 14 : for (auto& entry : entries_) {
403 : 7 : entry.dirty_rx = 0;
404 : 7 : entry.dirty_tx = 0;
405 : 7 : entry.dirty_tx_fail = 0;
406 : 7 : entry.dirty_driver_err = 0;
407 : 7 : entry.dirty_lost = 0;
408 : 7 : entry.dirty_rtt = 0;
409 : : }
410 : 7 : }
411 : :
412 : : std::optional<etl::vector<PeerStatisticsPersist, MAX_PEERS>>
413 : 224 : StatisticsManager::maybe_build_flush_snapshot(const PeerStatisticsEntry& entry)
414 : : {
415 [ + + ]: 224 : if (has_crossed_flush_threshold(entry)) {
416 : 7 : return build_persist_snapshot();
417 : : }
418 : 217 : return std::nullopt;
419 : : }
|