.NET的异步消息SignalR和Rx(Reactive编程)

SIGNALR是基于微软.NET的消息模型,主要是PUB/SUB类型,有消息生产者和消息消费者。Rx是Reactive框架,使用SignalR和Rx能够实现服务器向客户端的PUSH推送功能。

使用LinQ的问题是堵塞,如下图:


public interface IEnumerator<T>
{
   T Current { get; }
   bool MoveNext();
   void Reset();
}


而使用Rx能克服这一问题,Rx是一种组合异步和基于事件的使用可观察集合的无堵塞库包。

下面是Rx Push服务端代码:


Dim rand = New Random(Now.Millisecond)
Dim Generator = Observable.Generate(Of Double, SensorData)(
initialState:=0,
condition:=Function(val) True,
iterate:=Function(val) rand.NextDouble,
resultSelector:=Function(val) New SensorData With
       {.Time = Now, .Value = val * 20, .Category = (CInt(val * 4)).ToString() },
timeSelector:=Function(val) TimeSpan.FromSeconds(val))

Dim query = From val In Generator
            Where val.Value > 10
            Select val

query.Subscribe(Sub(value)
     Dim context = GlobalHost.ConnectionManager.
                                GetHubContext(Of ObservableSensorHub)()
   context.Clients.Broadcast(value)
    End Sub)


Push客户端代码:


Dim cn = New HubConnection("http://localhost:5687")
Dim sensor = cn.CreateProxy(
"observableSensorHub")

Dim items = From item In sensor.Observe(
"broadcast")
            Let instance = item(0).ToObject(Of SensorData)()
            Where instance.Category = 
"1"
            Select instance

Using items.Subscribe(Sub(value) Console.WriteLine(value.Value))
       cn.Start().Wait()
        Console.ReadLine()
End Using

详细PPT下载
案例开源代码

更多介绍点击标题进入英文网页。

[该贴被banq于2014-07-15 08:32修改过]