Skip to content
This repository was archived by the owner on Jul 16, 2023. It is now read-only.

Nuclei.Communication = Getting started

Petrik van der Velde edited this page May 30, 2014 · 1 revision

Define the command set and notification set interfaces by deriving them from ICommandSet and INotificationSet respectively.

/// <summary>
/// The example command set. Every command returns a <see cref="Task"/> (or
/// <see cref="Task{TResult}"/>) and takes the timeout and retry count as trailing
/// parameters marked with the InvocationTimeout and InvocationRetryCount attributes.
/// </summary>
public interface IMyCommandSet : ICommandSet
{
    /// <summary>
    /// A command that takes a single value and returns no result.
    /// </summary>
    /// <param name="aParameter">The value that is passed to the command.</param>
    /// <param name="timeoutInMilliseconds">The invocation timeout, in milliseconds.</param>
    /// <param name="retryCount">The invocation retry count.</param>
    /// <returns>A task representing the command invocation.</returns>
    Task MyCommand(
        double aParameter,
        [InvocationTimeout] int timeoutInMilliseconds,
        [InvocationRetryCount] int retryCount);

    /// <summary>
    /// A command that takes a single value and returns an integer result.
    /// </summary>
    /// <param name="aParameter">The value that is passed to the command.</param>
    /// <param name="timeoutInMilliseconds">The invocation timeout, in milliseconds.</param>
    /// <param name="retryCount">The invocation retry count.</param>
    /// <returns>A task that provides the integer result of the command.</returns>
    Task<int> MyCommandReturningSomething(
        double aParameter,
        [InvocationTimeout] int timeoutInMilliseconds,
        [InvocationRetryCount] int retryCount);
}

/// <summary>
/// The example notification set. Each notification is declared as an event.
/// </summary>
public interface IMyNotificationSet : INotificationSet
{
    /// <summary>
    /// The example notification.
    /// </summary>
    event EventHandler OnMyNotification;
}

Define the classes that will provide the implementations of the commands and/or notifications. Note that these classes do not need to implement the interfaces.

/// <summary>
/// Provides the implementations that the example commands and notification are mapped to.
/// The class does not implement the command set or notification set interfaces.
/// </summary>
public sealed class MyClass
{
    /// <summary>
    /// The implementation for the command without a return value. Besides the command
    /// value it takes two extra parameters, marked with the InvokingEndpoint and
    /// InvocationMessage attributes.
    /// </summary>
    /// <param name="aParameter">The value that was passed to the command.</param>
    /// <param name="remoteEndpoint">The endpoint ID marked with the InvokingEndpoint attribute.</param>
    /// <param name="message">The message ID marked with the InvocationMessage attribute.</param>
    public void MyMethod(double aParameter, [InvokingEndpoint] EndpointId remoteEndpoint, [InvocationMessage] MessageId message)
    {
        // Intentionally empty: the example command does no work.
    }

    /// <summary>
    /// The implementation for the command that returns a value.
    /// </summary>
    /// <param name="aParameter">The value that was passed to the command.</param>
    /// <returns>The parameter converted to an integer, plus 10.</returns>
    public int MyMethodReturningSomething(double aParameter)
    {
        var truncated = (int)aParameter;
        return truncated + 10;
    }

    /// <summary>
    /// The event that backs the example notification.
    /// </summary>
    public event EventHandler MyEvent;
}

Create a class that implements IInitializeCommunicationInstances and use it to map the interface methods / events to the implementation. On the endpoint that owns the implementation add mappings to the provided commands and provided notifications. On the endpoint that wants to use the commands and notifications add mappings to the required commands and required notifications.

Each command and notification can be assigned to one or more CommunicationSubject values. Each CommunicationSubject indicates a topic/subject that the current application is either 'interested' in or provides capabilities for.

The initializer for the implementation owning side looks like:

/// <summary>
/// Registers the commands and notifications that are provided by the endpoint that
/// owns the <see cref="MyClass"/> implementation.
/// </summary>
public sealed class MyInitializerForTheImplementationOwningEndpoint : IInitializeCommunicationInstances
{
    // The subjects under which the commands and notifications are registered.
    // NOTE(review): the subject name is a placeholder and the CommunicationSubject(string)
    // constructor is assumed - confirm against the Nuclei.Communication API.
    private readonly CommunicationSubject[] m_Subjects =
        {
            new CommunicationSubject("My.Subject"),
        };

    private readonly IComponentContext m_Context;

    /// <summary>
    /// Initializes a new instance of the <see cref="MyInitializerForTheImplementationOwningEndpoint"/> class.
    /// </summary>
    /// <param name="context">The DI context from which the implementation and the registration delegates are resolved.</param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="context"/> is <see langword="null"/>.</exception>
    public MyInitializerForTheImplementationOwningEndpoint(IComponentContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }

        m_Context = context;
    }

    /// <summary>
    /// Registers all the commands that are provided by the current application.
    /// </summary>
    public void RegisterProvidedCommands()
    {
        var instance = m_Context.Resolve<MyClass>();

        var map = CommandMapper<IMyCommandSet>.Create();

        // The timeout and retry parameters exist only on the interface method. The
        // implementation instead takes the endpoint ID and message ID parameters.
        map.From<double, int, int>((command, value, timeout, retry) => command.MyCommand(value, timeout, retry))
            .To((double value, EndpointId endpoint, MessageId message) => instance.MyMethod(value, endpoint, message));

        map.From<double, int, int>((command, value, timeout, retry) => command.MyCommandReturningSomething(value, timeout, retry))
            .To((double value) => instance.MyMethodReturningSomething(value));

        var register = m_Context.Resolve<RegisterCommand>();
        register(map.ToMap(), CreateSubjectGroups());
    }

    /// <summary>
    /// Registers all the commands that the current application requires.
    /// </summary>
    public void RegisterRequiredCommands()
    {
        // Do nothing ...
    }

    /// <summary>
    /// Registers all the notifications that are provided by the current application.
    /// </summary>
    public void RegisterProvidedNotifications()
    {
        var instance = m_Context.Resolve<MyClass>();

        var map = NotificationMapper<IMyNotificationSet>.Create();

        // The lambda selects the event on the notification interface (OnMyNotification);
        // the generated handler is attached to the implementation event (MyEvent).
        instance.MyEvent += map.From(t => t.OnMyNotification += null)
            .GenerateHandler();

        var register = m_Context.Resolve<RegisterNotification>();
        register(map.ToMap(), CreateSubjectGroups());
    }

    /// <summary>
    /// Registers all the notifications that the current application requires.
    /// </summary>
    public void RegisterRequiredNotifications()
    {
        // Do nothing ...
    }

    /// <summary>
    /// Performs initialization routines that need to be performed before the start of the
    /// communication system.
    /// </summary>
    public void InitializeBeforeCommunicationSignIn()
    {
        // Initialize any data / code here before the communication
        // system starts opening ports and initiates discovery of remote
        // endpoints.
    }

    /// <summary>
    /// Performs initialization routines that need to be performed after the sign in of the
    /// communication system.
    /// </summary>
    public void InitializeAfterCommunicationSignIn()
    {
        // Initialize any data / code here that needs the communication
        // system to be up and running.
    }

    // Creates one subject group identifier (version 1.0, group "a") per subject.
    private SubjectGroupIdentifier[] CreateSubjectGroups()
    {
        return m_Subjects
            .Select(s => new SubjectGroupIdentifier(s, new Version(1, 0), "a"))
            .ToArray();
    }
}

The initializer for the command and notification using side looks like:

/// <summary>
/// Registers the commands and notifications that are required by the endpoint that
/// wants to use them.
/// </summary>
public sealed class MyInitializerForTheCommandUsingEndpoint : IInitializeCommunicationInstances
{
    // The subjects under which the required commands and notifications are registered.
    // NOTE(review): the subject name is a placeholder and the CommunicationSubject(string)
    // constructor is assumed - confirm against the Nuclei.Communication API. The subjects
    // presumably have to match those used by the providing endpoint.
    private readonly CommunicationSubject[] m_Subjects =
        {
            new CommunicationSubject("My.Subject"),
        };

    private readonly IComponentContext m_Context;

    /// <summary>
    /// Initializes a new instance of the <see cref="MyInitializerForTheCommandUsingEndpoint"/> class.
    /// </summary>
    /// <param name="context">The DI context from which the registration delegates are resolved.</param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="context"/> is <see langword="null"/>.</exception>
    public MyInitializerForTheCommandUsingEndpoint(IComponentContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }

        m_Context = context;
    }

    /// <summary>
    /// Registers all the commands that are provided by the current application.
    /// </summary>
    public void RegisterProvidedCommands()
    {
        // Do nothing ...
    }

    /// <summary>
    /// Registers all the commands that the current application requires.
    /// </summary>
    public void RegisterRequiredCommands()
    {
        var register = m_Context.Resolve<RegisterRequiredCommand>();
        register(typeof(IMyCommandSet), CreateSubjectGroups());
    }

    /// <summary>
    /// Registers all the notifications that are provided by the current application.
    /// </summary>
    public void RegisterProvidedNotifications()
    {
        // Do nothing ...
    }

    /// <summary>
    /// Registers all the notifications that the current application requires.
    /// </summary>
    public void RegisterRequiredNotifications()
    {
        var register = m_Context.Resolve<RegisterRequiredNotification>();
        register(typeof(IMyNotificationSet), CreateSubjectGroups());
    }

    /// <summary>
    /// Performs initialization routines that need to be performed before the start of the
    /// communication system.
    /// </summary>
    public void InitializeBeforeCommunicationSignIn()
    {
        // Initialize any data / code here before the communication
        // system starts opening ports and initiates discovery of remote
        // endpoints.
    }

    /// <summary>
    /// Performs initialization routines that need to be performed after the sign in of the
    /// communication system.
    /// </summary>
    public void InitializeAfterCommunicationSignIn()
    {
        // Initialize any data / code here that needs the communication
        // system to be up and running.
    }

    // Creates one subject group identifier (version 1.0, group "a") per subject.
    private SubjectGroupIdentifier[] CreateSubjectGroups()
    {
        return m_Subjects
            .Select(s => new SubjectGroupIdentifier(s, new Version(1, 0), "a"))
            .ToArray();
    }
}

Add the MyClass to an Autofac dependency injection container.

// Register MyClass through a factory delegate and mark the registration
// as single instance.
var builder = new Autofac.ContainerBuilder();
var registration = builder.Register(c => new MyClass());
registration.SingleInstance();

Finally add the communication system elements to the application by loading the CommunicationModule into an Autofac container builder. There are currently no other pre-built DI modules, although those could be made if enough interest exists.

// Indicates whether the communication system is allowed to discover
// remote endpoints by itself.
bool allowChannelDiscovery = true;

var builder = new Autofac.ContainerBuilder();

// The first argument lists the channel types to use, the second
// switches channel discovery on or off.
builder.RegisterModule(
    new Nuclei.Communication.CommunicationModule(
        new[]
            {
                ChannelType.NamedPipe,
                ChannelType.TcpIP,
            },
        allowChannelDiscovery));

At this point the communication system is ready to go to work. Once you start the application the communication system will be loaded and will start opening connections and potentially searching for remote endpoints. Note that if you don't allow channel discovery then you have to provide some manual way to determine what endpoints exist. Use the ManualEndpointConnection delegate to provide these endpoints to the communication layer.

// Create your DI container here, e.g. by building the ContainerBuilder
// that the CommunicationModule was registered with.
var container = builder.Build();

var resolver = container.Resolve<ManualEndpointConnection>();

// Placeholder: replace the comment with the EndpointId of the remote endpoint.
EndpointId endpointId = /* the EndpointId of the remote endpoint */;
var channelUri = "net.tcp://myserver:9876/myremoteapp";
resolver(endpointId, channelUri);

If you do allow channel discovery then one or more UDP endpoints will be opened to broadcast discovery information (based on the WS-Discovery approach). Note that even for a named-pipe channel a UDP endpoint is opened. Remote endpoints (choose to) ignore these endpoints.

Watch the ICommunicationFacade.OnEndpointConnected event to be notified when endpoints connect successfully. Once the application is connected to one or more remote endpoints, you can invoke commands on those remote endpoints:

var builder = new Autofac.ContainerBuilder();

// ... do stuff with the builder to fill it up ...
var container = builder.Build();

// Only request the command set if the remote endpoint reports having it.
var commandSender = container.Resolve<ISendCommandsToRemoteEndpoints>();
var isCommandSetAvailable = commandSender.HasCommandFor(remoteEndpointId, typeof(IMyCommandSet));
if (isCommandSetAvailable)
{
    var commandSet = commandSender.CommandsFor<IMyCommandSet>(remoteEndpointId);

    // Arguments: the command value, the timeout in milliseconds (30 seconds)
    // and the retry count.
    var pending = commandSet.MyCommandReturningSomething(10.0, 30 * 1000, 3);
    pending.Wait();

    // .. do stuff with the return result ...
}

To receive notifications from the remote endpoint:

var builder = new Autofac.ContainerBuilder();

// ... do stuff with the builder to fill it up ...
var container = builder.Build();

// Only subscribe if the remote endpoint reports having the notification set.
var remoteNotifications = container.Resolve<INotifyOfRemoteEndpointEvents>();
if (remoteNotifications.HasNotificationFor(remoteEndpointId, typeof(IMyNotificationSet)))
{
    var notification = remoteNotifications.NotificationsFor<IMyNotificationSet>(remoteEndpointId);

    // The handler runs each time the OnMyNotification event is raised.
    notification.OnMyNotification += (s, e) => Console.WriteLine("Hello world");
}

Uploading / downloading data requires a few steps because it is technically not possible to upload data directly; data can only be downloaded. So the endpoint that wants to upload data has to request that the remote endpoint start a data download.

On the uploading side:

// The file that will be offered to the remote endpoint.
var uploadPath = @"c:\temp\MyFileWithData.txt";

// Register the file with the upload store. The registration returns the
// token that identifies this upload.
var uploadStore = container.Resolve<IStoreUploads>();
var uploadToken = uploadStore.Register(uploadPath);

// Now notify the downloading side that a new upload is waiting, for
// example through a custom command set.
// The downloading side will need:
// - The upload token
// - The ID of the current endpoint

On the downloading side:

// Path to the location where the downloaded stream will be stored on disk.
// Path.GetRandomFileName() only returns a file name, so combine it with the
// temp directory to avoid depending on the current working directory.
var filePath = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());

// callingEndpointId and uploadToken are the endpoint ID and upload token
// that the uploading side provided.
var downloadDelegate = container.Resolve<DownloadDataFromRemoteEndpoints>();
downloadDelegate(callingEndpointId, uploadToken, filePath);
Clone this wiki locally