Prerequisites
To write an agent, a version of Visual Studio that supports .NET Core 2.1 needs to be installed. According to the .NET Blog, .NET Core 2.1 is supported by Visual Studio 15.7, Visual Studio for Mac and Visual Studio Code. Agents are generally written in C# as library projects and relies on the XMPro.IoT.Framework NuGet package.
Overview
To get started with developing a new agent, create a new project in Visual Studio and import the NuGet package referenced in the prerequisites. When writing the code for an agent, you will have to implement a number of interfaces. Which interfaces to implement depends on the category under which your agent will fall.
The interfaces that can be implemented are as follows:
- IAgent
- IPollingAgent
- IReceivingAgent
- IPublishesError
The matrix below shows which interface needs to be implemented for which category agent:
IAgent | IPollingAgent | IReceivingAgent | IPublishesError | |
Listener | Required | Required | Optional | Optional |
Context Provider | Required | Required | Optional | Optional |
Transformation | Required | Optional | Required | Optional |
Action Agent/ Function | Required | Optional | Required | Optional |
IAgent is the primary interface which has to be implemented by all agents as it provides the structure for the workings of the agent. After implementing this interface, there are several methods you have to add to your project that forms part of this predefined structure.
Settings/ Configurations
Some agents need to be provided with configurations by the user, for example, for a CSV listener agent to be able to get records from a CSV file, it needs the following:
- Polling interval (in seconds)
- CSV file
- CSV Definition
Each of these settings needs to be referenced in the code and needs to correspond to the settings template that should be created when packaging your agent.
An example of the settings template (generated using the Stream Integration Manager) after it has been rendered in Data Stream Designer is shown in the image on the right. The settings in this example consists of the following controls:
- Group (Data)
- File Upload
- Group (Payload)
- Grid
Each control needs to have a Key field defined in the template. This key uniquely identifies it in the template and allows the agent code to access its value at any time. To get the value contained in a setting, use the following code:
string mySetting = parameters[“myUniqueKey”];
Before a template is rendered on the screen, or if a postback occurs on any control in the template, the method below would be called to allow the agent an opportunity to make any necessary runtime changes to the template, for example, verifying user credentials, displaying all tables of a selected database in a drop-down list, etc. In this example, no changes are being made to the template but, if needed, they can be added to the todo section.
public string GetConfigurationTemplate(string template, IDictionary<string, string> parameters)
{
//parse settings JSON into Settings object
var settings = Settings.Parse(template);
//populate the settings/configuration controls with the user selected values
new Populator(parameters).Populate(settings);
// ToDo: update the controls, values or the data sources here
//return the updated settings xml
return settings.ToString();
}
Validate
If a user tries to run an Integrity Check on a data stream in Data Stream Designer, all agents will be requested to validate the configurations they have been provided. An agent has to use this opportunity to inform the user about any configurations that are incorrect, for example, credentials that have expired, required values that are missing, etc.
To validate the configurations/ settings in an agent, the Validate method needs to be implemented. This method returns an array of errors that occurred. If validation was successful, an empty array would be returned.
The example code below verifies if a user specified a broker address, topic, and payload definition for an MQTT agent:
public string[] Validate(IDictionary<string, string> parameters)
{
int i = 1;
var errors = new List<string>();
this.config = new Configuration() { Parameters = parameters };
if (String.IsNullOrWhiteSpace(this.Broker))
errors.Add($”Error {i++}: Broker is not specified.”);
if (String.IsNullOrWhiteSpace(this.Topic))
errors.Add($”Error {i++}: Topic is not specified.”);
var grid = new Grid();
grid.Value = this.config[“PayloadDefinition”];
if (grid.Rows.Any() == false)
errors.Add($”Error {i++}: Payload Definition is not specified.”);
return errors.ToArray();
}
Output Payload
Each agent has the responsibility to inform the Engine about the structure of the payload that will be produced by the agent. To do this, implement the following method:
IEnumerable<Attribute> GetOutputAttributes(string endpoint, IDictionary<string, string> parameters)
This method returns a collection that has an Attribute type, which is a type that represents the name and type of a given attribute in the outgoing payload. As from XMPro.IOT.Framework version 3.0.2, comparison/ equality operations are also supported in Attribute, for example:
new XMIoT.Framework.Attribute(“Name1”, Types.DateTime).Equals(new XMIoT.Framework.Attribute(“Name2”, Types.String));
Create
Each agent needs to implement a method called Create, which will be invoked when your agent is being hosted. User defined configuration is passed as a parameter to this method and should be stored in a class variable as far as possible for later use. This is a good point to provision any resources needed for the working of your agent.
void Create(Configuration configuration)
{
this.config = configuration;
// ToDo: Provision any resources or write Startup logic.
}
Start
The Start method needs to be implemented by all agents. This method will be invoked when your agent is hosted and starts to work.
void Start()
Destroy
Each agent needs to implement a Destroy method, which will be invoked when a data stream is being unpublished. Use this method to release any resources or memory that your agent may have acquired during its life time.
void Destroy()
Publishing Events
To push the events to the next agent, your agent should invoke the OnPublish event with the events passed as arguments:
this.OnPublish?.Invoke(this, new OnPublishArgs(new JArray(), “EndpointName”));
Decrypting Values
If an agent’s configuration contains a Secure/Password Textbox, its value will automatically be encrypted. To decrypt the value, use following set of instructions:
var request = new OnDecryptRequestArgs(value);
this.OnDecryptRequestArgs?.Invoke(this, request);
var decryptedVal = request.DecryptedValue;
Custom Events
If your agent requires some third-party event subscriptions, the event handlers must handle the exceptions and not let it bubble up.
The IPollingAgent interface allows time-based operations. Implementing this interface will automatically add a PollingInterval setting to the configuration template of your agent, which can be used by the user to specify the time for polling. The following method will be invoked every time the poll interval expires:
void Poll()
This method can be used to listen to a third-party system for changes every x seconds.
If your agent is required to receive inputs from other agents, you should implement the IReceivingAgent interface.
Input Payload
Each agent is responsible to inform the Engine about the structure of the payload it consumes. To achieve this in your agent, implement the following method:
IEnumerable<Attribute> GetInputAttributes(string endpoint, IDictionary<string, string> parameters)
This method returns a collection consisting of Attribute, which is a type that represents the name and type of a given attribute in the incoming payload.
Input Mapping
In most cases, if an incoming payload structure is supposed to be different to what the parent is sending, i.e. the Input Payload has been specified above, the user will have to map parent outputs to current agent’s inputs. To enable this, mark the Require Input Map flag as true in the Stream Integration Manager when packaging the agent.
Endpoint
Each agent can have a number of input and output endpoints. Endpoints are the points where incoming or outgoings arrows are connected. Each endpoint consists of a Name<String> attribute. You will be passed an endpoint name when queried for an Input payload definition. Be sure to specify the endpoint name when querying the parent’s output payload definition.
Parent Outputs
All receiving agents can query the structure of parent agent outputs connected at a given endpoint by invoking an event, as demonstrated in the example below:
var args = new OnRequestParentOutputAttributesArgs(this.UniqueId, “Input”);
this.OnRequestParentOutputAttributes.Invoke(this, args);
var pOuts = args.ParentOutputs;
Receiving Events
Events published to a receiving agent can be received by implementing the following method:
void Receive(string endpointName, JArray events)
The endpointName parameter will identify which endpoint the events have been received at.
An agent can publish messages to an error endpoint by implementing the IPublishesError interface. Any unhandled error in an agent will be captured and error information will be published to the error endpoint.
Implement the interface member:
public event EventHandler<OnErrorArgs> OnPublishError;
To push the error to the next agent, the OnPublishError event should be invoked and the error information should be passed as arguments:
this.OnPublishError?.Invoke(this, new OnErrorArgs(AgentId, Timestamp, Source, Error, DetailedError, Data));
Categories
Listeners
Listeners are created by implementing IAgent and IPollingAgent interfaces. To push the events to the next receiver, the OnPublish event should be invoked and the events should be passed as arguments.
Action Agents/ Functions
Action Agents are created by implementing the IAgent and IReceivingAgent interfaces. The Receive method will be called every time events are received by this agent. To publish these events again, the same logic as per the Listener agent can be used.
Context Providers
Context Providers are created by implementing the IAgent, IPollingAgent interfaces. They are very similar to Listeners; however, Context Providers publish all available records/events when polled instead of only publishing the newer/changed ones.
Transformations
Transformations are implemented in a similar way as Action agents, except that all Transformations should have the Require Input Map flag set to false and must not implement the GetInputAttributes method, hence it should be:
public IEnumerable<XMIoT.Framework.Attribute> GetIntputAttributes(string endpoint, IDictionary<string, string> parameters)
{
throw new NotImplementedException();
}
Example
The code below is an example of a basic MQTT Listener agent. Take note of how the interfaces and methods have been implemented.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
using XMIoT.Framework;
using XMIoT.Framework.Settings;
using XMIoT.Framework.Settings.Enums;namespace XMPro.MQTTAgents
{
public class Listener : IAgent
{
private Configuration config;
private MqttClient client;
private string Broker => this.config[“Broker”];
private string Topic => this.config[“Topic”];
public long UniqueId { get; set; }
public event EventHandler<OnPublishArgs> OnPublish;
public event EventHandler<OnDecryptRequestArgs> OnDecryptRequest;
public void Create(Configuration configuration)
{
this.config = configuration;
this.client = new MqttClient(this.Broker);
this.client.MqttMsgPublishReceived += Client_MqttMsgPublishReceived;
}
public void Start()
{
if (this.client.IsConnected == false)
{
this.client.Connect(Guid.NewGuid().ToString());
this.client.Subscribe(new string[] { this.Topic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
}
}
private void Client_MqttMsgPublishReceived(object sender, uPLibrary.Networking.M2Mqtt.Messages.MqttMsgPublishEventArgs e)
{
try
{
var message = Encoding.UTF8.GetString(e.Message);
this.OnPublish?.Invoke(this, new OnPublishArgs(JArray.Parse(message), “Output”));
}
catch (Exception ex)
{
Console.WriteLine($”{DateTime.UtcNow}|ERROR|XMPro.MQTTAgents.Listener|{ex.ToString()}“);
}
}
public void Destroy()
{
if (this.client?.IsConnected == true)
this.client.Disconnect();
}
public string GetConfigurationTemplate(string template, IDictionary<string, string> parameters)
{
var settings = Settings.Parse(template);
new Populator(parameters).Populate(settings);
return settings.ToString();
}
public string[] Validate(IDictionary<string, string> parameters)
{
int i = 1;
var errors = new List<string>();
this.config = new Configuration() { Parameters = parameters };
if (String.IsNullOrWhiteSpace(this.Broker))
errors.Add($”Error {i++}: Broker is not specified.”);
if (String.IsNullOrWhiteSpace(this.Topic))
errors.Add($”Error {i++}: Topic is not specified.”);
var grid = new Grid();
grid.Value = this.config[“PayloadDefinition”];
if (grid.Rows.Any() == false)
errors.Add($”Error {i++}: Payload Definition is not specified.”);
return errors.ToArray();
}
public IEnumerable<XMIoT.Framework.Attribute> GetOutputAttributes(string endpoint, IDictionary<string, string> parameters)
{
var grid = new Grid();
grid.Value = parameters[“PayloadDefinition”];
foreach (var row in grid.Rows)
{
yield return new XMIoT.Framework.Attribute(row[“Name”].ToString(), (Types)Enum.Parse(typeof(Types), row[“Type”].ToString()));
}
}
}
}