.NET的异步消息SignalR和Rx(Reactive编程)

14-07-15 banq
              

SIGNALR是基于微软.NET的消息模型,主要是PUB/SUB类型,有消息生产者和消息消费者。Rx是Reactive框架,使用SignalR和Rx能够实现服务器向客户端的PUSH推送功能。

使用LinQ的问题是堵塞,如下图:

public interface IEnumerator<T>
{
   T Current { get; }
   bool MoveNext();
   void Reset();
}

<p>

而使用Rx能克服这一问题,Rx是一种组合异步和基于事件的使用可观察集合的无堵塞库包。

下面是Rx Push服务端代码:

Dim rand = New Random(Now.Millisecond)
Dim Generator = Observable.Generate(Of Double, SensorData)(
    initialState:=0,
    condition:=Function(val) True,
    iterate:=Function(val) rand.NextDouble,
    resultSelector:=Function(val) New SensorData With
        {.Time = Now, .Value = val * 20, .Category = (CInt(val * 4)).ToString() },
    timeSelector:=Function(val) TimeSpan.FromSeconds(val))

Dim query = From val In Generator
            Where val.Value > 10
            Select val

query.Subscribe(Sub(value)
        Dim context = GlobalHost.ConnectionManager.
                                 GetHubContext(Of ObservableSensorHub)()
        context.Clients.Broadcast(value)
    End Sub)

<p>

Push客户端代码:

Dim cn = New HubConnection("http://localhost:5687")
Dim sensor = cn.CreateProxy("observableSensorHub")

Dim items = From item In sensor.Observe("broadcast")
            Let instance = item(0).ToObject(Of SensorData)()
            Where instance.Category = "1"
            Select instance

Using items.Subscribe(Sub(value) Console.WriteLine(value.Value))
        cn.Start().Wait()
        Console.ReadLine()
End Using

<p>

详细PPT下载

案例开源代码

更多介绍点击标题进入英文网页。

[该贴被banq于2014-07-15 08:32修改过]