Branch data Line data Source code
1 : : #include <cstring>
2 : :
3 : : #include "esp_log.h"
4 : :
5 : : #include "tx_manager.hpp"
6 : :
7 : : static const char* TAG = "TxManager";
8 : :
9 : 45 : TxManager::TxManager(
10 : : ITxStateMachine& fsm,
11 : : IEspNowHAL& hal_espnow,
12 : : IFreeRTOSHAL& freertos_hal,
13 : : IMessageCodec& codec,
14 : : IStatisticsManager& stats_mgr,
15 : 45 : IPeerManager& peer_mgr)
16 : 45 : : fsm_(fsm)
17 : 45 : , hal_espnow_(hal_espnow)
18 : 45 : , codec_(codec)
19 : 45 : , freertos_hal_(freertos_hal)
20 : 45 : , stats_mgr_(stats_mgr)
21 : 45 : , peer_mgr_(peer_mgr)
22 : 45 : , sequence_counter_(0)
23 : 45 : , ack_timeout_ms_(0)
24 : 45 : , task_done_semaphore_(nullptr)
25 : 45 : , tx_queue_(nullptr)
26 : 45 : , ack_timeout_timer_(nullptr)
27 : 45 : , tx_task_handle_(nullptr)
28 : : {
29 : 45 : }
30 : :
31 : 90 : TxManager::~TxManager()
32 : : {
33 : : // deinit(); must be called before destruction to clean up resources
34 : 90 : }
35 : :
36 : 1 : void TxManager::ack_timeout_callback(TimerHandle_t xTimer)
37 : : {
38 : 1 : TxManager* self = static_cast<TxManager*>(pvTimerGetTimerID(xTimer));
39 : 1 : self->freertos_hal_.task_notify(self->tx_task_handle_, NOTIFY_ACK_TIMEOUT, eSetBits);
40 : 1 : }
41 : :
42 : : esp_err_t
43 : 44 : TxManager::init(uint32_t stack_size, UBaseType_t priority, TaskHandle_t rx_task_handle, uint32_t ack_timeout_ms)
44 : : {
45 [ + + ]: 44 : if (rx_task_handle == nullptr) {
46 : : ESP_LOGE(TAG, "RX task handle is null");
47 : : return ESP_ERR_INVALID_ARG;
48 : : }
49 : 43 : rx_task_handle_ = rx_task_handle;
50 : 43 : ack_timeout_ms_ = ack_timeout_ms;
51 : :
52 : 43 : tx_queue_ = freertos_hal_.queue_create(20, sizeof(DecodedTxPacket));
53 [ + + ]: 43 : if (tx_queue_ == nullptr) {
54 : : return ESP_ERR_NO_MEM;
55 : : }
56 : :
57 : 42 : delivery_queue_ = freertos_hal_.queue_create(2, sizeof(DeliveryEvent));
58 [ - + ]: 42 : if (delivery_queue_ == nullptr) {
59 : 0 : deinit();
60 : 0 : return ESP_ERR_NO_MEM;
61 : : }
62 : :
63 : 42 : task_done_semaphore_ = freertos_hal_.semaphore_create_binary();
64 [ + + ]: 42 : if (task_done_semaphore_ == nullptr) {
65 : 1 : deinit();
66 : 1 : return ESP_ERR_NO_MEM;
67 : : }
68 : :
69 : 82 : ack_timeout_timer_ =
70 : 41 : freertos_hal_.timer_create("ack_timeout", pdMS_TO_TICKS(ack_timeout_ms_), pdFALSE, this, ack_timeout_callback);
71 [ + + ]: 41 : if (ack_timeout_timer_ == nullptr) {
72 : 1 : deinit();
73 : 1 : return ESP_ERR_NO_MEM;
74 : : }
75 : :
76 : 40 : BaseType_t task_creation =
77 : 40 : freertos_hal_.task_create(tx_task_func, "tx_manager_task", stack_size, this, priority, &tx_task_handle_);
78 [ + + ]: 40 : if (task_creation != pdPASS) {
79 : 1 : deinit();
80 : 1 : return ESP_ERR_NO_MEM;
81 : : }
82 : :
83 : : return ESP_OK;
84 : : }
85 : :
86 : 36 : void TxManager::deinit()
87 : : {
88 [ + + ]: 36 : if (tx_task_handle_ != nullptr) {
89 : : // Notify task to stop
90 : 31 : freertos_hal_.task_notify(tx_task_handle_, NOTIFY_TASK_TO_STOP, eSetBits);
91 : :
92 : : // Send dummy packet to wakeup task
93 : 31 : DecodedTxPacket stop_packet = {};
94 [ + - ]: 31 : if (tx_queue_ != nullptr) {
95 : 31 : freertos_hal_.queue_send(tx_queue_, &stop_packet, 0);
96 : : }
97 : :
98 : : // Wait for task to exit
99 : : uint8_t delay_ms = 10;
100 [ + + ]: 131 : for (int timeout = 1000; timeout > 0; timeout -= delay_ms) {
101 [ + + ]: 130 : if (freertos_hal_.semaphore_take(task_done_semaphore_, delay_ms) == pdPASS)
102 : : break;
103 : : }
104 : : // Forcing deleting task
105 [ + + ]: 31 : if (tx_task_handle_ != nullptr) {
106 : 8 : ESP_LOGW(TAG, "Forcing deletion of tx manager task");
107 : 8 : freertos_hal_.task_delete(tx_task_handle_);
108 : 8 : tx_task_handle_ = nullptr;
109 : : }
110 : : }
111 : :
112 [ + + ]: 36 : if (task_done_semaphore_ != nullptr) {
113 : 35 : freertos_hal_.semaphore_delete(task_done_semaphore_);
114 : 35 : task_done_semaphore_ = nullptr;
115 : : }
116 : :
117 [ + - ]: 36 : if (tx_queue_ != nullptr) {
118 : 36 : freertos_hal_.queue_delete(tx_queue_);
119 : 36 : tx_queue_ = nullptr;
120 : : }
121 : :
122 [ + - ]: 36 : if (delivery_queue_ != nullptr) {
123 : 36 : freertos_hal_.queue_delete(delivery_queue_);
124 : 36 : delivery_queue_ = nullptr;
125 : : }
126 : :
127 [ + + ]: 36 : if (ack_timeout_timer_ != nullptr) {
128 : 34 : freertos_hal_.timer_delete(ack_timeout_timer_, portMAX_DELAY);
129 : 34 : ack_timeout_timer_ = nullptr;
130 : : }
131 : 36 : }
132 : :
133 : 15 : esp_err_t TxManager::queue_packet(const DecodedTxPacket& packet)
134 : : {
135 [ + + ]: 15 : if (tx_queue_ == nullptr) {
136 : : return ESP_ERR_INVALID_STATE;
137 : : }
138 : :
139 [ + + ]: 14 : if (freertos_hal_.queue_send(tx_queue_, &packet, 100) != pdTRUE) {
140 : : return ESP_FAIL;
141 : : }
142 : :
143 [ + + ]: 13 : if (tx_task_handle_ != nullptr) {
144 : 12 : freertos_hal_.task_notify(tx_task_handle_, NOTIFY_DATA, eSetBits);
145 : : }
146 : :
147 : : return ESP_OK;
148 : : }
149 : :
150 : 16 : void TxManager::notify_delivery(esp_now_send_status_t status, const uint8_t* dest_mac)
151 : : {
152 [ + + + - : 16 : if (tx_task_handle_ == nullptr || delivery_queue_ == nullptr || dest_mac == nullptr) {
- + ]
153 : 2 : return;
154 : : }
155 : 14 : DeliveryEvent event;
156 : 14 : memcpy(event.dest_mac, dest_mac, 6);
157 : 14 : event.status = static_cast<uint8_t>(status);
158 : 14 : BaseType_t higher_prio_woken = pdFALSE;
159 : 14 : freertos_hal_.queue_send_fromISR(delivery_queue_, &event, &higher_prio_woken);
160 : :
161 : : // Set ONLY the relevant notification bit
162 [ + + ]: 14 : uint32_t notify_bit = (status == ESP_NOW_SEND_FAIL) ? NOTIFY_DELIVERY_FAILURE : NOTIFY_DELIVERY_SUCCESS;
163 : 14 : freertos_hal_.task_notify(tx_task_handle_, notify_bit, eSetBits);
164 : : }
165 : :
166 : 3 : void TxManager::notify_link_alive()
167 : : {
168 [ + + ]: 3 : if (tx_task_handle_ != nullptr) {
169 : 2 : freertos_hal_.task_notify(tx_task_handle_, NOTIFY_LINK_ALIVE, eSetBits);
170 : : }
171 : 3 : }
172 : :
173 : 4 : void TxManager::handle_ack(const DecodedRxPacket& decoded)
174 : : {
175 : 4 : auto pending_ack = fsm_.get_pending_ack();
176 [ + - ]: 4 : if (!pending_ack.has_value()) {
177 : : ESP_LOGW(TAG, "ACK received but no pending packet");
178 : 2 : return;
179 : : }
180 : :
181 [ + + ]: 4 : if (decoded.header.sequence_number != pending_ack->sequence_number) {
182 : : ESP_LOGW(
183 : : TAG,
184 : : "ACK seq_num mismatch, expected: %d, got: %d",
185 : : (int)pending_ack->sequence_number,
186 : : (int)decoded.header.sequence_number);
187 : : return;
188 : : }
189 : :
190 [ + + ]: 3 : if (decoded.header.ack_status != AckStatus::OK) {
191 : 1 : stats_mgr_.on_delivery_failure(pending_ack->node_id);
192 : 1 : return;
193 : : }
194 : :
195 : : // Calculate RTT: current packet timestamp (ms) - pending packet timestamp (ms)
196 : 2 : uint32_t rtt_ms = static_cast<uint32_t>(decoded.raw.timestamp_ms - pending_ack->timestamp_ms);
197 : 2 : stats_mgr_.on_ack_received(pending_ack->node_id, rtt_ms);
198 : :
199 : 2 : notify_logical_ack();
200 : : }
201 : :
202 : 24 : void TxManager::tx_task_func(void* arg)
203 : : {
204 : 24 : static_cast<TxManager*>(arg)->tx_task();
205 : :
206 : : // Should never reach here — tx_task() self-deletes via freertos_hal_
207 : 0 : vTaskDelete(NULL);
208 : 0 : }
209 : :
210 : 24 : void TxManager::tx_task()
211 : : {
212 : 24 : DecodedTxPacket structured_packet;
213 : 24 : TxPacket raw_packet;
214 : 24 : bool should_stop = false;
215 : 24 : uint32_t notifications = 0;
216 : :
217 : 24 : ESP_LOGI(TAG, "TX Manager task started.");
218 : :
219 [ + + ]: 96 : while (!should_stop) {
220 : 72 : TxState current_state = fsm_.get_state();
221 : :
222 : : // 1. Process pending notifications without blocking.
223 : : // This ensures they are handled even if the queue is full and we are IDLE,
224 : : // or if we are returning from a non-blocking state like RETRYING or SCANNING.
225 [ - + ]: 72 : if (freertos_hal_.task_notify_wait(0, NOTIFY_ALL, ¬ifications, 0) == pdTRUE) {
226 : 0 : handle_notifications(notifications, should_stop);
227 [ # # ]: 0 : if (should_stop) {
228 : : break;
229 : : }
230 : : // Refresh state in case notifications triggered a transition
231 : 0 : current_state = fsm_.get_state();
232 : : }
233 : :
234 [ + + + + ]: 72 : switch (current_state) {
235 : 53 : case TxState::IDLE:
236 : 53 : {
237 : : // Non-blocking queue read: the queue alone does not control sleep.
238 [ + + ]: 53 : if (freertos_hal_.queue_receive(tx_queue_, &structured_packet, 0) == pdTRUE) {
239 : : // Update sequence number only for non-ACK packets.
240 : : // ACKs must preserve the sequence number of the packet they are acknowledging.
241 [ + - ]: 11 : if (structured_packet.header.msg_type != MessageType::ACK) {
242 : 11 : structured_packet.header.sequence_number = sequence_counter_++;
243 : : }
244 : :
245 : : // Encode the packet into raw wire format
246 : 11 : raw_packet.len = codec_.encode(
247 : : structured_packet.header,
248 : : structured_packet.payload,
249 : : structured_packet.payload_len,
250 : : raw_packet.data,
251 : : sizeof(raw_packet.data));
252 : :
253 [ + - ]: 11 : if (raw_packet.len == 0) {
254 : : ESP_LOGE(TAG, "Failed to encode packet");
255 : : break;
256 : : }
257 : :
258 : 11 : memcpy(raw_packet.dest_mac, structured_packet.dest_mac, 6);
259 : 11 : raw_packet.requires_ack = structured_packet.header.requires_ack;
260 : :
261 : 11 : esp_err_t send_result =
262 : 11 : hal_espnow_.hal_esp_now_send(raw_packet.dest_mac, raw_packet.data, raw_packet.len);
263 : :
264 [ + + ]: 11 : if (send_result == ESP_OK) {
265 : 7 : TxState next = fsm_.on_packet_sent(raw_packet.requires_ack);
266 [ + + ]: 7 : if (next == TxState::WAITING_FOR_ACK) {
267 : 4 : PendingAck pending = {
268 : 4 : .sequence_number = structured_packet.header.sequence_number,
269 : 4 : .timestamp_ms = structured_packet.header.timestamp_ms,
270 : : .retries_left = 3,
271 : : .packet = raw_packet,
272 : 4 : .node_id = structured_packet.header.dest_node_id};
273 : 4 : fsm_.set_pending_ack(pending);
274 : 4 : freertos_hal_.timer_start(ack_timeout_timer_, pdMS_TO_TICKS(10));
275 : : }
276 : : }
277 : : else {
278 : 4 : handle_esp_now_send_errors(send_result);
279 : : }
280 : : }
281 : : else {
282 : : // Queue is empty. The task blocks here waiting for any notification.
283 : : // When queue_packet() is called, it sends NOTIFY_DATA to wake up this wait.
284 [ + - ]: 42 : if (freertos_hal_.task_notify_wait(0, 0xFFFFFFFF, ¬ifications, portMAX_DELAY) == pdTRUE) {
285 : 42 : handle_notifications(notifications, should_stop);
286 : : }
287 : : }
288 : : break;
289 : : }
290 : :
291 : 8 : case TxState::WAITING_FOR_ACK:
292 : 8 : {
293 : : // The task MUST NOT consume new packets while waiting for a network ACK from the peer.
294 : : // It blocks here completely until a notification arrives (like NOTIFY_LOGICAL_ACK, NOTIFY_ACK_TIMEOUT, or
295 : : // NOTIFY_DELIVERY_FAILURE).
296 [ + - ]: 8 : if (freertos_hal_.task_notify_wait(0, 0xFFFFFFFF, ¬ifications, portMAX_DELAY) == pdTRUE) {
297 : : // If a NOTIFY_LOGICAL_ACK arrives, handle_notifications() warns the TxStateMachine and stops the timer.
298 : : // If a NOTIFY_ACK_TIMEOUT arrives, the FSM transitions to RETRYING.
299 : 8 : handle_notifications(notifications, should_stop);
300 : 8 : if (should_stop) {
301 : : break;
302 : : }
303 : : }
304 : : break;
305 : : }
306 : :
307 : 10 : case TxState::RETRYING:
308 : 10 : {
309 : : // Immediate packet resend logic. Does not block awaiting notifications.
310 : : // Pending notifications are safely handled at the top of the next while loop iteration.
311 : 10 : auto pending_opt = fsm_.get_pending_ack();
312 [ + - + + ]: 10 : if (pending_opt && pending_opt->retries_left > 0) {
313 : 8 : PendingAck pending = pending_opt.value();
314 : 8 : pending.retries_left--;
315 : 8 : fsm_.set_pending_ack(pending);
316 : :
317 : 8 : esp_err_t send_result =
318 : 8 : hal_espnow_.hal_esp_now_send(pending.packet.dest_mac, pending.packet.data, pending.packet.len);
319 : :
320 [ + + ]: 8 : if (send_result == ESP_OK) {
321 : : // Retry sent successfully, go back to WAITING_FOR_ACK and wait for the response again.
322 : 3 : freertos_hal_.timer_start(ack_timeout_timer_, pdMS_TO_TICKS(10));
323 : 3 : fsm_.on_packet_sent(true); // Back to WAITING_FOR_ACK
324 : 3 : stats_mgr_.on_retry(pending.node_id);
325 : : }
326 : : else {
327 : 5 : handle_esp_now_send_errors(send_result);
328 : : }
329 : : }
330 : : else {
331 : : // Retries exhausted, notify FSM to drop the packet and potentially sever the link state.
332 : 2 : auto pending_opt = fsm_.get_pending_ack();
333 [ + - ]: 2 : if (pending_opt) {
334 : 2 : stats_mgr_.on_packet_lost(pending_opt->node_id);
335 : : }
336 : 2 : fsm_.on_max_retries();
337 : : }
338 : 10 : break;
339 : : }
340 : :
341 : 1 : default:
342 : 1 : ESP_LOGE(TAG, "Unknown TxState: %d", static_cast<int>(current_state));
343 : 1 : fsm_.reset();
344 : 1 : break;
345 : : }
346 : : }
347 : :
348 : 24 : ESP_LOGI(TAG, "TX Manager task exiting.");
349 : 24 : tx_task_handle_ = nullptr;
350 : 24 : freertos_hal_.semaphore_give(task_done_semaphore_);
351 : 24 : freertos_hal_.task_suspend(nullptr); // NULL / nullptr == current task
352 : 0 : freertos_hal_.task_delete(nullptr); // NULL / nullptr == current task
353 : 0 : }
354 : :
355 : : // =====================================================================================
356 : : // Private methods
357 : : // =====================================================================================
358 : :
359 : 50 : void TxManager::handle_notifications(uint32_t notifications, bool& should_stop)
360 : : {
361 : : // Multiple notification bits can arrive simultaneously and must all be
362 : : // processed. else-if would silently drop bits after the first match.
363 [ + + ]: 50 : if ((notifications & NOTIFY_LINK_ALIVE) == NOTIFY_LINK_ALIVE) {
364 : 1 : fsm_.on_link_alive();
365 : : }
366 : :
367 [ + + ]: 50 : if ((notifications & NOTIFY_DELIVERY_FAILURE) == NOTIFY_DELIVERY_FAILURE) {
368 : 9 : DeliveryEvent event{};
369 [ + + ]: 18 : while (freertos_hal_.queue_receive(delivery_queue_, &event, 0) == pdTRUE) {
370 : 9 : NodeId node_id = 0;
371 : 9 : esp_err_t err = peer_mgr_.find_node_id_by_mac(event.dest_mac, node_id);
372 [ + + ]: 9 : if (err == ESP_ERR_TIMEOUT) {
373 : 1 : ESP_LOGW(TAG, "Delivery event: mutex timeout resolving MAC");
374 : 2 : continue;
375 : 1 : }
376 [ + + ]: 8 : if (err != ESP_OK) {
377 : 1 : ESP_LOGW(TAG, "Delivery event for unregistered MAC — unexpected after protocol fix");
378 : 1 : continue;
379 : 1 : }
380 : 7 : stats_mgr_.on_delivery_failure(node_id);
381 : : }
382 : : // FSM check if MAX_FAILURES was reached and observer should be notified
383 : 9 : bool max_failures = fsm_.on_delivery_failure();
384 [ + + ]: 9 : if (max_failures) {
385 : 1 : ESP_LOGW(TAG, "Max failures reached, notifying RX task");
386 : 1 : freertos_hal_.task_notify(rx_task_handle_, NOTIFY_MAX_FAILURES, eSetBits);
387 : : }
388 : : }
389 [ + + ]: 50 : if ((notifications & NOTIFY_DELIVERY_SUCCESS) == NOTIFY_DELIVERY_SUCCESS) {
390 : 3 : DeliveryEvent event{};
391 [ + + ]: 6 : while (freertos_hal_.queue_receive(delivery_queue_, &event, 0) == pdTRUE) {
392 : 3 : NodeId node_id = 0;
393 : 3 : esp_err_t err = peer_mgr_.find_node_id_by_mac(event.dest_mac, node_id);
394 [ + + ]: 3 : if (err == ESP_ERR_TIMEOUT) {
395 : 1 : ESP_LOGW(TAG, "Delivery event: mutex timeout resolving MAC");
396 : 2 : continue;
397 : 1 : }
398 [ + + ]: 2 : if (err != ESP_OK) {
399 : 1 : ESP_LOGW(TAG, "Delivery event for unregistered MAC — unexpected after protocol fix");
400 : 1 : continue;
401 : 1 : }
402 : 1 : stats_mgr_.on_delivery_success(node_id);
403 : : }
404 : 3 : fsm_.on_delivery_success();
405 : : }
406 [ + + ]: 50 : if ((notifications & NOTIFY_LOGICAL_ACK) == NOTIFY_LOGICAL_ACK) {
407 : 1 : fsm_.on_ack_received();
408 : 1 : freertos_hal_.timer_stop(ack_timeout_timer_, pdMS_TO_TICKS(10));
409 : : }
410 [ + + ]: 50 : if ((notifications & NOTIFY_ACK_TIMEOUT) == NOTIFY_ACK_TIMEOUT) {
411 : 1 : fsm_.on_ack_timeout();
412 : : }
413 [ + + ]: 50 : if ((notifications & NOTIFY_TASK_TO_STOP) == NOTIFY_TASK_TO_STOP) {
414 : 24 : should_stop = true;
415 : : }
416 : 50 : }
417 : :
418 : 9 : void TxManager::handle_esp_now_send_errors(esp_err_t error)
419 : : {
420 : 9 : auto pending_opt = fsm_.get_pending_ack();
421 [ + + ]: 9 : NodeId node_id = pending_opt ? pending_opt->node_id : 0;
422 : :
423 [ + + ]: 9 : if (error == ESP_ERR_ESPNOW_NO_MEM) {
424 : : // Transient: do not penalize the FSM, ACK timeout will handle retry
425 : : ESP_LOGW(TAG, "hal_esp_now_send: out of memory, will retry via timeout");
426 : : }
427 [ + + ]: 8 : else if (error == ESP_ERR_ESPNOW_NOT_INIT || error == ESP_ERR_ESPNOW_ARG) {
428 : : // Programming errors: log and discard
429 : : ESP_LOGE(TAG, "hal_esp_now_send: unrecoverable error %s", esp_err_to_name(error));
430 : : }
431 : : else {
432 : : // ESP_ERR_ESPNOW_NOT_FOUND, CHAN, IF, INTERNAL — link-level failures
433 : 6 : stats_mgr_.on_driver_error(node_id);
434 : 6 : ESP_LOGW(TAG, "hal_esp_now_send failed: %s", esp_err_to_name(error));
435 : 6 : fsm_.on_delivery_failure();
436 : : }
437 : 9 : }
438 : :
439 : 2 : void TxManager::notify_logical_ack()
440 : : {
441 [ + - ]: 2 : if (tx_task_handle_ != nullptr) {
442 : 2 : freertos_hal_.task_notify(tx_task_handle_, NOTIFY_LOGICAL_ACK, eSetBits);
443 : : }
444 : 2 : }
|