Getting Started with IPWorks IoT AMQP
Requirements:
IPWorks IoT
Contents
- Introduction
- Establishing a Connection
- Sending Messages
- Receiving Messages
- Message Structure
- Code Examples
Introduction to IPWorks IoT AMQP
IPWorks IoT is a suite of components aimed at providing lightweight yet fully-featured implementations of protocols used by the Internet of Things. This article focuses specifically on the AMQP component.
AMQP is a messaging system for communicating between two different containers with a variety of nodes. It defines both the protocol for sending messages between nodes and the structure of the messages themselves. The AMQP component provides an easy-to-use AMQP 1.0 client implementation. The component supports both plaintext and TLS-enabled connections over TCP.
This article will discuss the different layers of AMQP, the structure of an AMQP message, and sending and receiving messages. Please refer to the product documentation for more information.
Establishing a Connection
An AMQP network is made up of various nodes organized within containers. The protocol consists of three layers: Connections, sessions, and links. Connections between containers are divided into sessions, which are collections of unidirectional links between nodes in each container.
Connections
All the communication between two containers takes place within a connection. The AMQP component manages a single container, so to create a connection you will first need to set the ContainerId property, which will be used to identify the container represented by the component. Then call the Connect method, passing it the hostname and port number of the server you wish to connect to.
amqp.ContainerId = "UniqueContainerId";
amqp.Connect(SERVER, PORT);
To use TLS/SSL, set the SSLEnabled property to true. Once SSL is enabled you will also have to set rules to accept the certificate from the server you plan to use.
If the server's certificate is not trusted by default on the system it may be trusted explicitly in code. One way to do this is to set the SSLAcceptServerCert property to that certificate. This will cause the component to automatically accept that certificate. For more control you can add a handler for the SSLServerAuthentication event. This event allows you to examine the certificate manually and decide if you want to accept it by setting the Accept parameter to true. For example:
amqp.SSLEnabled = true;
amqp.OnSSLServerAuthentication += (s, e) =>
{
// Verify certificate here, then:
e.Accept = true;
};
Sessions
Communication within a connection is divided into sessions, which group the links between nodes. To create a session, use the CreateSession method and pass in a name to identify the session.
amqp.CreateSession("SessionId");
Links
Messages are sent and received between nodes using unidirectional links, with a sender on one end and a receiver on the other. The component can open both sender links and receiver links, but as a client, the component can only create links and not accept link requests from peers.
To create a sender link, call the CreateSenderLink method, passing it the name of an existing session, a unique name for the link, and the name of the target node for the receiving end of the link. The LinkReadyToSend event will fire when the newly created sender link is ready to send messages.
amqp.CreateSenderLink("SessionId", "SenderLinkName", "SomeTarget");
AMQP uses a credit system to control how much data the sender can send at one time. When creating a receiver link the component implements this in the form of two modes: automatic, which functions as a push-based model where data is sent across the link as soon as it is produced; and fetch-based, which functions as a pull-based model by giving the sender a single credit when the receiver wants to receive data. The default mode for creating a receiver link is automatic. To create fetch-based receiver links, set the ReceiveMode property of the component to fetch before creating the link. To create the link call the CreateReceiverLink method, passing the name of an existing session, a unique name for the link, and the name of the target node for the sending end of the link.
amqp.ReceiveMode = AmqpReceiveModes.rmFetch;
amqp.CreateReceiverLink("SessionId", "ReceiverLinkName", "SomeTarget");
Sending Messages
The AMQP component has a Message property representing the message to send. The message's Value and ValueType fields must be set before sending, but all other fields are optional. To send the message, call the SendMessage function and pass in the name of the desired sender link. The MessageOut event will fire when the message has been sent, and the MessageOutcome event will fire when the outcome of the message is reported by the receiver.
The message flow, which determines the level of guarantee for the delivery of the message, is determined by whether a message is sent as settled or unsettled. By default, a sender link is created in mixed mode. In this mode a message will be sent as settled or unsettled based on the value of the message's Settled property, which is true (or settled) by default. To set a default sender mode for all messages sent over the link, see the DefaultSenderSettleMode config in the help files.
At Most Once Flow
An "at most once" flow model does not guarantee that the message will arrive at the target and cannot send duplicate messages. This is implemented by sending a message as "settled", which tells the receiver that the sender has not kept a record of the message and is not expecting a response. If the message fails to arrive at the receiver the only time it is sent, the message will be lost.
To send a message as "settled", set the message's Settled property to true (default).
amqp.Config("DefaultSenderSettleMode=1");
amqp.CreateSenderLink("SessionId", "SenderLinkName", "SomeTarget");
At Least Once Flow
An "at least once" flow model guarantees that the message will arrive at the target at least once, but does not guarantee that there will be no duplicates. This is implemented by sending a message as "unsettled", while the sender keeps a record of the message. When a receiver receives an unsettled message it responds by notifying the sender of the message's outcome. If the sender does not receive a response to an unsettled message, it sends the message again. This guarantees that the message will arrive at least once.
To send a message as "unsettled", set the message's Settled property to false.
amqp.Config("DefaultSenderSettleMode=0");
amqp.CreateSenderLink("SessionId", "SenderLinkName", "SomeTarget");
Receiving Messages
A receiver link uses a credit system to determine when the sender is allowed to send data. The AMQP component can create links in either "fetch" or "automatic" mode to determine the flow of receiving messages.
When a message arrives, the MessageIn event will fire and the ReceivedMessage will be set to the incoming message. Within the MessageIn event the State event parameter can be used to report the outcome of the message to the sender:
Value | Meaning | Description |
---|---|---|
0 | Accepted (default) | The message has been processed successfully. |
1 | Rejected | The message failed to process successfully. Includes an error describing why. |
2 | Released | The message has has not been (and will not be) processed. |
3 | Modified | Same as "Released", but with additional metadata for the sender. |
For more information on state values, see the MessageIn event in the documentation.
amqp.OnMessageIn += (s, e) =>
{
Console.WriteLine(amqp.ReceivedMessage.Value);
e.State = 2; // Tell the sender that the message was released
};
Fetch Flow
A receiver link created in "fetch" mode begins with 0 credit, so that the sender is not allowed to send immediately. Calling FetchMessage method adds a single credit to the link, allowing the sender to send a single message. This gives the receiver link manual control over when data is transmitted. The FetchMessage method will block until the message arrives, but this can be limited by the FetchTimeout property, which defaults to 60 seconds. Setting FetchTimeout to 0 will cause FetchMessage to block indefinitely until the message arrives. To create a receiver link in "fetch" mode, set the component's ReceiveMode method to "fetch" before creating the link.
amqp.ReceiveMode = AmqpReceiveModes.rmFetch;
amqp.CreateReceiverLink("SessionId", "ReceiverLinkName", "SomeTarget");
amqp.FetchMessage("ReceiverLinkName");
Automatic Flow
A receiver link in "automatic" mode effectively has unlimited credit. This allows the sender to send data as soon as it is available without the receiver having to fetch a message manually. For more information on how to control the credit in an automatic connection, refer to the DefaultCredit and DefaultCreditThreshold configuration options in the help files. To create a receiver link in "automatic" mode, set the component's ReceiveMode to "automatic" before creating the link.
amqp.ReceiveMode = AmqpReceiveModes.rmAutomatic;
amqp.CreateReceiverLink("SessionId", "ReceiverLinkName", "SomeTarget");
Message Structure
A message carries a value and a field specifying the value's type, as well as other optional information. These are set through the Value and ValueType fields. If the ValueType of the message is set to binary, the message's ContentType and ContentEncoding can be used to further describe the data.
A message can be uniquely identified by its MessageId property, and can be identified as part of a group of messages by its GroupId and GroupSequence properties. A message can also have a UserId for the user who created it, a Subject, and many other properties.
Basic
For simple applications, the value can be one of several primitive types:
Value Type | Description | Value Format |
---|---|---|
mvtNull (default) | Null | N/A (Value is ignored) |
mvtBoolean | Boolean | "True" or "False" |
mvtUbyte | Unsigned byte | 0 to 255 |
mvtUshort | Unsigned short | 0 to 65535 |
mvtUint | Unsigned integer | 0 to 4294967295 |
mvtUlong | Unsigned long | 0 to 18446744073709551615 |
mvtByte | Byte | -128 to 127 |
mvtShort | Short | -32768 to 32767 |
mvtInt | Integer | -2147483648 to 2147483647 |
mvtLong | Long | -9223372036854775808 to 9223372036854775807 |
mvtFloat | Float | IEEE 754 32-bit floating point number |
mvtDouble | Double | IEEE 754 64-bit floating point number |
mvtDecimal | Decimal | Hex-encoded byte string |
mvtChar | Char | Single character |
mvtTimestamp | Timestamp | Number of milliseconds since the Unix epoch (January 1, 1970 00:00:00 UTC). |
mvtUuid | UUID | Hex-encoded UUID in the form XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX (hyphens optional, case-insensitive) |
mvtBinary | Binary data | Hex-encoded byte string |
mvtString | String | String |
mvtSymbol | Symbolic value | ASCII string |
For instance to specify a string value set:
amqp.Message.ValueType = AMQPValueTypes.mvtString;
amqp.Message.Value = "Hello World!";
Advanced
For more complicated applications, the value can be composite data. AMQP uses binary composite data to avoid the overhead of sending documents, but for usability our component exposes that data to the user as JSON, with a ValueType of mvtJson. The JSON schema uses type-value objects consisting of an AMQP typename and a value, as shown here:
amqp.Message.ValueType = AMQPValueTypes.mvtJson;
amqp.Message.Value = "{\"type\": \"string\", \"value\": \"Hello World!\"}";
The typename can be any of the AMQP primitive types as well as "array", "list", or "map". The value for an array is a JSON array containing any number of type-value objects with the same type, while a list is the same but with no restriction to a single type. A map is a JSON array with an even number of type-value objects, with the even-indexed objects representing keys while the odd-indexed objects represent values. These data structures can also be nested, which allows for complicated message structures when necessary.
{
"type": "map",
"value": [
{ "type": "symbol", "value": "key1" },
{ "type": "string", "value": "value1" },
{ "type": "symbol", "value": "key2" },
{ "type": "int", "value": 100 }
]
}
Code Examples
Sending - At Most Once
Doesn't guarantee delivery - the message will be lost if it does not arrive at the receiver. See At Most Once Flow for more information.
// Setup
amqp.ContainerId = "UniqueContainerId";
amqp.Connect(SERVER, PORT);
amqp.CreateSession("SessionId");
amqp.CreateSenderLink("SessionId", "SenderLinkName", "SomeTarget");
// Create the message
amqp.Message.ValueType = AMQPValueTypes.mvtJson;
amqp.Message.Value = "{\"type\": \"string\", \"value\": \"Hello World!\"}";
// Leave Message.Settled as true for At Most Once flow
amqp.SendMessage("SenderLinkName");
// Closing the connection
amqp.Disconnect();
Sending - At Least Once
Guarantees delivery - the message will be re-sent if it is not acknowledged by the receiver. See At Least Once Flow for more information.
// Setup
amqp.ContainerId = "UniqueContainerId";
amqp.Connect(SERVER, PORT);
amqp.CreateSession("SessionId");
amqp.CreateSenderLink("SessionId", "SenderLinkName", "SomeTarget");
bool messageOutcome = false;
// With At Least Once flow, the receiver will report message outcome.
// We can check the outcome here and act accordingly.
amqp.OnMessageOutcome += (s, e) =>
{
messageOutcome = true;
if (e.State == 0)
Console.WriteLine("Outcome: Accepted");
else
Console.WriteLine("Outcome: Not Accepted");
};
// Create the message
amqp.Message.ValueType = AMQPValueTypes.mvtJson;
amqp.Message.Value = "{\"type\": \"string\", \"value\": \"Hello World!\"}";
amqp.Message.Settled = false; // Set Settled to false for At Least Once flow
amqp.SendMessage("SenderLinkName"); // Leave settled false for At Least Once flow
// Wait for Message Outcome
while (!messageOutcome)
{
amqp.DoEvents();
}
// Closing the connection
amqp.Disconnect();
Receiving - Automatic
Messages are received automatically, without blocking to wait for a message. See Automatic Flow for more information.
// Setup
amqp.ContainerId = "UniqueContainerId";
amqp.CreateSession("SessionId");
// Prepare to receive a message
amqp.OnMessageIn += (s, e) =>
{
Console.WriteLine(amqp.ReceivedMessage.Value);
};
// Create a receiver link and wait for a message
amqp.CreateReceiverLink("SessionId", "ReceiverLinkName", "SomeTarget");
// Loop until some external condition tell us it's time to stop
while (keepListening)
{
amqp.DoEvents();
}
// Close the connection
amqp.Disconnect();
Receiving - Fetch
Messages are only received when the receiver chooses to fetch a message. See Fetch Flow for more information.
// Setup
amqp.ContainerId = "UniqueContainerId";
amqp.CreateSession("SessionId");
// Prepare to receive a message
amqp.OnMessageIn += (s, e) =>
{
Console.WriteLine(amqp.ReceivedMessage.Value);
};
// Create a receiver link and wait for a message
amqp.ReceiveMode = AmqpReceiveModes.rmFetch;
amqp.CreateReceiverLink("SessionId", "ReceiverLinkName", "SomeTarget");
amqp.FetchMessage("ReceiverLinkName");
// FetchMessage blocks until a message is received
// Close the connection
amqp.Disconnect();
We appreciate your feedback. If you have any questions, comments, or suggestions about this article please contact our support team at kb@nsoftware.com.