Creating WCF custom transport channel

The primary purpose of using WCF within an organization is to wrap existing components in a service-oriented architecture (SOA) so that they can be easily reused and maintained. When communicating within the local network or between local components, you may want to implement your own WCF transport channel that satisfies your specific requirements.

In this article I will show you how to write your own transport channel to be used without the contract bindings.

wcf_custom_channel

Let’s start with the CustomBinding class, which must derive from the Binding base class. One thing to remember is that in the CreateBindingElements method the transport element must be the last one in the collection; otherwise you will get an exception.

 /// <summary>
 /// Binding that stacks a text message encoder on top of the custom transport.
 /// </summary>
 public class CustomBinding : Binding
 {
    // Encoder element; placed first in the element stack.
    readonly MessageEncodingBindingElement messageElement;

    // Transport element; it has to be the last element of the stack.
    readonly CustomChannelBindingElement transportElement;

    /// <summary>
    /// Creates the binding together with its encoding and transport elements.
    /// </summary>
    public CustomBinding()
    {
        this.transportElement = new CustomChannelBindingElement();
        this.messageElement = new TextMessageEncodingBindingElement();
    }

    /// <summary>
    /// Gets the URI scheme of the binding, as reported by the transport element.
    /// </summary>
    public override string Scheme
    {
        get
        {
            return this.transportElement.Scheme;
        }
    }

    /// <summary>
    /// Returns the binding elements in stack order: the encoder first, the transport last.
    /// </summary>
    public override BindingElementCollection CreateBindingElements()
    {
        var elements = new BindingElementCollection();

        elements.Add(this.messageElement);
        elements.Add(this.transportElement); // the transport must stay at the end

        return elements;
    }
}

Next we need to create CustomChannelBindingElement, which inherits from the TransportBindingElement class. The implementation is quite simple. The scheme may be any prefix you like, for example “http” or “myHttp”. In the build channel factory and build channel listener methods we simply create new instances of the CustomChannelFactory and CustomChannelListener objects.

 /// <summary>
 /// Transport binding element for the "myprotocol" scheme. It builds
 /// <see cref="IRequestChannel"/> factories and <see cref="IReplyChannel"/> listeners.
 /// </summary>
 public class CustomChannelBindingElement : TransportBindingElement
 {
    /// <summary>
    /// Creates a binding element with the default transport settings.
    /// </summary>
    public CustomChannelBindingElement() { }

    /// <summary>
    /// Copy constructor used by <see cref="Clone"/>.
    /// </summary>
    /// <param name="clone">The element whose transport settings are copied.</param>
    public CustomChannelBindingElement(CustomChannelBindingElement clone)
        : base(clone) // without this the settings held by TransportBindingElement are lost on Clone()
    {
    }

    /// <summary>
    /// Gets the URI scheme handled by this transport.
    /// </summary>
    public override string Scheme
    {
        get { return "myprotocol"; }
    }

    /// <summary>
    /// Returns true only when <typeparamref name="TChannel"/> is <see cref="IRequestChannel"/>.
    /// </summary>
    public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
    {
        return typeof(TChannel) == typeof(IRequestChannel);
    }

    /// <summary>
    /// Returns true only when <typeparamref name="TChannel"/> is <see cref="IReplyChannel"/>.
    /// </summary>
    public override bool CanBuildChannelListener<TChannel>(BindingContext context)
    {
        return typeof(TChannel) == typeof(IReplyChannel);
    }

    /// <summary>
    /// Builds a <c>CustomChannelFactory</c> for the given binding context.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
    /// <exception cref="ArgumentException">The requested channel shape is not supported.</exception>
    public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }
        if (!CanBuildChannelFactory<TChannel>(context))
        {
            throw new ArgumentException(String.Format("Unsupported channel type: {0}.", typeof(TChannel).Name));
        }

        // The cast goes through object because the compiler cannot prove the generic conversion.
        return (IChannelFactory<TChannel>)(object)new CustomChannelFactory(this, context);
    }

    /// <summary>
    /// Builds a <c>CustomChannelListener</c> for the given binding context.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
    /// <exception cref="ArgumentException">The requested channel shape is not supported.</exception>
    public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }
        if (!CanBuildChannelListener<TChannel>(context))
        {
            throw new ArgumentException(String.Format("Unsupported channel type: {0}.", typeof(TChannel).Name));
        }

        // The cast goes through object because the compiler cannot prove the generic conversion.
        return (IChannelListener<TChannel>)(object)new CustomChannelListener(this, context);
    }

    /// <summary>
    /// Returns a copy of this binding element.
    /// </summary>
    public override BindingElement Clone()
    {
        return new CustomChannelBindingElement(this);
    }
}

Next, we need to create the CustomChannelFactory class, passing the transportElement and the BindingContext to its constructor. Please note that the context.BindingParameters.Remove method removes the binding element before validation takes place in async calls. Then, based on the MessageEncodingBindingElement, we create the encoder factory.

 /// <summary>
 /// Channel factory that creates <see cref="CustomRequestChannel"/> instances.
 /// </summary>
 class CustomChannelFactory : ChannelFactoryBase<IRequestChannel>
 {
    // Encoder factory handed to every channel created by this factory.
    readonly MessageEncoderFactory encoderFactory;

    /// <summary>
    /// Creates the factory and takes the message encoding element out of the binding parameters.
    /// </summary>
    /// <param name="transportElement">The transport element that builds this factory (currently unused).</param>
    /// <param name="context">Binding context carrying the binding and its parameters.</param>
    /// <exception cref="InvalidOperationException">
    /// The binding parameters contain no <see cref="MessageEncodingBindingElement"/>.
    /// </exception>
    public CustomChannelFactory(CustomChannelBindingElement transportElement, BindingContext context)
        : base(context.Binding)
    {
        // Remove<T>() yields null when the collection holds no element of that type;
        // fail with a clear message instead of a NullReferenceException on the next line.
        var messageElement = context.BindingParameters.Remove<MessageEncodingBindingElement>();
        if (messageElement == null)
        {
            throw new InvalidOperationException("No MessageEncodingBindingElement was found in the binding parameters.");
        }

        this.encoderFactory = messageElement.CreateMessageEncoderFactory();
    }

    /// <summary>
    /// Creates a request channel for the given endpoint address and transport address.
    /// </summary>
    protected override IRequestChannel OnCreateChannel(EndpointAddress address, Uri via)
    {
        return new CustomRequestChannel(this, encoderFactory, address, via);
    }

    /// <summary>Asynchronous open is not supported by this sample.</summary>
    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>Asynchronous open is not supported by this sample.</summary>
    protected override void OnEndOpen(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    /// <summary>Nothing has to be prepared when the factory opens.</summary>
    protected override void OnOpen(TimeSpan timeout)
    {

    }
}

Next step is to create CustomChannelListener inheriting from ChannelListenerBase. In this class I have also implemented some methods needed when working with async calls.

 /// <summary>
 /// Channel listener that hands out <see cref="CustomReplyChannel"/> instances
 /// for the listen URI taken from the binding context.
 /// </summary>
 class CustomChannelListener : ChannelListenerBase<IReplyChannel>
 {
    #region privates
    readonly Uri uri;
    readonly MessageEncoderFactory encoderFactory;
    // Most recently accepted channel.
    CustomReplyChannel innerChannel;
    delegate IReplyChannel AsyncOnAcceptCaller(TimeSpan timeout);
    // Runs OnAcceptChannel asynchronously for the Begin/End accept pair.
    readonly AsyncOnAcceptCaller asyncOnAcceptCaller;
    #endregion

    /// <summary>
    /// Creates the listener, resolves the listen URI and takes the message
    /// encoding element out of the binding parameters.
    /// </summary>
    /// <param name="transportElement">The transport element that builds this listener (currently unused).</param>
    /// <param name="context">Binding context carrying the listen address and binding parameters.</param>
    /// <exception cref="InvalidOperationException">
    /// The binding parameters contain no <see cref="MessageEncodingBindingElement"/>.
    /// </exception>
    public CustomChannelListener(CustomChannelBindingElement transportElement, BindingContext context)
        : base(context.Binding)
    {
        this.uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);

        // Remove<T>() yields null when the collection holds no element of that type;
        // fail with a clear message instead of a NullReferenceException below.
        var messageElement = context.BindingParameters.Remove<MessageEncodingBindingElement>();
        if (messageElement == null)
        {
            throw new InvalidOperationException("No MessageEncodingBindingElement was found in the binding parameters.");
        }

        this.encoderFactory = messageElement.CreateMessageEncoderFactory();
        this.asyncOnAcceptCaller = new AsyncOnAcceptCaller(this.OnAcceptChannel);
    }

    /// <summary>
    /// Creates a new reply channel bound to the listen URI and remembers it as the inner channel.
    /// The timeout is not used.
    /// </summary>
    protected override IReplyChannel OnAcceptChannel(TimeSpan timeout)
    {
        var address = new EndpointAddress(this.Uri);
        innerChannel = new CustomReplyChannel(this, encoderFactory, address);

        return innerChannel;
    }

    /// <summary>Starts <see cref="OnAcceptChannel"/> asynchronously through a delegate.</summary>
    protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        return asyncOnAcceptCaller.BeginInvoke(timeout, callback, state);
    }

    /// <summary>Completes the asynchronous accept and returns the accepted channel.</summary>
    protected override IReplyChannel OnEndAcceptChannel(IAsyncResult result)
    {
        return asyncOnAcceptCaller.EndInvoke(result);
    }

    /// <summary>Waiting for a channel is not supported by this sample.</summary>
    protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>Waiting for a channel is not supported by this sample.</summary>
    protected override bool OnEndWaitForChannel(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    /// <summary>Waiting for a channel is not supported by this sample.</summary>
    protected override bool OnWaitForChannel(TimeSpan timeout)
    {
        throw new NotImplementedException();
    }

    /// <summary>Gets the URI this listener was created for.</summary>
    public override Uri Uri
    {
        get { return this.uri; }
    }

    /// <summary>
    /// Aborts the listener. The listener holds no sockets of its own (the TCP listener
    /// lives in the accepted channel), so there is nothing to tear down and, unlike the
    /// previous stub, this no longer throws.
    /// </summary>
    protected override void OnAbort()
    {

    }

    /// <summary>Asynchronous close is not supported by this sample.</summary>
    protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>Asynchronous open is not supported by this sample.</summary>
    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Closes the listener. As with <see cref="OnOpen"/>, there is nothing to do: accepted
    /// channels own their sockets and are closed by whoever accepted them.
    /// </summary>
    protected override void OnClose(TimeSpan timeout)
    {

    }

    /// <summary>Asynchronous close is not supported by this sample.</summary>
    protected override void OnEndClose(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    /// <summary>Asynchronous open is not supported by this sample.</summary>
    protected override void OnEndOpen(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    /// <summary>Nothing has to be prepared when the listener opens.</summary>
    protected override void OnOpen(TimeSpan timeout)
    {

    }
}

Let’s create our channel base class now. Please note that you need to pass a ChannelManagerBase to the constructor. Our read and write message methods simply create new messages from the data passed in as a string by the request and reply channels. Depending on your requirements, you may want to implement your own mechanism for creating messages, e.g. reading them from files.

 /// <summary>
 /// Common base for the custom request and reply channels: it keeps the endpoint
 /// address and wraps plain strings into WCF messages.
 /// </summary>
 public abstract class CustomChannelBase : ChannelBase
 {
    #region privates
    readonly EndpointAddress address;
    readonly MessageEncoder encoder;
    readonly ChannelManagerBase manager;

    #endregion

    /// <summary>Gets the endpoint address passed to the constructor.</summary>
    public EndpointAddress RemoteAddress
    {
        get { return this.address; }
    }

    /// <summary>
    /// Initializes the channel.
    /// </summary>
    /// <param name="manager">The factory or listener that owns this channel.</param>
    /// <param name="encoderFactory">Factory used to create the channel's message encoder.</param>
    /// <param name="addres">
    /// The endpoint address exposed through <see cref="RemoteAddress"/>. The misspelled
    /// parameter name is kept so that existing named-argument callers still compile.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="encoderFactory"/> is null.</exception>
    public CustomChannelBase(ChannelManagerBase manager, MessageEncoderFactory encoderFactory, EndpointAddress addres)
        : base(manager)
    {
        if (encoderFactory == null)
        {
            throw new ArgumentNullException("encoderFactory");
        }

        // Previously "this.address = address" assigned the field to itself (the parameter
        // is spelled "addres"), which left RemoteAddress null for every channel.
        this.address = addres;
        this.manager = manager;
        this.encoder = encoderFactory.CreateSessionEncoder();
    }

    /// <summary>
    /// Wraps received text in a message using the default message version and the "fault" action.
    /// </summary>
    /// <param name="data">Text that becomes the message body.</param>
    public Message ReadMessage(string data)
    {
        return Message.CreateMessage(MessageVersion.Default, "fault", data);
    }

    /// <summary>
    /// Wraps outgoing text in a message using the default message version and the "fault" action.
    /// </summary>
    /// <param name="data">Text that becomes the message body.</param>
    public Message WriteMessage(string data)
    {
        return Message.CreateMessage(MessageVersion.Default, "fault", data);
    }

    /// <summary>
    /// Aborts the channel. Like <see cref="OnClose"/>, the base channel has nothing to
    /// release; it no longer throws, so aborting a channel is possible.
    /// </summary>
    protected override void OnAbort()
    {

    }

    /// <summary>Asynchronous close is not supported by this sample.</summary>
    protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>Asynchronous open is not supported by this sample.</summary>
    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>Nothing has to be released when the channel closes.</summary>
    protected override void OnClose(TimeSpan timeout)
    {

    }

    /// <summary>Asynchronous close is not supported by this sample.</summary>
    protected override void OnEndClose(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    /// <summary>Asynchronous open is not supported by this sample.</summary>
    protected override void OnEndOpen(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    /// <summary>Nothing has to be prepared when the channel opens.</summary>
    protected override void OnOpen(TimeSpan timeout)
    {

    }
}

Finally, we can create our reply channel, which will be used to reply to incoming requests. In this example we will use a TcpListener to communicate with the client. In this case the WCF service serves as a wrapper for the underlying TCP communication. In the WaitForRequest method we simply wait for an incoming connection and return true. After that, the IReplyChannel.ReceiveRequest method is called. In that method we read the message from the TCP client and pass it to the request context.

 /// <summary>
 /// Reply channel that receives plain-text requests over a raw TCP listener and wraps
 /// them into WCF messages. The lifecycle members of <see cref="ICommunicationObject"/>
 /// (State, events, Open/Close) are inherited from the channel base class; they are
 /// deliberately not re-implemented here.
 /// </summary>
 public class CustomReplyChannel : CustomChannelBase, IReplyChannel
{
    #region privates
    readonly EndpointAddress localAddress;
    readonly MessageEncoderFactory encoderFactory;
    delegate bool AsyncWaitForRequestCaller(TimeSpan timeout);
    // Runs the blocking wait-for-client step asynchronously for the Begin/End pairs.
    readonly AsyncWaitForRequestCaller asyncWaitForRequestCaller;

    readonly TcpListener tcpListener;
    // Connection accepted by the last wait; consumed and closed by ReceiveRequest.
    TcpClient tcpClient;
    #endregion

    /// <summary>
    /// Gets the address this channel listens on. This property also satisfies
    /// <see cref="IReplyChannel.LocalAddress"/>.
    /// </summary>
    public EndpointAddress LocalAddress
    {
        get { return this.localAddress; }
    }

    /// <summary>
    /// Creates the channel and starts a TCP listener on the port of <paramref name="address"/>.
    /// </summary>
    /// <param name="manager">The listener that owns this channel.</param>
    /// <param name="encoderFactory">Encoder factory used to answer property queries.</param>
    /// <param name="address">Local endpoint address; only the port of its URI is used for listening.</param>
    public CustomReplyChannel(ChannelManagerBase manager, MessageEncoderFactory encoderFactory, EndpointAddress address)
        : base(manager, encoderFactory, address)
    {
        this.encoderFactory = encoderFactory;
        this.localAddress = address;
        this.asyncWaitForRequestCaller = new AsyncWaitForRequestCaller(this.WaitForClient);

        // TcpListener(int) is obsolete; listening on IPAddress.Any is the documented replacement.
        this.tcpListener = new TcpListener(System.Net.IPAddress.Any, address.Uri.Port);
        this.tcpListener.Start();
    }

    /// <summary>
    /// Creates another reply channel for the same local address. Kept for derived classes;
    /// this class no longer calls it.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the new channel starts its own listener on the same port, which
    /// presumably fails while this channel is still listening - confirm before use.
    /// </remarks>
    protected IReplyChannel OnAcceptChannel(TimeSpan timeout)
    {
        var address = new EndpointAddress(this.localAddress.Uri);

        return new CustomReplyChannel(this.Manager, this.encoderFactory, address);
    }

    /// <summary>
    /// Stops the TCP listener and closes any pending client connection, then closes the base channel.
    /// </summary>
    protected override void OnClose(TimeSpan timeout)
    {
        this.ReleaseSockets();
        base.OnClose(timeout);
    }

    /// <summary>
    /// Drops the sockets owned by this channel. The base implementation is not invoked
    /// because aborting requires nothing beyond releasing these sockets.
    /// </summary>
    protected override void OnAbort()
    {
        this.ReleaseSockets();
    }

    // Closes the pending client (if any) and stops listening; safe to call more than once.
    void ReleaseSockets()
    {
        var client = this.tcpClient;
        this.tcpClient = null;
        if (client != null)
        {
            client.Close();
        }

        this.tcpListener.Stop();
    }

    // Blocks until a client connects and stores the connection for ReceiveRequest.
    // The timeout is not honored: AcceptTcpClient has no timeout overload.
    bool WaitForClient(TimeSpan timeout)
    {
        ThrowIfDisposedOrNotOpen();

        var accepted = this.tcpListener.AcceptTcpClient();

        // A connection that was accepted earlier but never read must not leak.
        var previous = this.tcpClient;
        this.tcpClient = accepted;
        if (previous != null)
        {
            previous.Close();
        }

        return true;
    }

    #region IReplyChannel Members

    /// <summary>Asynchronous ReceiveRequest is not supported by this sample.</summary>
    IAsyncResult IReplyChannel.BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>Asynchronous ReceiveRequest is not supported by this sample.</summary>
    IAsyncResult IReplyChannel.BeginReceiveRequest(AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Starts waiting for a client connection on a background thread. The request itself
    /// is read in <see cref="IReplyChannel.EndTryReceiveRequest"/>.
    /// </summary>
    IAsyncResult IReplyChannel.BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
    {
        return this.asyncWaitForRequestCaller.BeginInvoke(timeout, callback, state);
    }

    /// <summary>Starts waiting for a client connection on a background thread.</summary>
    IAsyncResult IReplyChannel.BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state)
    {
        return this.asyncWaitForRequestCaller.BeginInvoke(timeout, callback, state);
    }

    /// <summary>Asynchronous ReceiveRequest is not supported by this sample.</summary>
    RequestContext IReplyChannel.EndReceiveRequest(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Completes the asynchronous wait (rethrowing any error it hit) and, when a client
    /// connected, reads its request.
    /// </summary>
    bool IReplyChannel.EndTryReceiveRequest(IAsyncResult result, out RequestContext context)
    {
        if (!this.asyncWaitForRequestCaller.EndInvoke(result))
        {
            context = null;
            return false;
        }

        context = ((IReplyChannel)this).ReceiveRequest(DefaultReceiveTimeout);
        return true;
    }

    /// <summary>Completes the asynchronous wait started by BeginWaitForRequest.</summary>
    bool IReplyChannel.EndWaitForRequest(IAsyncResult result)
    {
        return this.asyncWaitForRequestCaller.EndInvoke(result);
    }

    /// <summary>
    /// Reads one request (at most 4096 bytes, decoded as ASCII) from the connection accepted
    /// by the preceding wait, acknowledges it with "Message received", closes the connection
    /// and returns the request wrapped in a request context. Failures are reported as a
    /// request whose body is the error text. The timeout is not applied to the socket.
    /// </summary>
    RequestContext IReplyChannel.ReceiveRequest(TimeSpan timeout)
    {
        ThrowIfDisposedOrNotOpen();

        // Take ownership of the pending connection so it is closed exactly once.
        var client = this.tcpClient;
        this.tcpClient = null;

        if (client == null)
        {
            return new CustomRequestContext(this.ReadMessage("No client connection is pending; call WaitForRequest first."), this);
        }

        try
        {
            var clientStream = client.GetStream();

            var message = new byte[4096];
            var enc = new ASCIIEncoding();

            var bytesRead = clientStream.Read(message, 0, message.Length);
            var request = enc.GetString(message, 0, bytesRead);

            var buffer = enc.GetBytes("Message received");

            clientStream.Write(buffer, 0, buffer.Length);
            clientStream.Flush();

            return new CustomRequestContext(this.ReadMessage(request), this);
        }
        catch (Exception ex)
        {
            // Socket and I/O errors alike are surfaced as a message carrying the error text.
            return new CustomRequestContext(this.ReadMessage(ex.Message), this);
        }
        finally
        {
            // Each request uses its own connection; release it once it has been answered.
            client.Close();
        }
    }

    /// <summary>Reads a request using the default receive timeout.</summary>
    RequestContext IReplyChannel.ReceiveRequest()
    {
        return ((IReplyChannel)this).ReceiveRequest(DefaultReceiveTimeout);
    }

    /// <summary>Waits for a client connection and then reads its request.</summary>
    bool IReplyChannel.TryReceiveRequest(TimeSpan timeout, out RequestContext context)
    {
        this.WaitForClient(timeout);
        context = ((IReplyChannel)this).ReceiveRequest(timeout);
        return true;
    }

    /// <summary>
    /// Blocks until a client has connected and returns true. The timeout is not honored.
    /// </summary>
    bool IReplyChannel.WaitForRequest(TimeSpan timeout)
    {
        return this.WaitForClient(timeout);
    }

    #endregion

    #region IChannel Members

    /// <summary>
    /// Resolves a property from the message encoder first, then the encoder's message
    /// version, and finally from the base channel.
    /// </summary>
    T IChannel.GetProperty<T>()
    {
        T channel = encoderFactory.Encoder.GetProperty<T>();
        if (channel != null)
        {
            return channel;
        }

        if (typeof(T) == typeof(MessageVersion))
        {
            return (T)(object)encoderFactory.Encoder.MessageVersion;
        }

        return base.GetProperty<T>();
    }

    #endregion
}

The request channel is quite straightforward, as it simply sends messages using a TCP client. We also read the reply to confirm that our message has been delivered.

 /// <summary>
 /// Request channel that sends the message body as ASCII text over a short-lived
 /// TCP connection and returns the textual reply wrapped in a message.
 /// </summary>
 public class CustomRequestChannel : CustomChannelBase, IRequestChannel
 {
    readonly Uri via;

    /// <summary>Gets the transport address (host and port) the requests are sent to.</summary>
    public Uri Via
    {
        get { return this.via; }
    }

    /// <summary>
    /// Creates the channel.
    /// </summary>
    /// <param name="manager">The factory that owns this channel.</param>
    /// <param name="encoderFactory">Encoder factory passed on to the base channel.</param>
    /// <param name="address">Endpoint address of the service.</param>
    /// <param name="via">Transport address whose host and port are connected to.</param>
    public CustomRequestChannel(ChannelManagerBase manager, MessageEncoderFactory encoderFactory, EndpointAddress address, Uri via)
        : base(manager, encoderFactory, address)
    {
        this.via = via;
    }

    /// <summary>
    /// Sends the string body of <paramref name="message"/> to the <see cref="Via"/> address
    /// and returns the reply (at most 4096 bytes, decoded as ASCII) as a message. Failures
    /// are reported as a reply whose body is the error text.
    /// </summary>
    /// <param name="message">Request whose body is a string.</param>
    /// <param name="timeout">Send and receive timeout applied to the socket.</param>
    public Message Request(Message message, TimeSpan timeout)
    {
        ThrowIfDisposedOrNotOpen();

        try
        {
            using (var tcpClient = new TcpClient(this.Via.Host, this.Via.Port))
            {
                // The socket timeouts are in milliseconds. TimeSpan.Seconds is only the
                // 0-59 seconds component, so it turned 5 s into 5 ms and 1 min into 0 (infinite).
                var socketTimeout = ToSocketTimeout(timeout);
                tcpClient.ReceiveTimeout = socketTimeout;
                tcpClient.SendTimeout = socketTimeout;

                var clientStream = tcpClient.GetStream();

                var msg = new byte[4096];
                var encoder = new ASCIIEncoding();

                //get string value from message body
                var messageBody = message.GetBody<string>();
                var buffer = encoder.GetBytes(messageBody);

                //send message
                clientStream.Write(buffer, 0, buffer.Length);
                clientStream.Flush();

                //read reply
                var bytesRead = clientStream.Read(msg, 0, msg.Length);
                var response = encoder.GetString(msg, 0, bytesRead);

                return this.ReadMessage(response);
            }
        }
        catch (Exception ex)
        {
            // Socket and I/O errors alike are surfaced as a message carrying the error text.
            return this.ReadMessage(ex.Message);
        }
    }

    /// <summary>Sends the request using the default receive timeout.</summary>
    public Message Request(Message message)
    {
        return this.Request(message, DefaultReceiveTimeout);
    }

    /// <summary>Asynchronous requests are not supported by this sample.</summary>
    public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>Asynchronous requests are not supported by this sample.</summary>
    public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>Asynchronous requests are not supported by this sample.</summary>
    public Message EndRequest(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    // Converts a TimeSpan to a socket timeout in milliseconds, clamped to [1, int.MaxValue].
    // The lower bound is 1 because the socket treats 0 as "wait forever".
    static int ToSocketTimeout(TimeSpan timeout)
    {
        var milliseconds = timeout.TotalMilliseconds;
        if (milliseconds <= 1)
        {
            return 1;
        }
        if (milliseconds >= int.MaxValue)
        {
            return int.MaxValue;
        }

        return (int)milliseconds;
    }
}

Finally we can test our server service

 // Server side of the sample: opens a listener on myProtocol://localhost:81/x, accepts one
 // reply channel and answers every incoming request with "Message received".
 static void StartService()
    {
        var shortTimeout = TimeSpan.FromSeconds(5);
        var listenUri = new Uri("myProtocol://localhost:81/x");
        var customBinding = new WCFCustomChannelLib.CustomBinding();

        var channelListener = customBinding.BuildChannelListener<IReplyChannel>(listenUri, new BindingParameterCollection());
        channelListener.Open(shortTimeout);

        Console.WriteLine("Creating channel...");

        var replyChannel = channelListener.AcceptChannel(shortTimeout);
        replyChannel.Open(shortTimeout);

        Console.WriteLine("Waiting for request...");

        // Serve requests for as long as the channel keeps reporting that one is available.
        while (replyChannel.WaitForRequest(TimeSpan.FromMinutes(1)))
        {
            using (var requestContext = replyChannel.ReceiveRequest(shortTimeout))
            using (var request = requestContext.RequestMessage)
            {
                Console.WriteLine("Received message: {0}", request.GetBody<string>());
                Console.WriteLine();

                var reply = Message.CreateMessage(MessageVersion.Default, "fault", "Message received");
                requestContext.Reply(reply, shortTimeout);
            }

            Console.WriteLine("Waiting for request...");
            Console.WriteLine("------------------");
        }

        Console.WriteLine("terminated");
        replyChannel.Close(shortTimeout);
    }
}

And the client service

 // Client side of the sample: opens a request channel to myProtocol://localhost:81/x and
 // sends every line typed on the console, printing the reply. Ends when input is exhausted.
 static void Main(string[] args)
    {
        Console.Write("Creating factory...");

        var binding = new WCFCustomChannelLib.CustomBinding();

        var factory = binding.BuildChannelFactory<IRequestChannel>();
        factory.Open(TimeSpan.FromSeconds(5));

        Console.Write(Environment.NewLine);
        Console.Write("Creating channel...");

        var uri = new Uri("myProtocol://localhost:81/x");
        var channel = factory.CreateChannel(new EndpointAddress(uri));

        Console.Write(Environment.NewLine);
        Console.WriteLine("Opening channel...");

        channel.Open(TimeSpan.FromSeconds(5));

        while (true)
        {
            Console.WriteLine("Enter message text: ");
            Console.WriteLine("------------------");

            // ReadLine returns null once the input stream is closed.
            var messageText = Console.ReadLine();
            if (messageText == null) { break; }

            Console.WriteLine("Sending request...");

            // Messages are disposable; the request used to be left undisposed. Request()
            // reads the body synchronously, so it can be released right after the call.
            using (var requestMessage = Message.CreateMessage(MessageVersion.Default, "fault", messageText))
            using (var replyMessage = channel.Request(requestMessage, TimeSpan.FromSeconds(5)))
            {
                if (replyMessage != null)
                {
                    Console.WriteLine("Reply: {0}", replyMessage.GetBody<string>());
                }
            }
        }

        channel.Close(TimeSpan.FromSeconds(5));
        factory.Close();
    }
}

I have included a working sample that you can download and test on your own. Feel free to play with it 🙂

WCFCustomChannel

1 Star2 Stars3 Stars4 Stars5 Stars (No Ratings Yet)
Loading...Loading...