The code below is an example of a basic MQTT Listener agent. Take note of how the interfaces and methods have been implemented.
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
using XMIoT.Framework;
using XMIoT.Framework.Settings;
using XMIoT.Framework.Settings.Enums;
namespace XMPro.MQTTAgents
{
/// <summary>
/// Agent that connects to an MQTT broker, subscribes to a single topic and
/// re-publishes every received message through <see cref="OnPublish"/>.
/// </summary>
public class Listener : IAgent
{
    // Configuration supplied by Create (or rebuilt by Validate); backs Broker and Topic.
    private Configuration config;

    // MQTT client created in Create and connected in Start.
    private MqttClient client;

    /// <summary>Value of the "Broker" entry in the current configuration.</summary>
    private string Broker => this.config["Broker"];

    /// <summary>Value of the "Topic" entry in the current configuration.</summary>
    private string Topic => this.config["Topic"];

    /// <summary>
    /// Identifier of this agent instance. Not read or written by this class;
    /// presumably assigned by the hosting framework (confirm against IAgent).
    /// </summary>
    public long UniqueId { get; set; }

    /// <summary>Raised for every MQTT message that was successfully parsed.</summary>
    public event EventHandler<OnPublishArgs> OnPublish;

    /// <summary>Declared for the agent interface; this class never raises it.</summary>
    public event EventHandler<OnDecryptRequestArgs> OnDecryptRequest;

    /// <summary>
    /// Stores the configuration, creates the MQTT client for the configured
    /// broker and hooks the message-received handler. Does not connect.
    /// </summary>
    /// <param name="configuration">Configuration providing the "Broker" and "Topic" values.</param>
    public void Create(Configuration configuration)
    {
        this.config = configuration;
        this.client = new MqttClient(this.Broker);
        this.client.MqttMsgPublishReceived += Client_MqttMsgPublishReceived;
    }

    /// <summary>
    /// Connects to the broker and subscribes to the configured topic, unless the
    /// client is already connected. Requires <see cref="Create"/> to have run first.
    /// </summary>
    public void Start()
    {
        if (!this.client.IsConnected)
        {
            // A fresh GUID is used as the MQTT client id on every connect.
            this.client.Connect(Guid.NewGuid().ToString());
            this.client.Subscribe(
                new string[] { this.Topic },
                new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
        }
    }

    /// <summary>
    /// Handles an incoming MQTT message: decodes the payload as UTF-8, parses it
    /// as a JSON array and raises <see cref="OnPublish"/> on the "Output" endpoint.
    /// </summary>
    /// <param name="sender">Source of the event (unused).</param>
    /// <param name="e">Event data carrying the raw message bytes.</param>
    private void Client_MqttMsgPublishReceived(object sender, uPLibrary.Networking.M2Mqtt.Messages.MqttMsgPublishEventArgs e)
    {
        try
        {
            var message = Encoding.UTF8.GetString(e.Message);
            this.OnPublish?.Invoke(this, new OnPublishArgs(JArray.Parse(message), "Output"));
        }
        catch (Exception ex)
        {
            // Any failure (decoding, parsing, subscriber exception) is written to the
            // console and not rethrown, so it does not propagate to the MQTT client.
            Console.WriteLine($"{DateTime.UtcNow}|ERROR|XMPro.MQTTAgents.Listener|{ex.ToString()}");
        }
    }

    /// <summary>
    /// Disconnects the MQTT client if one exists and it is currently connected.
    /// </summary>
    public void Destroy()
    {
        if (this.client?.IsConnected == true)
        {
            this.client.Disconnect();
        }
    }

    /// <summary>
    /// Parses the settings template, populates it with the supplied parameter
    /// values and returns the resulting settings as a string.
    /// </summary>
    /// <param name="template">Settings template text accepted by <c>Settings.Parse</c>.</param>
    /// <param name="parameters">Current parameter values used to populate the settings.</param>
    /// <returns>String form of the populated settings.</returns>
    public string GetConfigurationTemplate(string template, IDictionary<string, string> parameters)
    {
        var settings = Settings.Parse(template);
        new Populator(parameters).Populate(settings);
        return settings.ToString();
    }

    /// <summary>
    /// Checks that a broker, a topic and at least one payload-definition row
    /// have been supplied.
    /// </summary>
    /// <remarks>
    /// Replaces the instance configuration with one built from
    /// <paramref name="parameters"/> so that Broker and Topic read the values
    /// being validated.
    /// </remarks>
    /// <param name="parameters">Parameter values to validate.</param>
    /// <returns>Sequentially numbered error messages; empty when everything is valid.</returns>
    public string[] Validate(IDictionary<string, string> parameters)
    {
        int i = 1;
        var errors = new List<string>();
        this.config = new Configuration() { Parameters = parameters };

        if (String.IsNullOrWhiteSpace(this.Broker))
        {
            errors.Add($"Error {i++}: Broker is not specified.");
        }

        if (String.IsNullOrWhiteSpace(this.Topic))
        {
            errors.Add($"Error {i++}: Topic is not specified.");
        }

        var grid = new Grid();
        grid.Value = this.config["PayloadDefinition"];
        if (!grid.Rows.Any())
        {
            errors.Add($"Error {i++}: Payload Definition is not specified.");
        }

        return errors.ToArray();
    }

    /// <summary>
    /// Yields one output attribute per row of the "PayloadDefinition" grid, using
    /// the row's "Name" value and its "Type" value parsed as a <c>Types</c> member.
    /// </summary>
    /// <param name="endpoint">Endpoint name; not used by this implementation.</param>
    /// <param name="parameters">Parameter values containing the "PayloadDefinition" grid.</param>
    /// <returns>Lazily evaluated sequence of attributes, in grid row order.</returns>
    public IEnumerable<XMIoT.Framework.Attribute> GetOutputAttributes(string endpoint, IDictionary<string, string> parameters)
    {
        var grid = new Grid();
        grid.Value = parameters["PayloadDefinition"];
        foreach (var row in grid.Rows)
        {
            yield return new XMIoT.Framework.Attribute(
                row["Name"].ToString(),
                (Types)Enum.Parse(typeof(Types), row["Type"].ToString()));
        }
    }
}
}
Comments are closed.