Data Streams – Implementing an Agent

An agent must meet several expectations set by the Stream Host in order to function properly. These expectations take the form of interface contracts in the XMIoT.Framework namespace, which ships in the XMPro.IOT.Framework NuGet package. This article describes each interface and its members, what they signify, and what the Stream Host expects of them.

IAgent

This is the base interface for all agents and must be implemented.

The following inherited members are relevant when designing a stream.

GetConfigurationTemplate

When a user requests the configuration page for a stream object at design time, the GetConfigurationTemplate method is called. The parameters argument contains any saved or in-progress configuration values, while the template argument is the layout of the parameters on the page, in JSON form. At a minimum, the method must populate the template with the saved parameters. It is also the correct place to show or hide settings, or to alter their options, based on the configuration values.

Validate

The Validate method is called when a user checks the integrity of a stream. The parameters argument contains the configuration of the stream object to be checked. For every potential issue in the object’s configuration, add one string to the returned array that briefly explains the issue. Every error returned is shown to the user so that it can be corrected.
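
The pattern can be sketched as follows. This is a minimal illustration rather than the framework's own code: the parameter keys ("Broker" and "Port") are hypothetical, and the configuration is assumed to arrive as a string dictionary, so the actual Validate signature in XMPro.IOT.Framework may differ.

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: the keys "Broker" and "Port" are hypothetical, and the
// real Validate signature in XMPro.IOT.Framework may differ from this one.
public static class ValidationSketch
{
    // Returns one short message per configuration problem; an empty array means valid.
    public static string[] Validate(IDictionary<string, string> parameters)
    {
        var errors = new List<string>();

        if (!parameters.TryGetValue("Broker", out var broker) || string.IsNullOrWhiteSpace(broker))
            errors.Add("Broker is not specified.");

        if (!parameters.TryGetValue("Port", out var portText) || !int.TryParse(portText, out var port))
            errors.Add("Port must be a whole number.");
        else if (port < 1 || port > 65535)
            errors.Add("Port must be between 1 and 65535.");

        return errors.ToArray();
    }
}
```

Collecting every problem before returning, instead of stopping at the first one, lets the user correct the whole configuration in a single pass.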

The following inherited members are relevant both when designing and executing a stream.

UniqueId

UniqueId is used by the Stream Host to identify running stream objects. Storing the value and returning it on request is enough to satisfy this requirement.

OnDecryptRequest

The OnDecryptRequest event is used to decrypt configuration values that were encrypted for storage. The OnDecryptRequestArgs argument carries both the encrypted string and the resulting decrypted text. If the value must be consulted multiple times during regular operation, store the decrypted result securely.
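
A minimal sketch of raising such an event is given here. The OnDecryptRequestArgs class below is a local stand-in written for this example; its property names (Value, DecryptedValue) are assumptions and may not match the framework's type. The agent decrypts once during Create and keeps the result for later use.

```csharp
using System;

// Stand-in for the framework's args type. Property names are assumptions.
public class OnDecryptRequestArgs : EventArgs
{
    public OnDecryptRequestArgs(string value) { Value = value; }
    public string Value { get; }                 // encrypted input
    public string DecryptedValue { get; set; }   // filled in by the event handler
}

public class DecryptingAgent
{
    // In this sketch, the host is assumed to subscribe to this event.
    public event EventHandler<OnDecryptRequestArgs> OnDecryptRequest;

    private string password; // decrypted once, then reused

    public string Password => password;

    public void Create(string encryptedPassword)
    {
        password = Decrypt(encryptedPassword);
    }

    private string Decrypt(string encrypted)
    {
        var request = new OnDecryptRequestArgs(encrypted);
        OnDecryptRequest?.Invoke(this, request);
        return request.DecryptedValue; // null if nobody handled the event
    }
}
```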

GetOutputAttributes

The GetOutputAttributes method must list every field in the data points that the stream object will output. It can be called at design time, when downstream objects are being configured, or when other stream objects are being instantiated and readied for execution.

The endpoint argument contains the name of the endpoint being requested, which is only significant if the stream object will output different events to multiple outputs. The parameters argument contains the configuration of the stream object.

Some stream objects need to know the structure of the events they will receive as part of their configuration or operation. Be certain, therefore, to include every attribute, with the correct spelling and data type, for the supplied configuration.
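
The following sketch shows one way the returned attribute list might depend on both the endpoint and the configuration. OutputAttribute is a stand-in type defined for this example, and the endpoint names ("Output", "Alerts") and the "IncludeUnits" parameter are hypothetical; the framework's own attribute type and data-type enumeration are not reproduced here.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the framework's attribute descriptor; the real type may differ.
public sealed class OutputAttribute
{
    public OutputAttribute(string name, Type dataType) { Name = name; DataType = dataType; }
    public string Name { get; }
    public Type DataType { get; }
}

public static class OutputAttributesSketch
{
    // "Alerts" is a hypothetical second endpoint; "IncludeUnits" is a hypothetical parameter.
    public static IEnumerable<OutputAttribute> GetOutputAttributes(
        string endpoint, IDictionary<string, string> parameters)
    {
        if (endpoint == "Alerts")
        {
            yield return new OutputAttribute("Message", typeof(string));
            yield break;
        }

        yield return new OutputAttribute("Timestamp", typeof(DateTime));
        yield return new OutputAttribute("Value", typeof(double));

        // The declared shape follows the configuration that was supplied.
        if (parameters.TryGetValue("IncludeUnits", out var flag) && flag == "true")
            yield return new OutputAttribute("Units", typeof(string));
    }
}
```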

The following inherited members are relevant when executing a stream.

Create

When a stream is being assembled for execution, each object is instantiated and its Create method is called. The Configuration argument will contain any parameters assigned to the object at design time.

Start

After all stream objects have been Created, they are then all Started. This is the signal for sources of data that it is safe to begin the data flow. If your agent is not a Listener or Context Provider, this method may be left empty.

OnPublish

The Stream Host listens to the OnPublish event for data to be sent to downstream objects. If the stream object outputs different events to multiple outputs, use the OnPublishArgs constructor that accepts an output endpoint name.

Once the Create method returns, the stream object is expected to be fully ready to receive data from any upstream objects. However, the order of execution among stream objects is not guaranteed. A stream object that is a source of data (i.e. a Listener or Context Provider) must therefore not begin sending data at this point, as downstream objects may not yet be prepared to receive it.
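
The division of work between Create, Start and OnPublish can be sketched as follows. OnPublishArgs here is a simplified stand-in defined for the example, events are modelled as dictionaries, and the "Source" parameter and the HandleReading method are hypothetical; none of these should be read as the framework's actual shapes.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for the framework's publish args; its shape is an assumption.
public class OnPublishArgs : EventArgs
{
    public OnPublishArgs(IList<IDictionary<string, object>> events, string endpoint = "Output")
    {
        Events = events;
        Endpoint = endpoint;
    }
    public IList<IDictionary<string, object>> Events { get; }
    public string Endpoint { get; }
}

public class ListenerSketch
{
    public event EventHandler<OnPublishArgs> OnPublish;

    private bool started;
    private string source;

    // Create: read the configuration and get ready, but do not emit yet.
    public void Create(IDictionary<string, string> parameters)
    {
        parameters.TryGetValue("Source", out source);
    }

    // Start: every object in the stream has been created, so data may flow.
    public void Start() => started = true;

    // Hypothetical callback invoked by the agent's own data source.
    public void HandleReading(double value)
    {
        if (!started) return; // downstream objects may not be ready yet

        var evt = new Dictionary<string, object> { ["Source"] = source, ["Value"] = value };
        OnPublish?.Invoke(this, new OnPublishArgs(new List<IDictionary<string, object>> { evt }));
    }
}
```

Readings that arrive between Create and Start are dropped in this sketch; an agent that cannot afford to lose them would need to buffer them instead.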

Destroy

The Destroy method is called to signal the end of the stream, whether by the Stream Host shutting down or the Stream itself being unpublished. It is the correct place to clean up any long-running tasks or disposable resources.
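
As an illustration of that cleanup, the sketch below starts a background task in Start and stops it in Destroy using standard .NET cancellation. The class and its IsRunning property are hypothetical, and the five-second wait is an arbitrary choice for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public class DisposableAgentSketch
{
    private CancellationTokenSource cts;
    private Task worker;

    public bool IsRunning => worker != null && !worker.IsCompleted;

    public void Start()
    {
        cts = new CancellationTokenSource();
        var token = cts.Token;

        // Long-running background work that observes the cancellation token.
        worker = Task.Run(async () =>
        {
            while (!token.IsCancellationRequested)
                await Task.Delay(50, token);
        }, token);
    }

    public void Destroy()
    {
        if (cts == null) return; // never started, or already destroyed

        cts.Cancel();
        try
        {
            worker.Wait(TimeSpan.FromSeconds(5)); // give the task a bounded time to stop
        }
        catch (AggregateException)
        {
            // A cancelled task reports its cancellation here; nothing more to do.
        }
        cts.Dispose();
        cts = null;
    }
}
```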

IPollingAgent

This is an interface for agents that repeatedly poll an external resource for data. Implementing this interface automatically adds a numerical parameter with the key “#PollingInterval” to the stream object’s configuration whenever the configuration is requested, if one does not already exist. This parameter controls the delay between successive calls to Poll.

The following inherited members are relevant when executing a stream.

Poll

The Poll method will be called by the Stream Host at a user-configurable interval. The next interval will not begin to count down until the previous Poll has completed, to prevent potentially slow resources from being overwhelmed by requests.
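
The timing rule can be pictured with the loop below. It is a hypothetical illustration of "wait for Poll to finish, then wait out the interval" and is not the Stream Host's implementation; the iterations parameter exists only so that the example terminates.

```csharp
using System;
using System.Threading;

public static class PollingLoopSketch
{
    // Calls poll, waits for it to finish, and only then waits out the interval.
    public static void Run(Action poll, TimeSpan interval, int iterations)
    {
        for (var i = 0; i < iterations; i++)
        {
            poll();                 // a slow poll simply postpones the next one
            Thread.Sleep(interval); // the countdown starts after poll has returned
        }
    }
}
```

Under this scheme, a Poll that takes four seconds with a one-second interval is called roughly every five seconds, so calls never overlap.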

IPublishesError

This is an interface for agents that may have meaningful error streams and implement an Error output endpoint.

The following inherited members are relevant when executing a stream.

OnPublishError

Similar to the OnPublish event of the IAgent interface, the Stream Host listens to the OnPublishError event for data to be passed down the stream via the Error endpoint. OnErrorArgs defines the structure of the output. It requires the Id of the object, the date and time of the error, the source of the error (e.g. the name of the relevant method), a short user-friendly description of the error, and, if necessary, a longer detailed explanation of the error (e.g. exception data). The event that caused the error may optionally be included.
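
A sketch of publishing an error from inside a failing operation follows. The OnErrorArgs class is a stand-in that mirrors the fields listed above; its constructor order and property names are assumptions, and ReadFromResource is a hypothetical method that always fails so that the error path is exercised.

```csharp
using System;

// Stand-in mirroring the fields described above; names and order are assumptions.
public class OnErrorArgs : EventArgs
{
    public OnErrorArgs(long agentId, DateTime timestamp, string source,
                       string error, string detailedError = null, object data = null)
    {
        AgentId = agentId;
        Timestamp = timestamp;
        Source = source;
        Error = error;
        DetailedError = detailedError;
        Data = data;
    }

    public long AgentId { get; }
    public DateTime Timestamp { get; }
    public string Source { get; }         // e.g. the name of the failing method
    public string Error { get; }          // short, user-friendly description
    public string DetailedError { get; }  // e.g. exception data
    public object Data { get; }           // optionally, the event that caused the error
}

public class ErrorPublishingAgent
{
    public long UniqueId { get; set; }

    public event EventHandler<OnErrorArgs> OnPublishError;

    public void Poll()
    {
        try
        {
            ReadFromResource();
        }
        catch (Exception ex)
        {
            OnPublishError?.Invoke(this, new OnErrorArgs(
                UniqueId, DateTime.UtcNow, nameof(Poll), ex.Message, ex.ToString()));
        }
    }

    // Hypothetical operation that fails, to demonstrate the error path.
    private void ReadFromResource() =>
        throw new TimeoutException("The resource did not respond.");
}
```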

IReceivingAgent

This is an interface for agents that receive data from further upstream.

The following inherited members are relevant when designing a stream.

GetInputAttributes

The GetInputAttributes method is significant for agents that require an input map to be applied to their incoming data stream. The attributes it returns are the requirements that must be matched when the input map of the stream object is configured at design time. Agents that do not require any input mapping may return an empty array or enumerable.

The following inherited members are relevant when designing or executing a stream.

OnRequestParentOutputAttributes

The OnRequestParentOutputAttributes event asks the Stream Host to fetch the shape of incoming events from the immediate upstream stream object. The arguments object requires the UniqueId of the stream object and the name of the input endpoint for which information is requested. This can be useful when readying the object for execution or during configuration, for example to offer the upstream attributes as selectable options in a setting.

The following inherited members are relevant when executing a stream.

Receive

The Receive method is called by the Stream Host whenever a new batch of events is received for processing. This is the correct place to change the state of the stream object in response to incoming data, and to create or modify events for publishing.
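
A sketch of a Receive implementation that updates state and publishes modified events is given below. Events are modelled here as dictionaries and OnPublish as a plain Action delegate; both are simplifications made for the example, and the framework's actual event type and Receive signature may differ. The "Value" and "RunningTotal" field names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public class ReceivingAgentSketch
{
    // Stand-in for OnPublish: hands a batch of events onward.
    public event Action<IReadOnlyList<IDictionary<string, object>>> OnPublish;

    private double runningTotal; // state that changes in response to incoming data

    public void Receive(string endpointName, IEnumerable<IDictionary<string, object>> events)
    {
        var output = new List<IDictionary<string, object>>();

        foreach (var evt in events)
        {
            runningTotal += Convert.ToDouble(evt["Value"]);

            // Copy the incoming event and add a computed field before publishing.
            var copy = new Dictionary<string, object>(evt) { ["RunningTotal"] = runningTotal };
            output.Add(copy);
        }

        if (output.Count > 0)
            OnPublish?.Invoke(output);
    }
}
```

Copying each event before modifying it leaves the incoming batch untouched, which avoids surprises if the same event objects are also delivered elsewhere.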

IRemoteAgent

This is an interface for agents that can be used as remote publishers or receivers.

The IRemoteAgent interface has no members of its own. However, remote agent functionality depends on the presence of the interface, and the following extension methods are useful to remote agents.

IsRemote

This extension method can be called to determine if a stream object is currently running as a remote publisher or receiver. It will throw an exception if the agent does not implement IRemoteAgent.

FromId

This extension method can be called to get the Id of the stream object a remote agent is standing in for. This can be useful for getting the correct Id for use in channel names or other transport configuration, as it will be identical between the remote publisher and remote receiver. It will throw an exception if the agent does not implement IRemoteAgent or the stream object is not currently running as a remote agent.

 

This is the legacy version of the XMPro Documentation site. For the latest XMPro documentation, please visit documentation.xmpro.com
