Example

Topic Progress:

The code below is an example of a basic MQTT Listener agent. Take note of how the interfaces and methods have been implemented.

Please note that this example uses the M2MqttDotnetCore 1.0.7 NuGet package.

using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
using XMIoT.Framework;
using XMIoT.Framework.Settings;
using XMIoT.Framework.Settings.Enums;

namespace XMPro.MQTTAgents
{

public class Listener : IAgent
{

private Configuration config;
private MqttClient client;
private string Broker => this.config[“Broker”];
private string Topic => this.config[“Topic”];

public long UniqueId { get; set; }
public event EventHandler<OnPublishArgs> OnPublish;
public event EventHandler<OnDecryptRequestArgs> OnDecryptRequest;

public void Create(Configuration configuration)
{

this.config = configuration;
this.client = new MqttClient(this.Broker);
this.client.MqttMsgPublishReceived += Client_MqttMsgPublishReceived;

}

public void Start()
{

if (this.client.IsConnected == false)
{

this.client.Connect(Guid.NewGuid().ToString());
this.client.Subscribe(new string[] { this.Topic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });

}

}

private void Client_MqttMsgPublishReceived(object sender, uPLibrary.Networking.M2Mqtt.Messages.MqttMsgPublishEventArgs e)
{

try
{

var message = Encoding.UTF8.GetString(e.Message);
this.OnPublish?.Invoke(this, new OnPublishArgs(JArray.Parse(message), “Output”));

}
catch (Exception ex)
{

Console.WriteLine($”{DateTime.UtcNow}|ERROR|XMPro.MQTTAgents.Listener|{ex.ToString()});

}

}

public void Destroy()
{

if (this.client?.IsConnected == true)

this.client.Disconnect();

}

public string GetConfigurationTemplate(string template, IDictionary<string, string> parameters)
{

var settings = Settings.Parse(template);
new Populator(parameters).Populate(settings);
return settings.ToString();

}

public string[] Validate(IDictionary<string, string> parameters)
{

int i = 1;
var errors = new List<string>();
this.config = new Configuration() { Parameters = parameters };

if (String.IsNullOrWhiteSpace(this.Broker))

errors.Add($”Error {i++}: Broker is not specified.”);

 

if (String.IsNullOrWhiteSpace(this.Topic))

errors.Add($”Error {i++}: Topic is not specified.”);

 

var grid = new Grid();
grid.Value = this.config[“PayloadDefinition”];

if (grid.Rows.Any() == false)

errors.Add($”Error {i++}: Payload Definition is not specified.”);

return errors.ToArray();

}

public IEnumerable<XMIoT.Framework.Attribute> GetOutputAttributes(string endpoint, IDictionary<string, string> parameters)
{

var grid = new Grid();
grid.Value = parameters[“PayloadDefinition”];
foreach (var row in grid.Rows)
{

yield return new XMIoT.Framework.Attribute(row[“Name”].ToString(), (Types)Enum.Parse(typeof(Types), row[“Type”].ToString()));

}

}

}

}

Comments are closed.

This is the legacy version of the XMPro Documentation site. For the latest XMPro documentation, please visit documentation.xmpro.com

X