Getting Started with IPWorks IoT MQTT
Requirements: IPWorks IoT
Introduction to IPWorks IoT MQTT
IPWorks IoT is a suite of components aimed at providing lightweight yet fully-featured implementations of technologies targeted at the Internet of Things. This article will focus specifically on the MQTT component.
MQTT is a popular lightweight message queue protocol which runs over any reliable transport protocol and provides quality of service guarantees. The MQTT component implements an easy-to-use client that supports both MQTT 3.1.1 and 5.
This article will discuss MQTT connections, subscribing to topics, and publishing messages. It will also cover major changes in the MQTT 5 protocol. For more details, and for information on topics not covered in this article, please refer to the product documentation.
Connecting
Connecting to an MQTT server is easy; in the simplest case, just set the ClientId property and call the Connect method, passing it the server's hostname and port number.
If your server requires authentication, set the User and (optionally) Password properties. The KeepAliveInterval property can be set to a non-zero value if you wish to have the component automatically send keep-alive (MQTT PING) packets to the server. For instance:
mqtt1.ClientId = "testClient";
mqtt1.User = "test";
mqtt1.Password = "password";
mqtt1.KeepAliveInterval = 30;
mqtt1.Connect("test.mosquitto.org", 1883);
To connect to a server using SSL, set SSLEnabled to True.
WebSocket connections are also supported. To connect using WebSockets, simply specify a hostname starting with ws:// (plaintext) or wss:// (SSL). For example, ws://test.mosquitto.org.
Sessions
Over the lifetime of an MQTT connection, both the client and server keep track of certain details about the session. Both sides keep track of messages which have not been fully acknowledged, and the server also keeps track of the client's subscriptions. What happens to this data when the connection is closed depends on what the CleanSession property was set to when the connection was initially created.
By default, the CleanSession property is set to true. As a result, the server will not save any session data when the connection closes (and neither should you). In addition, connecting with CleanSession set to true causes the server to discard any session data that it might have previously saved if you connected with CleanSession set to false last time.
If the CleanSession property is set to false, then all of the session state data is saved when the connection is closed. On the MQTT component's end, you'll need to call the SaveSession method in order to save the current client-side session data. You'll also want to save the ClientId value, since the server-side session data is associated with it.
// Assume we connected with CleanSession = false.
mqtt1.Disconnect();
string clientId = mqtt1.ClientId;
string sessionState = mqtt1.SaveSession();
When reconnecting at a later point in time, set the ClientId property back to the previous value and use the RestoreSession method to restore the client-side data (which will also set CleanSession to false automatically).
mqtt1.ClientId = clientId;
mqtt1.User = "test";
mqtt1.Password = "password";
mqtt1.RestoreSession(sessionState); // Automatically sets CleanSession = false.
mqtt1.Connect("test.mosquitto.org", 1883);
Wills
The Will feature of MQTT allows clients to specify to the server a message to publish (as well as a topic to publish it to) in the event of an ungraceful disconnection. An "ungraceful disconnection" is any disconnection other than one triggered by calling Disconnect (in which case the server discards the Will message without publishing it).
To supply a Will, set the WillTopic and WillMessage properties before connecting (they cannot be changed once connected). In addition, you can use the WillQOS configuration setting to specify the Will message's QoS level, and the WillRetain configuration setting to set the Will message's Retain flag.
Note that if WillTopic is set to an empty string (the default) when connecting, the component will not send a Will to the server.
mqtt1.ClientId = "testClient";
mqtt1.User = "test";
mqtt1.Password = "password";
mqtt1.WillTopic = "wills/" + mqtt1.ClientId;
mqtt1.WillMessage = mqtt1.ClientId + " was disconnected ungracefully!";
mqtt1.Connect("test.mosquitto.org", 1883);
Subscribing to Topics
Subscribing
The Subscribe method subscribes the component to one or more topic filters at one or more QoS levels. After the server acknowledges the subscription request, the Subscribed event fires once for each topic filter specified.
QoS values set the service level for delivery of a message. Values range from 0 to 2 and have the following meanings:
QoS Level | Description |
0 | At most once - The published message is sent once, and if it does not arrive it is lost. |
1 | At least once - Guarantees that the published message arrives, but there may be duplicates. |
2 | Exactly once - Guarantees that the published message arrives and that there are no duplicates. |
Refer to Inbound Message Processing for more information.
It is perfectly legal to call Subscribe again for the same topic filter(s) at any time, and you may pass a different QoS value at that time if you wish to do so.
// Subscribed event handler.
mqtt1.OnSubscribed += (s, e) => {
    if (e.ResponseCode <= 2)
        Console.WriteLine("Subscribed to " + e.TopicFilter + " at QoS " + e.QOS + ".");
    else
        Console.WriteLine("Failed to subscribe to " + e.TopicFilter + ".");
};
// Basic, subscribe to some topic filters, all at the same QoS level.
mqtt1.Subscribe("home,home/floor1/+/temperature,home/floor2/#", 2);
// A bit more advanced, subscribe to the same topic filters, but at different QoS levels.
mqtt1.Config("TopicQOSArray=1,2,2");
// The 0 is ignored here since we've specified individual QoS values explicitly.
mqtt1.Subscribe("home,home/floor1/+/temperature,home/floor2/#", 0);
Keep in mind that the server is allowed to start publishing messages before it sends the subscription acknowledgement.
Receiving Messages
Once the component is subscribed to a topic filter, any matching messages the server delivers are surfaced through the MessageIn event. For QoS 1 and 2, the event fires only after the message has been fully acknowledged (see Inbound Message Processing for more information).
// MessageIn event handler.
mqtt1.OnMessageIn += (s, e) => {
    Console.WriteLine("Received message from topic '" + e.Topic + "' with QoS " + e.QOS + ":");
    Console.WriteLine(e.Message);
};
Unsubscribing
The Unsubscribe method is used to unsubscribe the component from one or more topic filters. The Unsubscribed event will fire once when the server has acknowledged the unsubscribe request.
However, note that the server's acknowledgement does not specify which (if any) topic filters it actually unsubscribed the client from; it is up to you to ensure that you are passing the exact same topic filter strings that were used when subscribing. (The Unsubscribed event's TopicFilters parameter is present for convenience; it always holds a copy of the string passed to the Unsubscribe method.)
// Unsubscribe from topic filters; the exact same strings used when subscribing are required.
// If this were called after the code example shown for the Subscribe() method, we
// would still be subscribed to the "home" topic filter.
mqtt1.Unsubscribe("home/floor1/+/temperature,home/floor2/#");
It is impossible to partially unsubscribe from a topic filter with wildcards (that is, if a client is subscribed to a topic filter home/floor1/+/#, requesting to unsubscribe from a topic filter home/floor1/livingRoom/temperature does nothing).
Similarly, because topic filters in an unsubscribe request are simply compared character-by-character with existing subscriptions rather than being interpreted, you cannot do something like "unsubscribe from all currently subscribed topics" by passing "#" to Unsubscribe.
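The exact-match behavior described above can be pictured with a small model. The following is only a sketch: the SubscriptionTable class and its members are hypothetical names invented for illustration, and they are not part of the component or of any particular server.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of a server-side subscription table (illustration only).
public class SubscriptionTable
{
    // Ordinal comparison: filter strings are compared character by character,
    // so wildcards have no special meaning here.
    private readonly HashSet<string> filters = new HashSet<string>(StringComparer.Ordinal);

    public void Subscribe(string topicFilter)
    {
        filters.Add(topicFilter);
    }

    // Returns true only if an identical filter string was previously subscribed.
    public bool Unsubscribe(string topicFilter)
    {
        return filters.Remove(topicFilter);
    }

    public int Count
    {
        get { return filters.Count; }
    }
}
```

In this model, subscribing to home/floor1/+/# and then unsubscribing from home/floor1/livingRoom/temperature or from # removes nothing; only passing home/floor1/+/# again removes the entry.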
Topic Filters
The string passed for TopicFilter must contain one or more valid topic filter strings, separated by the delimiter string specified by the TopicDelimiter configuration setting (a comma by default).
A topic filter is a case-sensitive string between 1 and 65535 characters long (per topic filter), and can include any character other than the null character. Certain characters have special meanings:
- / - The topic level separator
- # - The multi-level wildcard (zero or more levels)
- + - The single-level wildcard (exactly one level)
- Leading $ - Denotes a "system topic"
Note that you are free to use both types of wildcards in the same topic filter, as long as they are used correctly.
Topic Level Separators
The topic level separator, as its name implies, is used to separate a topic name (or in this case, filter) into "levels". This concept of topic names having levels is what allows topic filters to match multiple topics through the use of wildcards. For the examples in the next sections, assume the following topics exist:
- home/floor1
- home/floor1/livingRoom
- home/floor1/livingRoom/temperature
- home/floor1/kitchen/temperature
- home/floor1/kitchen/fridge/temperature
- home/floor2/bedroom1
- home/floor2/bedroom1/temperature
Multi-level Wildcards
The multi-level wildcard character is used at the end of a topic filter to make it match an arbitrary number of successive levels. For example, the topic filter home/floor1/# would match the following topics:
- home/floor1 (because it can match zero levels)
- home/floor1/livingRoom
- home/floor1/livingRoom/temperature
- home/floor1/kitchen/temperature
- home/floor1/kitchen/fridge/temperature
Here are some things to keep in mind when using a multi-level wildcard:
- # must always be the last character in the topic filter (e.g., home/floor1/#/livingRoom is not valid)
- # must always be preceded by a / (e.g., home/floor1# is not valid)
- # by itself is a valid topic filter, and will match all topics except system topics
Single-level Wildcards
The single-level wildcard character is used between two /s in a topic filter to make it match any single level. For example, the topic filter home/floor1/+/temperature would match the following topics:
- home/floor1/livingRoom/temperature
- home/floor1/kitchen/temperature
You can use as many single-level wildcards as you want to in a topic filter. For example, the topic filter home/+/+/temperature would match the following topics:
- home/floor1/livingRoom/temperature
- home/floor1/kitchen/temperature
- home/floor2/bedroom1/temperature
Here are some things to keep in mind when using single-level wildcards:
- + must always be separated from other levels using /s (e.g., home/floor1+ is invalid, but +/floor1/+ is valid)
- + by itself is a valid topic filter, and will match all topics with exactly one level in their name except system topics
- Remember, topic names with a leading / have a zero-length string as their first level. So a topic named /people would be matched by the topic filter +/+, but not by +
- + must match exactly one level. So for example, the topic filter home/floor1/kitchen/+/temperature would match home/floor1/kitchen/fridge/temperature, but not home/floor1/kitchen/temperature
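The placement rules for both wildcards can be checked mechanically. The sketch below is one possible way to do so; TopicFilterValidator and IsValid are hypothetical names, the length limit is counted in characters as described in this article, and the component performs its own validation independently of this code.

```csharp
using System;

// Hypothetical helper (illustration only): checks the wildcard placement rules
// described above for a single topic filter.
public static class TopicFilterValidator
{
    public static bool IsValid(string filter)
    {
        // 1 to 65535 characters, no null character.
        if (string.IsNullOrEmpty(filter) || filter.Length > 65535) return false;
        if (filter.IndexOf('\0') >= 0) return false;

        string[] levels = filter.Split('/');
        for (int i = 0; i < levels.Length; i++)
        {
            string level = levels[i];
            if (level == "#")
            {
                // "#" must occupy the final level on its own.
                if (i != levels.Length - 1) return false;
            }
            else if (level == "+")
            {
                // "+" occupying a whole level is always acceptable.
                continue;
            }
            else if (level.IndexOfAny(new[] { '#', '+' }) >= 0)
            {
                // A wildcard mixed with other characters in a level is invalid.
                return false;
            }
        }
        return true;
    }
}
```

With this sketch, home/floor1/#, +/floor1/+, # and + are accepted, while home/floor1/#/livingRoom, home/floor1# and home/floor1+ are rejected.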
System Topics
Topic names which begin with a $ are "system topics". Typically, the server prohibits clients from publishing to such topics, but permits subscribing to them. As described above, wildcards will never match the first level of a topic if it begins with a $.
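To tie the wildcard and system topic rules together, here is a minimal sketch of how a topic name could be matched against a topic filter. TopicMatcher and Matches are hypothetical names, the filter is assumed to already be valid, and real servers may implement matching differently.

```csharp
using System;

// Hypothetical helper (illustration only): matches a topic name against a
// single, already-validated topic filter using the rules described above.
public static class TopicMatcher
{
    public static bool Matches(string filter, string topic)
    {
        string[] f = filter.Split('/');
        string[] t = topic.Split('/');

        // Wildcards never match a first level that begins with '$'.
        if (t[0].StartsWith("$", StringComparison.Ordinal) && (f[0] == "#" || f[0] == "+"))
            return false;

        for (int i = 0; i < f.Length; i++)
        {
            // "#" matches all remaining levels, including zero levels.
            if (f[i] == "#") return true;

            // The filter has more levels than the topic.
            if (i >= t.Length) return false;

            // "+" matches any single level; otherwise the levels must be identical.
            if (f[i] != "+" && !string.Equals(f[i], t[i], StringComparison.Ordinal))
                return false;
        }

        // Without a trailing "#", both must have the same number of levels.
        return f.Length == t.Length;
    }
}
```

For instance, in this sketch home/floor1/# matches home/floor1, the filter +/+ matches /people while + does not, and # does not match a topic such as $SYS/uptime.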
Inbound Message Processing
- Incoming messages with a QoS of 0 are fired through the MessageIn event immediately.
- Incoming messages with a QoS of 1 follow these steps:
  1. The message is added to the IncomingMessages collection property when the component receives the PUBLISH packet.
  2. The component sends a PUBACK (publish acknowledgement) packet in response.
  3. The MessageAck event is fired.
  4. The message is removed from the IncomingMessages collection property.
  5. The MessageIn event is fired.
- Incoming messages with a QoS of 2 follow these steps:
  1. The message is added to the IncomingMessages collection property when the component receives the PUBLISH packet.
  2. The component sends a PUBREC (publish received) packet in response.
  3. The component waits to receive a PUBREL (publish release) packet.
  4. The component sends a PUBCOMP (publish complete) packet in response.
  5. The MessageAck event is fired.
  6. The message is removed from the IncomingMessages collection property.
  7. The MessageIn event is fired.
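The packet exchange in the steps above can be summarized as a small lookup from "packet received" to "packet sent in response". This is only a sketch of the protocol handshake; MqttPacket and InboundFlow are hypothetical names, and the component handles all of this internally.

```csharp
using System;

// Hypothetical types (illustration only).
public enum MqttPacket { None, Publish, PubAck, PubRec, PubRel, PubComp }

public static class InboundFlow
{
    // Returns the packet a receiver sends in response to an incoming packet,
    // or MqttPacket.None when no response is required.
    public static MqttPacket ResponseTo(MqttPacket received, int qos)
    {
        switch (received)
        {
            case MqttPacket.Publish:
                if (qos == 1) return MqttPacket.PubAck;  // QoS 1: acknowledge directly
                if (qos == 2) return MqttPacket.PubRec;  // QoS 2: first half of the handshake
                return MqttPacket.None;                  // QoS 0: nothing to send
            case MqttPacket.PubRel:
                return MqttPacket.PubComp;               // QoS 2: second half of the handshake
            default:
                return MqttPacket.None;
        }
    }
}
```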
Publishing Messages
Publishing messages can be accomplished by calling either PublishMessage (string payload) or PublishData (raw data payload). Both methods, in addition to the payload, accept a topic name and QoS value.
QoS values set the service level for delivery of a message. Values range from 0 to 2 and have the following meanings:
QoS Level | Description |
0 | At most once - The published message is sent once, and if it does not arrive it is lost. |
1 | At least once - Guarantees that the published message arrives, but there may be duplicates. |
2 | Exactly once - Guarantees that the published message arrives and that there are no duplicates. |
Refer to Outbound Message Processing for more information.
// Publish a simple string-based message.
mqtt1.PublishMessage("/home/floor1/security/camera2", 1, "Cat detected!");
// Publish a raw data message.
byte[] catPicture = ...;
mqtt1.PublishData("/home/floor1/security/camera2", 1, catPicture);
Publishing a message with the Retain flag set will cause it to be retained by the server, which means the server will automatically deliver that message to any new subscribers to the topic.
Topic Names
Topic names are case-sensitive, must be 1-65535 characters long, and may include any characters except wildcard characters (# and +) and the null character. The / character separates levels within a topic name, which is important in the context of subscribing (see the Topic Filters section for more information).
Keep in mind that using topic names with leading or trailing / characters will cause topic levels with zero-length names to be created. That is, a topic name like /a/b/ consists of the levels '', 'a', 'b', and ''. Depending on your server, multiple successive /s may also cause zero-length levels to be created, or may be treated as a single /.
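As a quick illustration of how leading, trailing, and repeated separators produce zero-length levels, the sketch below splits a topic name literally on /. TopicLevels is a hypothetical helper; as noted above, a given server may treat successive separators differently.

```csharp
using System;

// Hypothetical helper (illustration only): splits a topic name into its levels.
public static class TopicLevels
{
    public static string[] Split(string topicName)
    {
        // Empty entries are kept on purpose, so zero-length levels stay visible.
        return topicName.Split('/');
    }
}
```

Under this literal interpretation, /a/b/ yields four levels (an empty level, a, b, and another empty level), and a//b yields three.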
Topic names that begin with a $ are "system topics", and servers will typically prevent clients from publishing to them.
Outbound Message Processing
- Outgoing messages with a QoS of 0 are published immediately.
- Outgoing messages with a QoS of 1 follow these steps:
  1. The component sends the PUBLISH packet, then adds the message to the OutgoingMessages collection property.
  2. The component waits to receive a PUBACK (publish acknowledgement) packet.
  3. The MessageAck event is fired.
  4. The message is removed from the OutgoingMessages collection property.
- Outgoing messages with a QoS of 2 follow these steps:
  1. The component sends the PUBLISH packet, then adds the message to the OutgoingMessages collection property.
  2. The component waits to receive a PUBREC (publish received) packet.
  3. The component sends a PUBREL (publish release) packet in response.
  4. The component waits to receive a PUBCOMP (publish complete) packet.
  5. The MessageAck event is fired.
  6. The message is removed from the OutgoingMessages collection property.
The RepublishInterval configuration setting, if set to a non-zero value (the default), controls how long the component will wait to receive a PUBACK (for QoS 1) or PUBREC (for QoS 2) before automatically republishing an outgoing message.
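The republish decision can be thought of as a simple timer check. The following sketch is an interpretation of the description above rather than the component's actual logic: RepublishTimer and IsDue are hypothetical names, the interval is passed as a TimeSpan so that no particular unit is implied, and a zero interval is assumed to disable republishing.

```csharp
using System;

// Hypothetical helper (illustration only): decides whether an unacknowledged
// outgoing message should be sent again.
public static class RepublishTimer
{
    public static bool IsDue(DateTime lastSentUtc, DateTime nowUtc,
                             TimeSpan republishInterval, bool firstAckReceived)
    {
        // Once the PUBACK (QoS 1) or PUBREC (QoS 2) has arrived, nothing is republished.
        if (firstAckReceived) return false;

        // Assumption: a zero (or negative) interval means republishing is disabled.
        if (republishInterval <= TimeSpan.Zero) return false;

        return nowUtc - lastSentUtc >= republishInterval;
    }
}
```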
MQTT 5
MQTT 5 is largely the same as MQTT 3.1.1, but with a few changes and a number of notable new features. This section will cover the major differences between the protocol versions, and explain how to utilize the new features with the component. Refer to the product documentation for more details and changes.
Connecting and Sessions
The largest fundamental difference between the protocol versions is the handling of stored session data. In MQTT 5, the functionality of the Clean Session flag is split between a Clean Start flag and a Session Expiry Interval value. Other than this change, connecting and disconnecting work the same way as in MQTT 3.1.1.
Clean Start (Restoring Sessions)
The Clean Start flag indicates whether or not session data previously saved by a client should be discarded by the server at the time that client connects. This behavior is controlled by the CleanSession property in the component. Just as in MQTT 3.1.1, to instruct the server to restore session data upon connecting, set the ClientId to the value from the previous session(s) and set CleanSession to false. See the code example below.
The client restores its own sessions in the same way as in MQTT 3.1.1 - calling RestoreSession when CleanSession is false.
mqtt1.ClientId = clientId;
mqtt1.RestoreSession(sessionState); // Automatically sets CleanSession = false.
mqtt1.Connect(host, port);
However, unlike in MQTT 3.1.1, in MQTT 5 CleanSession does not control if/when the server discards session data after disconnection. This behavior is controlled by SessionExpInterval.
Session Expiry Interval (Saving Sessions)
Setting SessionExpInterval before calling Connect determines how long the server will wait after disconnection before discarding session data. If set to 0, data will be discarded immediately after disconnection.
Note that setting CleanSession to true and setting SessionExpInterval to 0 in MQTT 5 is equivalent to setting CleanSession to true in MQTT 3.1.1.
If the property is set to a positive value, the server will discard data after that number of seconds has passed since disconnection. If set to 0xFFFFFFFF, the session does not expire.
Saving session state on the client is also done by the same process as in MQTT 3.1.1, with the exception that whether and when to save data is controlled by SessionExpInterval rather than CleanSession (just as with the server). The client should only save session data if it set a positive interval, and it should only restore data if the interval has not yet passed.
// Assume we connected with SessionExpInterval > 0.
mqtt1.Disconnect();
string clientId = mqtt1.ClientId;
string sessionState = mqtt1.SaveSession();
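The save and restore rules above can be expressed as two small checks. This is a sketch under stated assumptions: SessionExpiry, ShouldSave and CanRestore are hypothetical names, and the application is assumed to record the time of disconnection itself alongside the saved session state.

```csharp
using System;

// Hypothetical helper (illustration only): applies the Session Expiry Interval
// rules described above on the client side.
public static class SessionExpiry
{
    // 0xFFFFFFFF means the session never expires.
    public const uint NeverExpires = 0xFFFFFFFF;

    // Session data is only worth saving if the server will keep its half.
    public static bool ShouldSave(uint sessionExpIntervalSeconds)
    {
        return sessionExpIntervalSeconds > 0;
    }

    // Session data should only be restored if the interval has not yet passed.
    public static bool CanRestore(uint sessionExpIntervalSeconds,
                                  DateTime disconnectedAtUtc, DateTime nowUtc)
    {
        if (sessionExpIntervalSeconds == 0) return false;
        if (sessionExpIntervalSeconds == NeverExpires) return true;
        return nowUtc - disconnectedAtUtc < TimeSpan.FromSeconds(sessionExpIntervalSeconds);
    }
}
```

An application could call CanRestore before RestoreSession and simply connect with a fresh session when it returns false.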
Shared Subscriptions
Shared subscriptions allow a client to subscribe to a topic as a part of a group of clients also subscribing to that same topic. Each message published to such a group will be delivered to only one client in the group, making them useful when several clients share the processing of the publications in parallel.
Subscribing to a topic as part of a shared subscription is similar to a typical subscription, the only difference being the syntax of the topic filter passed to the Subscribe method.
The format of this filter is $share/{ShareName}/{filter}, where
- $share is a string literal identifying the system topic reserved for shared subscriptions.
- {ShareName} is a string at least one character long that must not include /, + or #.
- {filter} is a full topic filter formatted just as in a non-shared subscription.
Unsubscribing is done by passing the same full shared subscription filter to the Unsubscribe method.
The shared subscription behaves the same way as a non-shared subscription, except that the $share/{ShareName} portion of the topic filter is ignored when filtering publication topics, and that the server will distribute delivery of messages amongst the group, with each message delivered to only one client per group per topic.
mqtt1.Subscribe("$share/myShareGroup/myTopicFilter/subTopic", 2);
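To make the $share/{ShareName}/{filter} format concrete, the sketch below splits a shared subscription filter into its parts. SharedSubscription and TryParse are hypothetical names used only for illustration; the component does not require you to parse filters yourself.

```csharp
using System;

// Hypothetical helper (illustration only): splits "$share/{ShareName}/{filter}"
// into the share name and the underlying topic filter.
public static class SharedSubscription
{
    public static bool TryParse(string sharedFilter, out string shareName, out string topicFilter)
    {
        shareName = "";
        topicFilter = "";

        const string prefix = "$share/";
        if (!sharedFilter.StartsWith(prefix, StringComparison.Ordinal)) return false;

        // The share name ends at the next '/', so it can never contain one.
        int slash = sharedFilter.IndexOf('/', prefix.Length);
        if (slash < 0) return false;

        string name = sharedFilter.Substring(prefix.Length, slash - prefix.Length);
        string rest = sharedFilter.Substring(slash + 1);

        // The share name needs at least one character and no wildcards;
        // the remaining topic filter must not be empty.
        if (name.Length == 0 || name.IndexOfAny(new[] { '+', '#' }) >= 0) return false;
        if (rest.Length == 0) return false;

        shareName = name;
        topicFilter = rest;
        return true;
    }
}
```

Applied to the filter in the example above, this yields the share name myShareGroup and the topic filter myTopicFilter/subTopic, which is the part used when filtering publication topics.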
Subscription Identifiers
MQTT 5 allows clients to specify a numeric subscription identifier which will be returned with messages delivered for that subscription.
To do so with the component, use the SubscriptionIdentifier configuration setting before calling Subscribe. To update the identifier, call Subscribe with a new SubscriptionIdentifier.
Note that a topic may have multiple identifiers associated with it (due to wildcards), and the same identifier may be associated with multiple topics.
mqtt1.Connected = true;
mqtt1.OnMessageAck += (o, e) => {
    String ids = mqtt1.IncomingMessages[e.Index].SubscriptionIdentifiers; // ids = "123,321"
};
mqtt1.Config("SubscriptionIdentifier=123");
mqtt1.Subscribe("test_topic/subtopic", 1);
mqtt1.Config("SubscriptionIdentifier=321");
mqtt1.Subscribe("test_topic/+", 1);
mqtt2.Connected = true;
mqtt2.PublishMessage("test_topic/subtopic", 1, "hello world");
Publishing Messages Changes
The client can now set a Message Expiry Interval after which the server must delete the copy of the message for any subscribers to which it has not yet been able to start delivery. If the value is not set, the messages do not expire. This feature is useful to limit the time for which messages will stay available for offline devices.
To include this value in a PUBLISH packet, set the MessageExpInterval value in the OutgoingMessageProperties configuration setting before calling PublishMessage:
mqtt1.Config("OutgoingMessageProperties={ \"MessageExpInterval\":\"1234\" }");
Additionally, in MQTT 5, when messages are not fully acknowledged on a healthy connection, they are not republished. In MQTT 3.1.1 this behavior was controlled by the RepublishInterval configuration setting, but in MQTT 5 the component does not republish messages over a constant connection regardless of the config value.
Note that the component still automatically republishes messages when it is disconnected and then reconnects, as in MQTT 3.1.1.
Publishing with Topic Aliases
Topic aliases allow clients to associate a topic name with an integer alias that can be used instead of the topic name when publishing messages in the future, reducing packet sizes.
To establish a topic alias mapping, set the TopicAlias configuration setting to a unique value before calling PublishMessage with the desired topic name. Then, the next time the client publishes a message to this topic, it may set TopicAlias to the established value and call PublishMessage with an empty topic name string. The message will be published to the proper topic without sending the topic name.
A sender can modify the Topic Alias mapping by sending another PUBLISH during the same connection with the same Topic Alias value and a different non-zero length Topic Name.
Note that a topic alias must have a value greater than zero and less than or equal to the ServerTopicAliasMax value in the ConnAckProperties configuration setting. Topic alias mappings exist only within a connection and are not a part of stored session state data.
To reset the value once it has been previously set, so that it is no longer included in future packets, set it to -1.
mqtt1.Connected = true;
string topic = "PublishWithTopicAlias";
mqtt1.Config("TopicAlias=1"); // Map alias 1 to the topic "PublishWithTopicAlias".
mqtt1.PublishMessage(topic, 1, "hello");
mqtt1.Config("TopicAlias=1"); // Reuse alias 1 and publish with an empty topic name.
mqtt1.PublishMessage("", 1, "hello");
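On the receiving side, the alias rules above amount to maintaining a per-connection lookup table. The sketch below models that table; TopicAliasTable and Resolve are hypothetical names, and the class is only meant to illustrate the protocol behavior, not how the component or any server is implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model (illustration only) of the alias table a receiver keeps
// for the lifetime of a single connection.
public class TopicAliasTable
{
    private readonly Dictionary<int, string> aliases = new Dictionary<int, string>();
    private readonly int aliasMax;

    public TopicAliasTable(int aliasMax)
    {
        this.aliasMax = aliasMax;
    }

    // Returns the effective topic name for a PUBLISH carrying the given topic
    // name and optional alias (null means the packet has no alias).
    public string Resolve(string topicName, int? alias)
    {
        if (alias == null) return topicName;

        // Aliases must be greater than zero and no larger than the advertised maximum.
        if (alias.Value < 1 || alias.Value > aliasMax)
            throw new ArgumentOutOfRangeException("alias");

        if (topicName.Length > 0)
        {
            // A non-empty topic name creates or replaces the mapping.
            aliases[alias.Value] = topicName;
            return topicName;
        }

        // An empty topic name relies on a mapping established earlier.
        string mapped;
        if (!aliases.TryGetValue(alias.Value, out mapped))
            throw new InvalidOperationException("No mapping exists for this topic alias.");
        return mapped;
    }
}
```

Because the table lives and dies with the connection, a new instance would be created after every reconnect, which mirrors the note that alias mappings are not part of stored session state.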
Request/Response Messages
MQTT 5 supports a formalized Request/Response pattern. A client can send a message with a ResponseTopic, to which any responder client that receives the message can publish a response. The requester can also include a CorrelationData value, which the responder returns in its response message, allowing the requester to identify which request prompted the response.
Any normal message published with a ResponseTopic is considered a Request Message, and any message published on that ResponseTopic is considered a Response Message.
String requestTopic = "Test/RequestTopic";
String responseTopic = "Test/ResponseTopic";
String requestMessage = "Request";
String responseMessage = "Response";
String correlationDataHex = "61616161";
bool reqRespComplete = false;
responder.OnMessageAck += (o, e) => {
    if (e.Direction == 1)
    { // receiving request message
        String recvdResponseTopic = responder.IncomingMessages[e.Index].ResponseTopic;
        String recvdCorrelationData = responder.IncomingMessages[e.Index].CorrelationData;
        String responseProps = "{ \"CorrelationData\":\"" + recvdCorrelationData + "\" }";
        responder.Config("OutgoingMessageProperties=" + responseProps);
        responder.PublishMessage(recvdResponseTopic, 1, responseMessage);
    }
};
responder.Connected = true;
responder.Subscribe(requestTopic, 1);
requester.Connected = true;
requester.Subscribe(responseTopic, 1);
requester.OnMessageAck += (o, e) => {
    if (e.Direction == 1) // receiving response message
    {
        String recvdCorrelationData = requester.IncomingMessages[e.Index].CorrelationData;
        Console.WriteLine("corr " + recvdCorrelationData);
        if (recvdCorrelationData.Equals(correlationDataHex))
        {
            String recvdResponseMessage = requester.IncomingMessages[e.Index].Message;
            // do something with recvdResponseMessage
            reqRespComplete = true;
        }
    }
};
String requestProps = "{\"ResponseTopic\":\"" + responseTopic + "\"," +
"\"CorrelationData\":\""+correlationDataHex+"\" }";
requester.Config("OutgoingMessageProperties=" + requestProps);
requester.PublishMessage(requestTopic, 1, requestMessage);
while (!reqRespComplete) {
    requester.DoEvents();
}
Reason Codes
In MQTT 5, all response packets (CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT) now contain a reason code describing why operations succeeded or failed. Most of these packets also now have an optional Reason String for further information.
These provide specific feedback on exactly what aspect of the protocol was violated, or exactly why the server refused or could not complete an operation. Examples include:
- Server communicating to client that a message sent was not delivered because there were no subscribers on the topic.
- Server communicating to client that a subscription was granted with a lower quality of service than requested.
- Server communicating to client that an unsubscribe was not successful because the client was not subscribed to that topic filter.
- Server communicating to client that an operation failed because it does not support a feature (such as wildcard subscriptions).
- Client communicating to server that although it is disconnecting gracefully (with a DISCONNECT packet), the server should still send its Will message.
These reason codes can be accessed using the ResponseCode arguments of the MessageAck, Subscribed and Unsubscribed events, as well as the DisconnectReasonCode configuration setting.
When the component throws an exception due to a protocol error that carries a non-zero reason code or a reason string, those values are included in the exception message.
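For logging purposes it can be handy to turn a numeric reason code into readable text. The sketch below covers only a handful of the values defined by the MQTT 5 specification, including the ones behind the examples listed above; ReasonCodes, Describe and IsError are hypothetical names and are not part of the component.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (illustration only): names for a small subset of the
// MQTT 5 reason codes. Some values are shared by several packet types, e.g.
// 0x00 also means "Normal disconnection" and "Granted QoS 0".
public static class ReasonCodes
{
    private static readonly Dictionary<byte, string> Names = new Dictionary<byte, string>
    {
        { 0x00, "Success" },
        { 0x01, "Granted QoS 1" },
        { 0x02, "Granted QoS 2" },
        { 0x04, "Disconnect with Will Message" },
        { 0x10, "No matching subscribers" },
        { 0x11, "No subscription existed" },
        { 0x80, "Unspecified error" },
        { 0x87, "Not authorized" },
        { 0x9E, "Shared Subscriptions not supported" },
        { 0xA2, "Wildcard Subscriptions not supported" }
    };

    // Reason codes of 0x80 and above indicate failure.
    public static bool IsError(byte code)
    {
        return code >= 0x80;
    }

    public static string Describe(byte code)
    {
        string name;
        return Names.TryGetValue(code, out name) ? name : "Unknown (0x" + code.ToString("X2") + ")";
    }
}
```

A handler for the Subscribed or Unsubscribed event could pass its ResponseCode value (cast to a byte) to Describe when writing log output.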
We appreciate your feedback. If you have any questions, comments, or suggestions about this article please contact our support team at support@nsoftware.com.