在本教程中,我想向您展示如何使用RSocket WebSocket和Spring Boot创建实时应用程序。
我们的应用程序有2个端点。
- 数字流
- 每当服务器收到数字“ N”时,它将以1秒的延迟发出从1到N的数字。因此,对于每个请求,服务器都可以通过单个WebSocket连接发送多个响应。
- 数字频道
- 这是双向流。也就是说,当用户输入数字时,该数字将被发送到后端,该后端将计算给定数字的平方,然后进行响应。
注意: 此示例可能看起来非常简单。在这里,我们正在学习客户端与服务器之间的实时通信及其工作方式。如果我们能够实现这一目标,则可以将其用于YouTube / Netflix等应用,以获取实时视频/电影推荐。概念是一样的!
后端实现
满足我们要求的rsocket服务器端实现如下所示!Spring Boot在这里完成了所有繁重的工作。
@Controller public class RSocketController {
@MessageMapping("number.stream") public Flux<Integer> responseStream(Integer number) { return Flux.range(1, number) .delayElements(Duration.ofSeconds(1)); }
@MessageMapping("number.channel") public Flux<Long> biDirectionalStream(Flux<Long> numberFlux) { return numberFlux .map(n -> n * n) .onErrorReturn(-1L); }
}
|
端口和映射路径可以是任何东西。默认情况下,传输是TCP。我们需要将传输作为WebSocket添加,以使其明确作为WebSocket服务器。spring.rsocket.server.port=6565 spring.rsocket.server.transport=websocket spring.rsocket.server.mapping-path=/rsocket
|
前端实现
需要做一些客户端通信。我们不能仅通过使用普通JavaScript来轻松实现这一目标。我们必须引入一些Node.js模块。
- 添加依赖项,如下所示。我们只需要此依赖项rsocket-websocket-client。为了开发目的,我们几乎不需要其他依赖项。
{ "name": "rsocket-websocker-client", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1", "build": "webpack --mode=development", "serve": "webpack-dev-server --open" }, "author": "", "license": "ISC", "devDependencies": { "clean-webpack-plugin": "^3.0.0", "html-webpack-plugin": "^4.3.0", "webpack": "^4.44.1", "webpack-cli": "^3.3.12", "webpack-dev-server": "^3.11.0" }, "dependencies": { "rsocket-websocket-client": "0.0.19" } }
|
const path = require('path'); const HtmlWebpackPlugin = require('html-webpack-plugin');
module.exports = { mode: 'development', devServer: { port: 8081, contentBase: './dist', }, entry: './src/index.js', output: { filename: 'bundle.js', path: path.resolve(__dirname, 'dist'), publicPath: '/' }, plugins: [ new HtmlWebpackPlugin({ inlineSource: '.(js|css)$', template: __dirname + `/src/index.html`, filename: __dirname + `/dist/index.html`, inject: 'head', }) ] }
|
<body> <div class="container mt-3"> <h1>RSocket-WebSocket Demo</h1> <div class="row"> <div class="col"> <p class="font-weight-light mt-3">Enter number:</p> <input type="text" class="form-control" id="n"> </div> </div> <div class="row"> <ul class="list-group mt-5 pl-2" id="result"></ul> </div> </div> </body>
|
- 首先,我们导入特定的依赖项。
- 然后我们创建rsocket客户
- 我们有一个响应和错误处理程序
- 我们向后端发出请求,并通过订阅来处理响应
- 建立websocket连接后,我们将注册侦听器。
import { RSocketClient, JsonSerializer, IdentitySerializer } from 'rsocket-core'; import RSocketWebSocketClient from 'rsocket-websocket-client';
// backend ws endpoint const wsURL = 'ws://localhost:6565/rsocket';
// rsocket client const client = new RSocketClient({ serializers: { data: JsonSerializer, metadata: IdentitySerializer }, setup: { keepAlive: 60000, lifetime: 180000, dataMimeType: 'application/json', metadataMimeType: 'message/x.rsocket.routing.v0', }, transport: new RSocketWebSocketClient({ url: wsURL }) });
// error handler const errorHanlder = (e) => console.log(e); // response handler const responseHanlder = (payload) => { const li = document.createElement('li'); li.innerText = payload.data; li.classList.add('list-group-item', 'small') document.getElementById('result').appendChild(li); }
// request to rsocket-websocket and response handling const numberRequester = (socket, value) => { socket.requestStream({ data: value, metadata: String.fromCharCode('number.stream'.length) + 'number.stream' }).subscribe({ onError: errorHanlder, onNext: responseHanlder, onSubscribe: subscription => { subscription.request(100); // set it to some max value } }) }
// once the backend connection is established, register the event listeners client.connect().then(socket => { document.getElementById('n').addEventListener('change', ({srcElement}) => { numberRequester(socket, parseInt(srcElement.value)); }) }, errorHanlder);
|
基于WebSocket的RSocket流
我们来看看实现这种双向流所需的更改。
- 我们介绍了FlowableProcessor,它将同时充当发布者和订阅者。前端将发布将由后端应用程序订阅的值。
- 这里的想法是–每当我输入数字时,键入事件就会发布值。所以我立即得到给定数字的平方。
// reactive stream processor const processor = new FlowableProcessor(sub => {});
const numberRequester = (socket, processor) => { socket.requestChannel(processor.map(i => { return { data: i, metadata: String.fromCharCode('number.channel'.length) + 'number.channel' } })).subscribe({ onError: errorHanlder, onNext: responseHanlder, onSubscribe: subscription => { subscription.request(100); // set it to some max value } }) }
client.connect().then(sock => { numberRequester(sock, processor); document.getElementById('n').addEventListener('keyup', ({srcElement}) => { if(srcElement.value.length > 0){ processor.onNext(parseInt(srcElement.value)) } }) }, errorHanlder);
|
结论
我们能够成功建立RSocket WebSocket客户端-服务器通信,并以此开发一个简单的应用程序。我们可以将其用于实时视频推荐,游戏应用程序等。
此演示的源代码在此处。