Sunday, December 13, 2015

SERIAL COMMUNICATION USING REACTIVE EXTENSIONS

The main purpose of Reactive Extensions (Rx) is to enable processing event streams. I set out to use Reactive Extensions for receiving data on the serial port and learned some new things.
Here is the code that uses Observable.FromEventPattern<T>() to create an IObservable<T>from the .NET event SerialPort.DataReceivedEvent:
IObservable<byte> serialPortSource = Observable.FromEventPattern<
SerialDataReceivedEventHandler,
SerialDataReceivedEventArgs>
(
handler => _serialPort.DataReceived += handler,
handler => _serialPort.DataReceived -= handler
).SelectMany(_ =>
{
var buffer = new byte[1024];
var ret = new List<byte>();
do
{
int bytesRead = _serialPort.Read(buffer, 0, buffer.Length);
ret.AddRange(buffer.Take(bytesRead));
} while (bytesRead >= buffer.Length);
return ret;
});
view rawfirstattempt.cs hosted with ❤ by GitHub
The event does not actually contain any information on the data received, it only indicates that there is data available. Reading the data is done inside the lambda expression. Reading serial data will return a list of bytes. This list may contain a complete message or just a part of a message or even multiple messages. To handle this, I want the observable to be an IObservable<byte>, i.e., it will produce a raw stream of bytes without any indication of where a message begins or ends. This is done through the extension method public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector) that is used to flatten the sequence returned by the lambda.
So I now have a stream of bytes. I want these bytes to be chunked into messages. For my particular protocol, messages are separated by a special byte. Separation can be done in two ways:
private IObservable<IEnumerable<byte>> ToMessage(IObservable<byte> input)
{
return Observable.Create<IEnumerable<byte>>(observer =>
{
var buffer = new List<byte>();
return input.Subscribe(b =>
{
if (b == _messageDelimiter)
{
observer.OnNext(buffer);
buffer.Clear();
}
else
{
buffer.Add(b);
}
});
});
}
Here, a new observable is created using Observable.Create(). This observable subscribes to the byte stream, collects the data in a local collection and fires OnNext() whenever a message delimiter is encountered.
private class FillingCollection
{
public List<byte> Message { get; set; }
public bool Complete { get; set; }
}
private IObservable<IEnumerable<byte>> ToMessage(IObservable<byte> input)
{
return input.Scan(new FillingCollection { Message = new List<byte>() }, (buffer, newByte) =>
{
if (buffer.Complete)
{
buffer.Message.Clear();
buffer.Complete = false;
}
if (newByte == _messageDelimiter)
{
buffer.Complete = true;
}
else
{
buffer.Message.Add(newByte);
}
return buffer;
}).Where(fc => fc.Complete).Select(fc => fc.Message);
}
This version uses the Scan() operator to achieve the same thing. The output is anIObservable<IEnumerable<byte>> that fires an IEnumerable<byte> for every new message.
This code worked well up until the point I started attaching multiple observers to the message stream, one to process the messages and one to just dump received messages to a debug console. What happened then was that the code in the first code sample was called multiple times: once for each subscriber. This meant that each chunk of serial data was only received by one subscriber, not all subscribers. There are two possible solutions to this: Either introduce a Subject<IEnumerable<byte>> subscribing to serialPortSource and have consumers subscribe to the subject or use the Publish() operator that does the work for you.
IConnectableObservable<IEnumerable<byte>> messageSourceConnectable = ToMessage(serialPortSource).Publish();
IObservable<IEnumerable<byte>> messageSource = messageSourceConnectable;
IDisposable serialPortSubscription = messageSourceConnectable.Connect();
view rawpublish.cs hosted with ❤ by GitHub
Creating a new observable that produces deserialized messages from the observable producing lists of bytes is now trivial using a simple Select().
What remains is the question of how to use the received data in a typical workflow of sending out a message and receiving a response in return. Here is an example:
IConnectableObservable<DeserializedMessage> connectableObservable = DeserializedMessageSource.Replay();
using (connectableObservable.Connect())
{
var messageId = await SendMessageAsync(command).ConfigureAwait(false);
response = await connectableObservable
.Where(message => message.Type == MessageType.Response && message.CorrelatingMessageID == messageId)
.Timeout(TimeSpan.FromSeconds(_timeoutSeconds))
.FirstAsync();
}
view rawsendandreceive.cs hosted with ❤ by GitHub
This example uses the Replay() operator. Replay will capture all events from the observable that are fired after the call to Connect(). After calling Connect() the call is sent to the device at the other end of the serial connection. The second await filters the incoming messages for the desired message (even using a filter criterion that was not known before the request was sent), adds a timeout, uses FirstAsync() to return an observable that only returns the first element followed by OnCompleted(), and waits for that OnCompleted() using await. Since Replay() is capturing all messages, the followingawait call on the observable should consider all answers from the target, whether they are received before or after the second call to await.