🛰️Zafer Satılmış - Aviora

MQTT Connection Service

On the gateway, this layer manages MQTT broker session setup, subscriptions, handling of incoming publishes, and reconnection after loss, all through a single background task. A thin CycloneTCP-based connection manager sits underneath; this service defines higher-level behavior with an RTOS task loop, request flags, and callbacks.

CycloneTCP MQTT · Dedicated Task · Non-Blocking API · Incoming Publish + Link Event

Separation Of Roles And Responsibilities

The connection manager is responsible only for a single broker session: CONNECT / SUBSCRIBE / PUBLISH and the process path (RX + keep-alive) that must run on every loop iteration. It does not keep the broker short name or the subscription table state.

This service holds broker address, credentials, the topic list to subscribe to (limited slots), the incoming-message callback and optional link callback; all heavy work runs in a separate task on a fixed-period loop.

Layers (Mermaid)
flowchart TB
  subgraph App["Application / protocol module"]
    APP[RigelMq or other consumer]
  end
  subgraph Svc["MQTT Connection Service"]
    T[Task loop]
    B[Task flags]
    T --> B
  end
  subgraph Mng["MQTT connection manager"]
    C[CONNECT / DISCONNECT]
    P[process — RX and keep-alive]
    S[SUBSCRIBE / PUBLISH]
  end
  subgraph Net["Network stack"]
    CY[CycloneTCP mqtt_client]
  end
  APP -->|request: connect / publish / subscribe| Svc
  Svc --> Mng
  Mng --> CY
  CY -->|PUBLISH payload| Svc
  Svc -->|IncomingCb topic + data| APP
  Svc -->|LinkCb isUp| APP

Task Loop And Persistent Connection

The task runs in an infinite while loop until stop is requested. Each iteration first notifies the watchdog; then the connect, disconnect, reconnect, and batched subscribe steps are evaluated in that order according to the flags. While connected, process is invoked on every iteration to keep receiving from the broker and to drive the protocol keep-alive. If process fails, the session is treated as lost and a reconnect is scheduled after a short delay.

The loop period is fixed (e.g. 100 ms). The same cadence is used whether connected or not, which yields a predictable timeline for regular process runs and for retries after a drop.

Outgoing Message (PUBLISH) — Call Chain

When the application calls appMqttConnServicePublish, the call is handled first in the MQTT Connection Service, then passed to the connection manager, and finally to the CycloneTCP mqtt_client API. The animation below highlights each of the three functions in turn as it becomes “active”; arrows show the handoff from one stage to the next.

On the CycloneTCP side, the send goes through the connected MqttClientContext via mqttClientPublish(…, topic, payload, len, MQTT_QOS_LEVEL_0, …), where the QoS level is mapped to APP_MQTT_CONN_MNG_QOS in the manager. If the return value is NO_ERROR, APP_MQTT_STAT_PUB_OK is reported; otherwise APP_MQTT_STAT_PUB_FAIL is reported.
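The three-stage handoff described above can be sketched roughly as follows. This is a minimal illustration, not the actual implementation: only appMqttConnServicePublish, APP_MQTT_CONN_MNG_QOS, APP_MQTT_STAT_PUB_OK, APP_MQTT_STAT_PUB_FAIL, and NO_ERROR are names taken from the text. The function signatures, the manager function name appMqttConnMngPublish, the numeric status values, and the g_connected flag are assumptions, and the CycloneTCP call is replaced by a stub so the sketch builds without the library.

```c
#include <stdbool.h>
#include <stddef.h>

/* Stand-ins for CycloneTCP definitions so the sketch builds without the library. */
typedef int error_t;
#define NO_ERROR 0
#define MQTT_QOS_LEVEL_0 0
#define APP_MQTT_CONN_MNG_QOS MQTT_QOS_LEVEL_0

/* Status codes named in the text; the numeric values are assumptions. */
typedef enum { APP_MQTT_STAT_PUB_OK = 1, APP_MQTT_STAT_PUB_FAIL = 2 } AppMqttStat;

static bool g_connected = false;   /* hypothetical session flag */
static AppMqttStat g_lastStat;     /* last status reported by the manager */

/* Stage 3 stub, in place of mqttClientPublish(): fails on an empty topic. */
static error_t stubClientPublish(const char *topic, const void *msg,
                                 size_t len, int qos)
{
    (void)msg; (void)len; (void)qos;
    return (topic[0] != '\0') ? NO_ERROR : 1;
}

/* Stage 2: connection manager (hypothetical name); maps the result to a status. */
static bool appMqttConnMngPublish(const char *topic, const void *data, size_t len)
{
    error_t err = stubClientPublish(topic, data, len, APP_MQTT_CONN_MNG_QOS);
    g_lastStat = (err == NO_ERROR) ? APP_MQTT_STAT_PUB_OK : APP_MQTT_STAT_PUB_FAIL;
    return err == NO_ERROR;
}

/* Stage 1: service entry point; rejects bad arguments and an offline session. */
bool appMqttConnServicePublish(const char *topic, const void *data, size_t len)
{
    if (topic == NULL || data == NULL || !g_connected) {
        return false;
    }
    return appMqttConnMngPublish(topic, data, len);
}
```

Keeping the connection check in the service and the status mapping in the manager mirrors the role split described earlier: the service decides whether a publish is allowed, the manager only talks to the client library.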

Task Round — Simplified Flow
flowchart TD
  A[Loop start] --> B{stop?}
  B -->|yes| Z[Cleanup, delete task]
  B -->|no| C[imOK]
  C --> D{want online?}
  D -->|no| E{connected?}
  E -->|yes| F[disconnect]
  E -->|no| Y[sleep]
  D -->|yes| G[connect / disconnect / reconnect / subscribe steps]
  G --> H{connected and process?}
  H -->|yes| I{process OK?}
  I -->|no| J[schedule reconnect]
  I -->|yes| Y
  H -->|no| Y
  J --> Y
  F --> Y
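
One round of the flow above might look like the following sketch. The flag names follow the text; the SvcCtx struct, the mng* stubs, the processFails test hook, and svcSleepMs are hypothetical stand-ins for the real manager and RTOS calls, and the 100 ms period is the example value mentioned earlier.

```c
#include <stdbool.h>

/* Hypothetical service state; flag names follow the text. */
typedef struct {
    bool stopRequested, userWantOnline;
    bool taskConnect, taskDisconnect, taskReconnect, taskSubscribe;
    bool connected;          /* mirrors the manager's session state */
    bool processFails;       /* test hook: makes the stubbed process step fail */
    unsigned watchdogKicks;  /* counts watchdog notifications */
} SvcCtx;

/* Stubs standing in for the connection manager and the RTOS delay. */
static bool mngConnect(SvcCtx *c)    { c->connected = true;  return true; }
static void mngDisconnect(SvcCtx *c) { c->connected = false; }
static bool mngProcess(SvcCtx *c)    { return !c->processFails; }
static void svcSleepMs(unsigned ms)  { (void)ms; }

/* One loop round; returns false when the task should exit. */
static bool svcTaskRound(SvcCtx *c)
{
    if (c->stopRequested) { mngDisconnect(c); return false; }
    c->watchdogKicks++;                        /* watchdog notification */

    if (!c->userWantOnline) {                  /* tear down even if up */
        if (c->connected) mngDisconnect(c);
        return true;
    }
    if (c->taskConnect) {
        c->taskConnect = false;
        if (!c->connected) {
            if (mngConnect(c)) c->taskSubscribe = true;
            else c->taskConnect = true;        /* retry next round */
        }
    }
    if (c->taskDisconnect) { c->taskDisconnect = false; mngDisconnect(c); }
    if (c->taskReconnect) {                    /* disconnect, then connect again */
        c->taskReconnect = false;
        mngDisconnect(c);
        c->taskConnect = true;
    }
    if (c->taskSubscribe && c->connected) {
        c->taskSubscribe = false;              /* batched SUBSCRIBE would run here */
    }
    if (c->connected && !mngProcess(c)) {      /* RX + keep-alive */
        c->taskReconnect = true;               /* session treated as lost */
    }
    return true;
}

/* Fixed-period task body: same cadence whether connected or not. */
void svcTask(SvcCtx *c)
{
    while (svcTaskRound(c)) {
        svcSleepMs(100);
    }
}
```

Splitting the round out of the loop is only a convenience for this sketch; it lets a single iteration be exercised without an RTOS.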
Figure (animation): Poll Loop. One process call per round plus a periodic sleep keeps RX and keep-alive running.
Figure (animation): Reconnect. After a loss or subscribe failure, disconnect/connect runs and topic subscriptions are set up again.

Incoming Packet Delivery — IncomingCb

When the broker publishes to a subscribed topic, the connection manager’s listen callback runs. The service forwards that to the application’s incoming data callback.

  • First argument: topic name. It acts as the logical channel and tells the application which topic the payload came from.
  • Second argument: payload. A byte buffer that is not required to be null-terminated.
  • Third argument: length. The payload size in bytes.

Nothing is delivered if the callback is null or the topic/data pointer is invalid. The application is thus fed through one contract: topic plus raw bytes.
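
The contract above could be expressed along these lines. The IncomingCb typedef, svcForwardIncoming, and the example application callback are hypothetical names chosen for illustration; the sketch only shows the null checks and the fact that the receiver must use the length rather than rely on a terminator.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical callback type matching the three arguments described above. */
typedef void (*IncomingCb)(const char *topic, const uint8_t *data, size_t dataLength);

static IncomingCb g_incomingCb;   /* registered by the application; may be NULL */

/* Service-side forwarder: returns true if delivered, false if dropped. */
static bool svcForwardIncoming(const char *topic, const uint8_t *data, size_t len)
{
    if (g_incomingCb == NULL || topic == NULL || data == NULL) {
        return false;
    }
    g_incomingCb(topic, data, len);
    return true;
}

/* Example application callback: copies the payload and terminates it itself. */
static char g_lastText[32];

static void appOnIncoming(const char *topic, const uint8_t *data, size_t dataLength)
{
    size_t n = dataLength;
    (void)topic;
    if (n > sizeof(g_lastText) - 1) {
        n = sizeof(g_lastText) - 1;   /* truncate to fit the local buffer */
    }
    memcpy(g_lastText, data, n);
    g_lastText[n] = '\0';             /* the payload carries no terminator */
}
```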

Incoming PUBLISH Path
sequenceDiagram
  participant BR as Broker
  participant CY as mqtt_client
  participant M as Connection manager
  participant S as Service task
  participant CB as App IncomingCb
  BR->>CY: PUBLISH topic payload
  CY->>M: RX handling
  M->>M: Listen callback
  M->>S: topic, buffer, length
  S->>CB: topic, data, dataLength

Connection And Session Events — LinkCb

This callback is optional. If omitted, only incoming publishes reach the application; if provided, a single integer tells whether the session is “really up” and when it has gone “down.”

isUp != 0 (Connected / Ready)

Invoked once when the TCP connection and MQTT session are up, all planned topic subscribe attempts have succeeded, and an “up” notification has not been sent before. That lets application logic run only when traffic is actually expected (e.g. before the first PUBLISH).

isUp == 0 (Down / Disconnected)

After a connect failure, a loss, or a user-requested disconnect, a “down” is reported only if an “up” had been reported before. An internal flag avoids duplicate notifications.
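
A minimal sketch of the up/down de-duplication, assuming a callback that takes a single int. The flag name linkUpNotified appears in the text; the LinkCb signature, LinkState struct, helper names, and the counting callback are illustrative assumptions.

```c
#include <stdbool.h>
#include <stddef.h>

typedef void (*LinkCb)(int isUp);   /* hypothetical signature */

typedef struct {
    LinkCb linkCb;          /* optional; may be NULL */
    bool linkUpNotified;    /* true once "up" has been delivered */
} LinkState;

/* Report "up" once, after connect and all subscriptions have succeeded. */
static void linkNotifyUp(LinkState *s)
{
    if (s->linkUpNotified) return;        /* already reported */
    s->linkUpNotified = true;
    if (s->linkCb != NULL) s->linkCb(1);
}

/* Report "down" only if "up" was reported before. */
static void linkNotifyDown(LinkState *s)
{
    if (!s->linkUpNotified) return;       /* nothing to retract */
    s->linkUpNotified = false;
    if (s->linkCb != NULL) s->linkCb(0);
}

/* Example application callback that counts the notifications it receives. */
static int g_upCount, g_downCount;

static void appOnLink(int isUp)
{
    if (isUp) g_upCount++;
    else g_downCount++;
}
```

Because the flag is updated even when no callback is registered, the service state stays consistent whether or not the application opted in.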

Connection State — Summary
stateDiagram-v2
  [*] --> Closed: Start
  Closed --> Connecting: RequestConnect
  Connecting --> AwaitingSub: CONNECTION_OK
  Connecting --> Down: CONNECTION_FAIL
  AwaitingSub --> Ready: All SUB OK + LinkCb up
  AwaitingSub --> Reconnecting: SUBS_FAIL
  Reconnecting --> Connecting: Disconnect, then Connect flag
  Ready --> Down: process error or Disconnect
  Down --> Connecting: Reconnect / Connect flag
  Ready --> Closed: Stop
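
The diagram can also be read as a pure transition function, sketched below. The enum and function names are hypothetical. The Reconnecting to Connecting step is an assumption derived from the description of taskReconnect (disconnect first, then connect again), not something the service is confirmed to model as an explicit state.

```c
/* Hypothetical state and event names derived from the diagram labels. */
typedef enum {
    ST_CLOSED, ST_CONNECTING, ST_AWAITING_SUB, ST_READY, ST_DOWN, ST_RECONNECTING
} ConnState;

typedef enum {
    EV_REQUEST_CONNECT, EV_CONNECTION_OK, EV_CONNECTION_FAIL, EV_ALL_SUB_OK,
    EV_SUBS_FAIL, EV_PROCESS_ERROR, EV_DISCONNECT, EV_RECONNECT, EV_STOP
} ConnEvent;

/* Returns the next state; events that do not apply leave the state unchanged. */
static ConnState connNext(ConnState s, ConnEvent e)
{
    switch (s) {
    case ST_CLOSED:
        if (e == EV_REQUEST_CONNECT) return ST_CONNECTING;
        break;
    case ST_CONNECTING:
        if (e == EV_CONNECTION_OK) return ST_AWAITING_SUB;
        if (e == EV_CONNECTION_FAIL) return ST_DOWN;
        break;
    case ST_AWAITING_SUB:
        if (e == EV_ALL_SUB_OK) return ST_READY;       /* LinkCb "up" fires here */
        if (e == EV_SUBS_FAIL) return ST_RECONNECTING;
        break;
    case ST_READY:
        if (e == EV_PROCESS_ERROR || e == EV_DISCONNECT) return ST_DOWN;
        if (e == EV_STOP) return ST_CLOSED;
        break;
    case ST_DOWN:
        if (e == EV_RECONNECT || e == EV_REQUEST_CONNECT) return ST_CONNECTING;
        break;
    case ST_RECONNECTING:
        /* Assumption: reconnect means disconnect, then a fresh CONNECT. */
        if (e == EV_RECONNECT) return ST_CONNECTING;
        break;
    }
    return s;
}
```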

Task Flags And Connection Status Callback

Requests from the rest of the system never block on the network: they only set bits, and the task handles them on its next round. Status codes from the connection manager feed the same flags.

| Flag / Condition | Meaning |
|---|---|
| taskConnect | Schedule a CONNECT attempt. |
| taskDisconnect | Clean disconnect; may clear userWantOnline. |
| taskReconnect | Disconnect first, then set taskConnect again. |
| taskSubscribe | After connect succeeds, run SUB for all non-empty topic slots. |
| userWantOnline | Desire to stay online; if cleared, connection is torn down even if up. |
| linkUpNotified | Whether LinkCb “up” was already delivered once. |
| stopRequested | Exit path: disconnect, clear state, delete task. |

CONNECTION_OK schedules the subscribe pass. CONNECTION_FAIL emits “down” if “up” was previously reported, then requests CONNECT again. SUBS_FAIL triggers reconnect and re-subscribe.
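
The way requests and manager status codes feed the same flag set might be sketched like this. The flag names come from the table; the MngStatus enum, the SvcFlags struct, the function names, and the downReports counter (standing in for a LinkCb “down” call) are assumptions. On a real RTOS the flag updates would also need to be atomic or guarded, which is omitted here.

```c
#include <stdbool.h>

/* Hypothetical status codes corresponding to the events named in the text. */
typedef enum { STAT_CONNECTION_OK, STAT_CONNECTION_FAIL, STAT_SUBS_FAIL } MngStatus;

typedef struct {
    bool taskConnect, taskDisconnect, taskReconnect, taskSubscribe;
    bool userWantOnline, linkUpNotified;
    unsigned downReports;   /* stands in for LinkCb(0) calls */
} SvcFlags;

/* Non-blocking requests: only set bits; the task acts on the next round. */
static void svcRequestConnect(SvcFlags *f)
{
    f->userWantOnline = true;
    f->taskConnect = true;
}

static void svcRequestDisconnect(SvcFlags *f)
{
    f->taskDisconnect = true;
}

/* Status callback from the connection manager feeding the same flags. */
static void svcOnMngStatus(SvcFlags *f, MngStatus st)
{
    switch (st) {
    case STAT_CONNECTION_OK:
        f->taskSubscribe = true;           /* schedule the subscribe pass */
        break;
    case STAT_CONNECTION_FAIL:
        if (f->linkUpNotified) {           /* emit "down" only once */
            f->linkUpNotified = false;
            f->downReports++;
        }
        f->taskConnect = true;             /* request CONNECT again */
        break;
    case STAT_SUBS_FAIL:
        f->taskReconnect = true;           /* reconnect, then re-subscribe */
        break;
    }
}
```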

Topic Slots And Subscription

Topic strings live in a fixed-size table; an empty slot is a row whose first byte is zero. The first topic is usually copied at service start; additional topics use the next free slot, and a duplicate topic is a successful no-op. If a new topic is added while connected, SUBSCRIBE is attempted immediately; on failure the slot is rolled back. A short delay between multiple subscribe commands reduces stack or broker contention.
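
A possible shape for the slot table, with duplicate detection and rollback on a failed SUBSCRIBE. The slot count, the maximum topic length, the TopicTable struct, and the stubbed subscribe call are all assumptions made for the sketch; the inter-subscribe delay is left out.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define TOPIC_SLOTS   4    /* assumed slot count */
#define TOPIC_MAX_LEN 64   /* assumed maximum length, including terminator */

typedef struct {
    char topics[TOPIC_SLOTS][TOPIC_MAX_LEN];  /* empty slot: first byte is zero */
    bool connected;
    bool subscribeFails;                      /* test hook for the stub below */
} TopicTable;

/* Stub standing in for the manager's SUBSCRIBE call. */
static bool stubSubscribe(const TopicTable *t, const char *topic)
{
    (void)topic;
    return !t->subscribeFails;
}

/* Adds a topic to the next free slot; a duplicate is a successful no-op. */
static bool topicAdd(TopicTable *t, const char *topic)
{
    int freeIdx = -1;

    if (topic == NULL || topic[0] == '\0' || strlen(topic) >= TOPIC_MAX_LEN) {
        return false;
    }
    for (int i = 0; i < TOPIC_SLOTS; i++) {
        if (t->topics[i][0] == '\0') {
            if (freeIdx < 0) freeIdx = i;     /* remember the first free slot */
        } else if (strcmp(t->topics[i], topic) == 0) {
            return true;                      /* already present */
        }
    }
    if (freeIdx < 0) return false;            /* table full */

    strcpy(t->topics[freeIdx], topic);        /* length checked above */
    if (t->connected && !stubSubscribe(t, topic)) {
        t->topics[freeIdx][0] = '\0';         /* roll the slot back */
        return false;
    }
    return true;
}
```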

Client ID And Startup Order

The MQTT clientId is built automatically from a fixed prefix and the configured broker short name. Starting the service creates the task but does not connect to the broker; connection is requested separately. That allows credentials and the first topic to be ready before the loop connects safely.
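
Building the client ID from a prefix and the broker short name could look like the sketch below. The prefix value "gw-" and the function name are placeholders; the actual prefix is not given in the text.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

#define CLIENT_ID_PREFIX "gw-"   /* placeholder; the real prefix is not stated */

/* Writes prefix + short name into out; returns false on bad input or truncation. */
static bool buildClientId(char *out, size_t outSize, const char *brokerShortName)
{
    int n;

    if (out == NULL || outSize == 0 || brokerShortName == NULL) {
        return false;
    }
    n = snprintf(out, outSize, "%s%s", CLIENT_ID_PREFIX, brokerShortName);
    return n > 0 && (size_t)n < outSize;   /* n >= outSize means truncated */
}
```

Rejecting a truncated ID rather than using it silently avoids two gateways ending up with the same shortened client ID on one broker.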

Stop is non-blocking: the call only sets the stop flag; the task disconnects, clears slots and strings, nulls callback pointers, and deletes its own task id. Before starting again, ensure the task has actually finished (short delay or status poll).
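
The stop-then-poll pattern might be sketched as follows. The SvcStop struct, the taskRunning flag, and the function names are hypothetical, and fakeSleepMs simulates the task finishing so the sketch can run without an RTOS; in a real system the delay would be the RTOS sleep call and the task itself would clear taskRunning as its last step.

```c
#include <stdbool.h>

typedef struct {
    volatile bool stopRequested;   /* set by the caller */
    volatile bool taskRunning;     /* cleared by the task as its last step */
} SvcStop;

/* Non-blocking stop: only sets the flag and returns immediately. */
static void svcStop(SvcStop *s)
{
    s->stopRequested = true;
}

/* Polls until the task has finished or the timeout expires. */
static bool svcWaitStopped(const SvcStop *s, unsigned timeoutMs, unsigned pollMs,
                           void (*sleepMs)(unsigned))
{
    unsigned waited = 0;

    if (pollMs == 0) pollMs = 1;           /* avoid a zero-step busy loop */
    while (s->taskRunning) {
        if (waited >= timeoutMs) return false;
        sleepMs(pollMs);
        waited += pollMs;
    }
    return true;
}

/* Simulation only: pretends the task exits during the sleep after a stop request. */
static SvcStop g_svc = { false, true };

static void fakeSleepMs(unsigned ms)
{
    (void)ms;
    if (g_svc.stopRequested) g_svc.taskRunning = false;
}
```

A caller would invoke svcStop, then svcWaitStopped with a bounded timeout, and start the service again only if the wait returns true.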

General API Summary

| Function | Short Description |
|---|---|
| Start | Creates the task with broker address, port, optional user/password, first topic, required IncomingCb, optional LinkCb. |
| Stop | Non-blocking stop request. |
| IsConnected | Current session state (from manager). |
| RequestConnect | Queue stay-online and CONNECT. |
| RequestDisconnect | Queue disconnect. |
| Publish | QoS 0 publish; requires connection. |
| Subscribe | Add topic to a free slot; SUB immediately if connected. |