Java、Rust、Go、NodeJS、TypeScript并发编程比较 - foojay


使用Java、Rust、Go、JavaScript (NodeJS)、TypeScript 等流行语言构建并发 Web 服务器并对其进行基准测试(Deno) 和 Kotlin 来比较这些语言/平台之间的并发性及其性能。
  
Rust 中的并发
高效和内存安全的并发是 Rust 的主要目标之一,这些不仅仅是简单的词,该语言为并发编程提供了强大的功能,当与同类最佳的内存安全模型相结合时,使其成为并发用例的绝佳选择。
Rust 提供构建块来创建和管理操作系统线程作为标准库的一部分,它还提供使用通道的消息传并发(类似于 Go)和使用互斥体和智能指针的共享状态并发所需的实现。Rust 的类型系统和所有权模型有助于避免常见的并发问题,如数据竞争、锁等。
最新版本的 Rust 提供了使用async/.await语法进行异步编程所需的构建块和语言功能。但请记住,使用异步编程模型会增加整体复杂性,而且生态系统仍在不断发展。虽然 Rust 提供了所需的语言功能,但标准库不提供任何所需的实现,因此您必须使用外部 crateFutures才能有效地使用异步编程模型。
带有 Tokio 的异步多线程并发网络服务器:这是另一个使用Tokio的异步多线程网络服务器版本,由Remco Bloemen贡献。为简洁起见,我省略了导入语句。您可以在GitHub 上找到完整示例。

#[tokio::main()] // Tokio uses a threadpool sized for number of cpus by default
async fn main() {
    let listener = TcpListener::bind(
"127.0.0.1:8080").await.unwrap();  // bind listener
    let mut count = 0;
// count used to introduce delays

   
// Listen for an incoming connection.
    loop {
        count = count + 1;
        let (socket, _) = listener.accept().await.unwrap();
       
// spawning each connection in a new tokio thread asynchronously
        tokio::spawn(async move { handle_connection(socket, Box::new(count)).await });
    }
}

async fn handle_connection(mut stream: TcpStream, count: Box<i64>) {
   
// Read the first 1024 bytes of data from the stream
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

   
// add 2 second delay to every 10th request
    if (*count % 10) == 0 {
        println!(
"Adding delay. Count: {}", count);
        sleep(Duration::from_secs(2)).await;
    }

    let header =
"
    HTTP/1.0 200 OK
    Connection: keep-alive
    Content-Length: 174
    Content-Type: text/html; charset=utf-8
       
";

    let contents = read_to_string(
"hello.html").await.unwrap();

    let response = format!(
"{}\r\n\r\n{}", header, contents);

    stream.write_all(response.as_bytes()).await.unwrap();
// write response
}

Threadpool除了异步调用之外,也有来自线程池的相同瓶颈,因此我们将线程池设置为 100 以匹配最大并发请求。
让我们使用 ApacheBench 运行一个基准测试。我们将发出 10000 个请求和 100 个并发请求。

ab -c 100 -n 10000 http://127.0.0.1:8080/

This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
...

Document Path:          /
Document Length:        176 bytes

Concurrency Level:      100
Time taken for tests:   20.569 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      3030000 bytes
HTML transferred:       1760000 bytes
Requests per second:    486.17 [#/sec] (mean)
Time per request:       205.688 [ms] (mean)
Time per request:       2.057 [ms] (mean, across all concurrent requests)
Transfer rate:          143.86 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   2.4      0      22
Processing:     0  202 600.3      1    2013
Waiting:        0  202 600.3      1    2012
Total:          0  203 600.3      2    2029

Percentage of the requests served within a certain time (ms)
  50%      2
  66%      3
  75%      5
  80%      7
  90%   2000
  95%   2003
  98%   2006
  99%   2008
 100%   2029 (longest request)

 
Java 中的并发
这个例子更接近Rust语言的异步例子,为了简洁我省略了 import 语句。您可以在GitHub 上找到完整示例。请注意,我们在java.nio.channels.AsynchronousServerSocketChannel这里使用并且没有外部依赖项。

public class JavaAsyncHTTPServer {
    public static void main(String[] args) throws Exception {
        new JavaAsyncHTTPServer().start();
        Thread.currentThread().join(); // Wait forever
    }

    private void start() throws IOException {
       
// we shouldn't use try with resource here as it will kill the stream
        var server = AsynchronousServerSocketChannel.open();
        server.bind(new InetSocketAddress(
"127.0.0.1", 8080), 100); // bind listener
        server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        System.out.println(
"Server is listening on port 8080");

        final int[] count = {0};
// count used to introduce delays

       
// listen to all incoming requests
        server.accept(null, new CompletionHandler<>() {
            @Override
            public void completed(final AsynchronousSocketChannel result, final Object attachment) {
                if (server.isOpen()) {
                    server.accept(null, this);
                }
                count[0]++;
                handleAcceptConnection(result, count[0]);
            }

            @Override
            public void failed(final Throwable exc, final Object attachment) {
                if (server.isOpen()) {
                    server.accept(null, this);
                    System.out.println(
"Connection handler error: " + exc);
                }
            }
        });
    }

    private void handleAcceptConnection(final AsynchronousSocketChannel ch, final int count) {
        var file = new File(
"hello.html");
        try (var fileIn = new FileInputStream(file)) {
           
// add 2 second delay to every 10th request
            if (count % 10 == 0) {
                System.out.println(
"Adding delay. Count: " + count);
                Thread.sleep(2000);
            }
            if (ch != null && ch.isOpen()) {
               
// Read the first 1024 bytes of data from the stream
                final ByteBuffer buffer = ByteBuffer.allocate(1024);
               
// read the request fully to avoid connection reset errors
                ch.read(buffer).get();

               
// read the HTML file
                var fileLength = (int) file.length();
                var fileData = new byte[fileLength];
                fileIn.read(fileData);

               
// send HTTP Headers
                var message = (
"HTTP/1.1 200 OK\n" +
                       
"Connection: keep-alive\n" +
                       
"Content-length: " + fileLength + "\n" +
                       
"Content-Type: text/html; charset=utf-8\r\n\r\n" +
                        new String(fileData, StandardCharsets.UTF_8)
                ).getBytes();

               
// write the to output stream
                ch.write(ByteBuffer.wrap(message)).get();

                buffer.clear();
                ch.close();
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            System.out.println(
"Connection handler error: " + e);
        }
    }
}

我们将异步侦听器绑定到端口 8080 并侦听所有传入请求。每个请求都在由 AsynchronousServerSocketChannel提供的新任务中处理。我们在这里没有使用任何线程池,所有传入的请求都是异步处理的,因此我们没有最大连接数的瓶颈。
让我们使用 ApacheBench 运行一个基准测试。我们将发出 10000 个请求和 100 个并发请求。

ab -c 100 -n 10000 http://127.0.0.1:8080/

This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
...

Document Path:          /
Document Length:        176 bytes

Concurrency Level:      100
Time taken for tests:   20.243 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      2770000 bytes
HTML transferred:       1760000 bytes
Requests per second:    494.00 [#/sec] (mean)
Time per request:       202.431 [ms] (mean)
Time per request:       2.024 [ms] (mean, across all concurrent requests)
Transfer rate:          133.63 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.6      0       5
Processing:     0  201 600.0      0    2026
Waiting:        0  201 600.0      0    2026
Total:          0  202 600.0      0    2026

Percentage of the requests served within a certain time (ms)
  50%      0
  66%      1
  75%      3
  80%      4
  90%   2000
  95%   2001
  98%   2002
  99%   2003
 100%   2026 (longest request)

 

Go 中的并发
不要通过共享内存进行通信;相反,通过通信共享内存。Go 支持并发作为一等公民,其goroutines. Go 将协程的概念提升到一个全新的水平,使其更简单,并且成为在 Go 中执行几乎任何事情的首选方式。语义和语法非常简单,即使是 Go 新手也能从一开始就goroutines轻松上手。所有这一切都没有牺牲性能。
为简洁起见,我省略了导入语句。您可以在GitHub 上找到完整示例。在这种情况下,我们也没有使用任何外部依赖项,并且http是 Go 标准库的一部分。

func main() {
    var count = 0
    // set router
    http.HandleFunc(
"/", func(w http.ResponseWriter, r *http.Request) {
        defer r.Body.Close()
        count++
        handleConnection(w, count)
    })
    
// set listen port
    err := http.ListenAndServe(
":8080", nil)
    if err != nil {
        log.Fatal(
"ListenAndServe: ", err)
    }
}

func handleConnection(w http.ResponseWriter, count int) {
    
// add 2 second delay to every 10th request
    if (count % 10) == 0 {
        println(
"Adding delay. Count: ", count)
        time.Sleep(2 * time.Second)
    }
    html, _ := ioutil.ReadFile(
"hello.html") // read html file
    w.Header().Add(
"Connection", "keep-alive")
    w.WriteHeader(200)          
// 200 OK
    fmt.Fprintf(w, string(html))
// send data to client side
}

如您所见,我们创建了一个绑定到端口 8080 的 HTTP 服务器并侦听所有传入请求。我们分配一个回调函数来处理内部调用handleConnection方法的每个请求。
让我们使用 ApacheBench 运行一个基准测试。我们将发出 10000 个请求和 100 个并发请求。

ab -c 100 -n 10000 http://127.0.0.1:8080/

This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
...

Document Path:          /
Document Length:        174 bytes

Concurrency Level:      100
Time taken for tests:   20.232 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      2910000 bytes
HTML transferred:       1740000 bytes
Requests per second:    494.27 [#/sec] (mean)
Time per request:       202.319 [ms] (mean)
Time per request:       2.023 [ms] (mean, across all concurrent requests)
Transfer rate:          140.46 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   0.9      0       6
Processing:     0  201 600.0      1    2013
Waiting:        0  201 600.0      0    2013
Total:          0  202 600.0      1    2018
WARNING: The median and mean for the initial connection time are not within a normal deviation
        These results are probably not that reliable.

Percentage of the requests served within a certain time (ms)
  50%      1
  66%      1
  75%      2
  80%      3
  90%   2000
  95%   2001
  98%   2002
  99%   2003
 100%   2018 (longest request)

 
JavaScript 和 NodeJS 中的并发
JavaScript 是单线程的,因此实现多线程的唯一方法是启动 JS 引擎的多个实例。但是,您如何在这些实例之间进行通信?这就是Web Workers 的用武之地。
Web Workers 使在与 Web 应用程序的主执行线程分离的后台线程中运行脚本操作成为可能
在 Web Worker 的帮助下,可以将繁重的计算卸载到单独的线程,从而释放主线程。这些工作线程和主线程使用事件进行通信,一个工作线程可以产生其他工作线程。
现在,当谈到 NodeJS 时,几乎没有方法可以产生额外的线程和进程。有经典child_process模块,更现代的worker_threads模块,与 Web Worker 非常相似,以及cluster用于创建 NodeJS 实例集群的模块。
无论是 web worker 还是 worker 线程,它们都不像其他语言中的多线程实现那样灵活或简单,并且有很多限制,因此它们大多只在有 CPU 密集型任务或后台任务需要执行以供其他用途时使用使用异步处理的并发情况就足够了。
JavaScript 不提供对 OS 线程或绿色线程的访问,同样适用于 NodeJS 但是工作线程和集群很接近,因此高级多线程是不可行的。消息传递并发是可能的,由 JS 事件循环本身使用,可用于 JS 中的 Worker 和标准并发模型。在标准并发模型和使用数组缓冲区的工作线程中,共享状态并发是可能的。
我们使用cluster模块来分叉主线程和工作线程,每个 CPU 线程一个工作线程。我们仍然在http这里使用模块和回调。您可以在GitHub 上找到完整示例。在这种情况下,我们也没有使用任何外部依赖。

const http = require("http");
const fs = require(
"fs").promises;
const cluster = require(
"cluster");
const numCPUs = require(
"os").cpus().length;

let count = 0;

// set router
const server = http.createServer((req, res) => {
  count++;
  requestListener(req, res, count);
});

const host =
"localhost";
const port = 8080;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

 
// Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on(
"exit", (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
 
// set listen port, TCP connection is shared by all workers
  server.listen(port, host, () => {
    console.log(`Worker ${process.pid}: Server is running on http:
//${host}:${port}`);
  });
}

const requestListener = async function (req, res, count) {
 
// add 2 second delay to every 10th request
  if (count % 10 === 0) {
    console.log(
"Adding delay. Count: ", count);
    await sleep(2000);
  }
  const contents = await fs.readFile(__dirname +
"/hello.html"); // read html file
  res.setHeader(
"Connection", "keep-alive");
  res.writeHead(200);
// 200 OK
  res.end(contents);
// send data to client side
};

// sleep function since NodeJS doesn't provide one
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}

集群模块分为 master 和 worker。我们分配一个回调函数来处理内部调用该requestListener方法的每个请求。
让我们使用 ApacheBench 运行一个基准测试。我们将发出 10000 个请求和 100 个并发请求。
ab -c 100 -n 10000 http://127.0.0.1:8080/

This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
...

Server Software:
Server Hostname:        127.0.0.1
Server Port:            8080

Document Path:          /
Document Length:        174 bytes

Concurrency Level:      100
Time taken for tests:   21.075 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      2540000 bytes
HTML transferred:       1740000 bytes
Requests per second:    474.50 [#/sec] (mean)
Time per request:       210.747 [ms] (mean)
Time per request:       2.107 [ms] (mean, across all concurrent requests)
Transfer rate:          117.70 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.8      0      11
Processing:     0  206 600.1      4    2047
Waiting:        0  205 600.1      3    2045
Total:          1  206 600.1      4    2047

Percentage of the requests served within a certain time (ms)
  50%      4
  66%      8
  75%     11
  80%     14
  90%     88
  95%   2005
  98%   2012
  99%   2016
 100%   2047 (longest request)

  
Deno 中的并发
TypeScript 中的并发性与 JavaScript 中的完全相同,因为 TypeScript 是 JavaScript 的严格超集。
因此,如果您将 TypeScript 与 NodeJS 一起使用,它与在 NodeJS 上使用 JavaScript 完全相同,因为 NodeJS 不会在本地运行 TypeScript,我们必须将其转换为 JavaScript,因此让我们专注于 Deno 上的 TypeScript,因为我们已经涵盖了 NodeJS。
与 NodeJS 不同,Deno 可以在本地运行 TypeScript,它会在幕后转换为 JS。正如我们在 NodeJS 中看到的,Deno 还专注于非阻塞 IO,旨在改进/修复 NodeJS 中的问题。这意味着你也可以在 Deno 上使用 NodeJS 和 JavaScript 完成所有可以做的事情,有时使用更好的 API 和更少的代码。就像在 JS 中一样,您依靠事件循环回调承诺Async/Await来实现 TypeScript 中的并发。
Deno API 默认是异步的,并且推荐经常使用 async/await 。
Deno 中的默认并发是使用回调、Promise 或 async/await 的异步编程模型。
就像在 JavaScript 中一样,也可以在 Deno 上使用 TypeScript 进行某种程度的多线程并发和并行化,并且由于 Deno 是基于 Rust 构建的,因此未来并发性能可能会比NodeJS 上的更好。
这个例子更接近Rust 异步例子。您可以在此处GitHub 上找到完整示例。在这种情况下,我们仅使用标准 Deno 模块。

import { serve, ServerRequest } from "https://deno.land/std/http/server.ts";

let count = 0;

// set listen port
const server = serve({ hostname:
"0.0.0.0", port: 8080 });
console.log(`HTTP webserver running at:  http:
//localhost:8080/`);

// listen to all incoming requests
for await (const request of server) handleRequest(request);

async function handleRequest(request: ServerRequest) {
  count++;
 
// add 2 second delay to every 10th request
  if (count % 10 === 0) {
    console.log(
"Adding delay. Count: ", count);
    await sleep(2000);
  }
 
// read html file
  const body = await Deno.readTextFile(
"./hello.html");
  const res = {
    status: 200,
    body,
    headers: new Headers(),
  };
  res.headers.set(
"Connection", "keep-alive");
  request.respond(res);
// send data to client side
}

// sleep function since NodeJS doesn't provide one
function sleep(ms: number) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}

我们创建了一个 HTTP 服务器并将其绑定到端口 8080 并在 for await 循环中侦听所有传入请求。每个请求都在内部使用async/await函数处理。
让我们使用 ApacheBench 运行一个基准测试。我们将发出 10000 个请求和 100 个并发请求。
ab -k -c 100 -n 10000 http://127.0.0.1:8080/

This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
...

Server Software:
Server Hostname:        127.0.0.1
Server Port:            8080

Document Path:          /
Document Length:        174 bytes

Concurrency Level:      100
Time taken for tests:   21.160 seconds
Complete requests:      10000
Failed requests:        0
Keep-Alive requests:    10000
Total transferred:      2380000 bytes
HTML transferred:       1740000 bytes
Requests per second:    472.59 [#/sec] (mean)
Time per request:       211.600 [ms] (mean)
Time per request:       2.116 [ms] (mean, across all concurrent requests)
Transfer rate:          109.84 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.7      0      11
Processing:     0  207 600.7      5    2250
Waiting:        0  207 600.7      5    2250
Total:          0  207 600.7      5    2254

Percentage of the requests served within a certain time (ms)
  50%      5
  66%      8
  75%     11
  80%     13
  90%   2001
  95%   2006
  98%   2012
  99%   2017
 100%   2254 (longest request)