使用Node.js + RSocket + WebSocket + Spring Boot实现实时前后端通讯 - vinsguru


在本教程中,我想向您展示如何使用RSocket WebSocket和Spring Boot创建实时应用程序。 
我们的应用程序有2个端点。

  • 数字流
    • 每当服务器收到数字“ N”时,它将以1秒的延迟发出从1到N的数字。因此,对于每个请求,服务器都可以通过单个WebSocket连接发送多个响应。
  • 数字频道
    • 这是双向流。也就是说,当用户输入数字时,该数字将被发送到后端,该后端将计算给定数字的平方,然后进行响应。

注意:  此示例可能看起来非常简单。在这里,我们正在学习客户端与服务器之间的实时通信及其工作方式。如果我们能够实现这一目标,则可以将其用于YouTube / Netflix等应用,以获取实时视频/电影推荐。概念是一样的!
 
后端实现
满足我们要求的rsocket服务器端实现如下所示!Spring Boot在这里完成了所有繁重的工作。
  • 控制器

@Controller
public class RSocketController {

    @MessageMapping("number.stream")
    public Flux<Integer> responseStream(Integer number) {
        return Flux.range(1, number)
                    .delayElements(Duration.ofSeconds(1));
    }

    @MessageMapping(
"number.channel")
    public Flux<Long> biDirectionalStream(Flux<Long> numberFlux) {
        return numberFlux
                .map(n -> n * n)
                .onErrorReturn(-1L);
    }

}

  • application.properties

端口和映射路径可以是任何东西。默认情况下,传输是TCP。我们需要将传输作为WebSocket添加,以使其明确作为WebSocket服务器。

spring.rsocket.server.port=6565
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket

 
前端实现
需要做一些客户端通信。我们不能仅通过使用普通JavaScript来轻松实现这一目标。我们必须引入一些Node.js模块。

  • 通过发出以下命令来创建项目

npm init -y

  • 目结构如下所示。因此,如下所示添加文件。


  • 添加依赖项,如下所示。我们只需要此依赖项rsocket-websocket-client。为了开发目的,我们几乎不需要其他依赖项。

{
  "name": "rsocket-websocker-client",
 
"version": "1.0.0",
 
"description": "",
 
"main": "index.js",
 
"scripts": {
   
"test": "echo \"Error: no test specified\" && exit 1",
   
"build": "webpack --mode=development",
   
"serve": "webpack-dev-server --open"
  },
 
"author": "",
 
"license": "ISC",
 
"devDependencies": {
   
"clean-webpack-plugin": "^3.0.0",
   
"html-webpack-plugin": "^4.3.0",
   
"webpack": "^4.44.1",
   
"webpack-cli": "^3.3.12",
   
"webpack-dev-server": "^3.11.0"
  },
 
"dependencies": {
   
"rsocket-websocket-client": "0.0.19"
  }
}

  • webpack.config.js

const path = require('path');
const HtmlWebpackPlugin = require('html-webpack-plugin');

module.exports = {
    mode: 'development',
    devServer: {
        port: 8081,
        contentBase: './dist',
     },
     entry: './src/index.js',
     output: {
         filename: 'bundle.js',
         path: path.resolve(__dirname, 'dist'),
         publicPath: '/'
     },
     plugins: [
        new HtmlWebpackPlugin({
            inlineSource: '.(js|css)$',
            template: __dirname + `/src/index.html`,
            filename: __dirname + `/dist/index.html`,
            inject: 'head',
          })
     ]
}

  • index.html

<body>
    <div class="container mt-3">
        <h1>RSocket-WebSocket Demo</h1>
        <div class=
"row">
             <div class=
"col">
                <p class=
"font-weight-light mt-3">Enter number:</p> 
                <input type=
"text" class="form-control"  id="n">
            </div>
        </div>
        <div class=
"row">
            <ul class=
"list-group mt-5 pl-2"  id="result"></ul>
        </div>
    </div>
</body>

  •  index.js

  1. 首先,我们导入特定的依赖项。
  2. 然后我们创建rsocket客户
  3. 我们有一个响应和错误处理程序
  4. 我们向后端发出请求,并通过订阅来处理响应
  5. 建立websocket连接后,我们将注册侦听器。

import { RSocketClient, JsonSerializer, IdentitySerializer } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';

// backend ws endpoint
const wsURL = 'ws:
//localhost:6565/rsocket';

// rsocket client
const client = new RSocketClient({
    serializers: {
        data: JsonSerializer,
        metadata: IdentitySerializer
    },
    setup: {
        keepAlive: 60000,
        lifetime: 180000,
        dataMimeType: 'application/json',
        metadataMimeType: 'message/x.rsocket.routing.v0',
    },
    transport: new RSocketWebSocketClient({
        url: wsURL
    })
});

// error handler
const errorHanlder = (e) => console.log(e);
// response handler
const responseHanlder = (payload) => {
    const li = document.createElement('li');
    li.innerText = payload.data;
    li.classList.add('list-group-item', 'small')
    document.getElementById('result').appendChild(li);
}

// request to rsocket-websocket and response handling
const numberRequester = (socket, value) => {
    socket.requestStream({
        data: value,
        metadata: String.fromCharCode('number.stream'.length) + 'number.stream'
    }).subscribe({
        onError: errorHanlder,
        onNext: responseHanlder,
        onSubscribe: subscription => {
            subscription.request(100);
// set it to some max value
        }
    })
}

// once the backend connection is established, register the event listeners
client.connect().then(socket => {
    document.getElementById('n').addEventListener('change', ({srcElement}) => {
        numberRequester(socket, parseInt(srcElement.value));
    })
}, errorHanlder);
 

基于WebSocket的RSocket流
我们来看看实现这种双向流所需的更改。

  • 我们介绍了FlowableProcessor,它将同时充当发布者和订阅者。前端将发布将由后端应用程序订阅的值。
  • 这里的想法是–每当我输入数字时,键入事件就会发布值。所以我立即得到给定数字的平方。

// reactive stream processor
const processor = new FlowableProcessor(sub => {});

const numberRequester = (socket, processor) => {
    socket.requestChannel(processor.map(i => {
        return {
            data: i,
            metadata: String.fromCharCode('number.channel'.length) + 'number.channel'
        }
    })).subscribe({
        onError: errorHanlder,
        onNext: responseHanlder,
        onSubscribe: subscription => {
            subscription.request(100);
// set it to some max value
        }
    })
}

client.connect().then(sock => {
    numberRequester(sock, processor);
    document.getElementById('n').addEventListener('keyup', ({srcElement}) => {
        if(srcElement.value.length > 0){
            processor.onNext(parseInt(srcElement.value))
        }
    })
}, errorHanlder);

 
结论
我们能够成功建立RSocket WebSocket客户端-服务器通信,并以此开发一个简单的应用程序。我们可以将其用于实时视频推荐,游戏应用程序等。
此演示的源代码在此处