Node.js中线程的完整指南 – LogRocket

19-03-25 banq
         

很多人想知道单线程Node.js如何与多线程后端竞争。因此,考虑到其所谓的单线程特性,许多大公司选择Node作为其后端似乎违反直觉。 当我们说Node是单线程时,我们必须理解我们的真正含义。

通常支持多线程的后端语言具有各种机制,用于在线程和其他面向线程的功能之间同步值。要向JavaScript添加对此类内容的支持,需要更改整个语言,这不是Dahl的目标。对于支持多线程的纯JavaScript,他必须创建一个变通方法。让我们来探索......

Node.js如何真正起作用

Node.js使用两种线程:由事件循环处理的主线程和工作池中的几个辅助线程。

事件循环是一种机制,它采用回调(函数)并将它们注册为将来的某个时刻执行。它与正确的JavaScript代码在同一个线程中运行。当JavaScript操作阻塞线程时,事件循环也会被阻止。

工作池是一种执行模型,它生成并处理单独的线程,然后同步执行任务并将结果返回到事件循环。然后,事件循环使用所述结果执行提供的回调。

简而言之,它负责异步I / O操作 - 主要是与系统磁盘和网络的交互。它主要用于fs(I / O-heavy)或crypto(CPU-heavy)等模块。工作池是在libuv中实现的,每当Node需要在JavaScript和C ++之间进行内部通信时,这会导致轻微的延迟,但这几乎不可察觉。

使用这两种机制,我们可以编写如下代码:

fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
 if (err) {
   return null;
 }

 console.log(content.toString());
});

上述fs模块告诉工作池使用其中一个线程来读取文件的内容,并在完成后通知事件循环。然后事件循环获取提供的回调函数并使用文件的内容执行这个回调函数。

以上是非阻塞代码的示例; 因此,我们不必同步等待某事发生。我们告诉工作池读取文件并使用结果调用提供的函数。由于工作池有自己的线程,因此事件循环可以在读取文件时继续正常执行。

在需要同步执行某些复杂操作之前,这一切都很好:任何运行时间太长的函数都会阻塞线程。如果应用程序具有许多此类功能,则可能会显着降低服务器的吞吐量或完全冻结它。在这种情况下,无法将工作委派给工作池。

需要复杂计算的字段(例如AI,机器学习或大数据)无法真正有效地使用Node.js,因为操作阻塞了主(且唯一)线程,使服务器无响应。在Node.js v10.5.0发布之前就是这种情况,这增加了对多线程的支持。

介绍:worker_threads

worker_threads模块是一个包,允许我们创建功能齐全的多线程Node.js应用程序。

线程工作者worker是在单独的线程中生成的一段代码。

请注意,术语线程工作者,工作者和线程通常可以互换使用; 他们都指的是同一件事。

要开始使用线程工作者,我们必须导入worker_threads模块。让我们首先创建一个函数来帮助我们生成这些线程工作者,然后我们将讨论它们的属性。

type WorkerCallback = (err: any, result?: any) => any;

export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
 const worker = new Worker(path, { workerData });

 worker.on('message', cb.bind(null, null));
 worker.on('error', cb);

 worker.on('exit', (exitCode) => {
   if (exitCode === 0) {
     return null;
   }

   return cb(new Error(`Worker has stopped with code ${exitCode}`));
 });

 return worker;
}

要创建一个worker,我们必须创建一个Worker类的实例。在第一个参数中,我们提供了包含worker的代码的文件的路径; 在第二个中,我们提供一个包含一个名为的属性的对象workerData。这是我们希望线程在开始运行时可以访问的数据。

请注意,无论您是使用JavaScript本身还是使用转换为JavaScript的内容(例如,TypeScript),路径都应始终引用带有扩展名.js或  .mjs扩展名的文件  。

我还想指出为什么我们使用回调方法而不是在message事件被触发时返回promise 。这是因为工人worker可以派遣许多message活动,而不仅仅是一个。

正如您在上面的示例中所看到的,线程之间的通信是基于事件的,这意味着我们正在设置在工作者发送给定事件后调用的侦听器。

以下是最常见的事件:

worker.on('error', (error) => {});

error表示:只要工作者中有未捕获的异常,就会发出该事件。然后终止worker,并且错误可用作提供的回调中的第一个参数。

worker.on('exit',(exitCode)=> {});

exit表示:每当工人退出时就会发出。如果process.exit()在worker内部调用,exitCode则会提供给回调。如果工作人员被终止worker.terminate(),则代码为1。

worker.on('online',()=> {});

online表示:每当工作程序停止解析JavaScript代码并开始执行时就会发出。它不经常使用,但在特定情况下可以提供信息。

worker.on('message',(data)=> {});

message :每当工作人员向父线程发送数据时都会发出。

现在让我们来看看如何在线程之间共享数据。

在线程之间交换数据

要将数据发送到其他线程,我们使用该port.postMessage()方法。它有以下签名:

port.postMessage(data[, transferList])

端口对象可以是一个parentPort或一个实例MessagePort

数据参数

第一个参数 - 这里称为data - 是一个复制到另一个线程的对象。它可以包含复制算法支持的任何内容。

数据由结构化克隆算法复制

它通过递归输入对象来构建克隆,同时保持先前访问过的引用的映射,以避免无限遍历循环。

算法不复制函数、错误、属性描述符或原型链。还应该注意,以这种方式复制对象与使用JSON不同,因为它可以包含循环引用和类型化数组,例如,而JSON不能。

通过支持复制类型化数组,该算法可以在线程之间共享内存。

在线程之间共享内存

很久以前,人们可能认为模块喜欢cluster或启用了线程。

cluster模块可以创建多个节点实例,其中一个主进程在它们之间路由传入请求。集群应用程序使我们能够有效地增加服务器的吞吐量; 但是,我们不能用cluster模块生成单独的线程。

child_process无论是否是JavaScript,该模块都可以生成任何可执行文件。它非常相似,但它缺少几个重要的功能worker_threads。

具体来说,线程工作者更轻量级并且与其父线程共享相同的进程ID。它们还可以与父线程共享内存,这样可以避免序列化大的数据负载,从而更有效地来回发送数据。

现在让我们看一下如何在线程之间共享内存的示例。为了使存储器被共享,实例ArrayBuffer或SharedArrayBuffer必须被发送到其它线程作为数据参数或数据参数的内部。

这是一个与其父线程共享内存的worker:

import { parentPort } from 'worker_threads';

parentPort.on('message', () => {
 const numberOfElements = 100;
 const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);
 const arr = new Int32Array(sharedBuffer);

 for (let i = 0; i < numberOfElements; i += 1) {
   arr[i] = Math.round(Math.random() * 30);
 }

 parentPort.postMessage({ arr });
});

首先,我们创建一个SharedArrayBuffer包含100个32位整数所需的内存。接下来,我们创建一个实例Int32Array,它将使用缓冲区来保存其结构,然后我们只用一些随机数填充数组并将其发送到父线程。

在父线程中:

import { runWorker } from '../run-worker';

const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => {
 if (err) {
   return null;
 }

 arr[0] = 5;
});

worker.postMessage({});

通过更改arr[0]为5,我们实际上在两个线程中更改它。

当然,通过共享内存,我们冒险在一个线程中更改一个值并在另一个线程中更改它。但是我们在此过程中也获得了一个非常好的功能:该值不需要序列化以便在另一个线程中可用,这极大地提高了效率。只需记住正确管理数据引用,以便在完成数据处理后对其进行垃圾回收。

共享一个整数数组很好,但我们真正感兴趣的是共享对象 - 存储信息的默认方式。不幸的是,没有SharedObjectBuffer 或类似,但我们可以自己创建一个类似的结构

transferList参数

transferList只能包含ArrayBuffer和MessagePort。一旦它们被转移到另一个线程,它们就不能再用于发送线程; 内存被移动到另一个线程,因此在发送一个线程中不可用。

目前,我们不能通过将网络套接字包含在transferList(我们可以使用child_process模块)中来传输网络套接字。

创建通信通道

线程之间的通信是通过端口进行的,端口是MessagePort类的实例并启用基于事件的通信。

有两种方法可以使用端口在线程之间进行通信。第一个是默认值,两个更容易。在worker的代码中,我们导入一个parentPort从worker_threads模块调用的对象,并使用该对象的  .postMessage()方法将消息发送到父线程。这是一个例子:

import { parentPort } from 'worker_threads';
const data = {
 // ...
};

parentPort.postMessage(data);

parentPort是MessagePort在幕后为我们创建的Node.js 的实例,用于启用与父线程的通信。这样,我们可以通过使用parentPort和worker对象在线程之间进行通信。

线程之间通信的第二种方式是实际创建一个MessageChannel我们自己的并将其发送给worker。以下是我们如何创建一个新的MessagePort并与我们的工作者分享它:

import path from 'path';
import { Worker, MessageChannel } from 'worker_threads';

const worker = new Worker(path.join(__dirname, 'worker.js'));

const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => {
 console.log('message from worker:', message);
});

worker.postMessage({ port: port2 }, [port2]);

创建port1和port2后,我们就成立了事件侦听器port1和发送port2给工人。我们必须把它包含在transferList它转移到工人方面。

现在,在工人内部:

import { parentPort, MessagePort } from 'worker_threads';

parentPort.on('message', (data) => {
 const { port }: { port: MessagePort } = data;

 port.postMessage('heres your message!');
});

这样,我们使用父线程发送的端口。

使用parentPort不一定是错误的方法,但最好MessagePort使用实例创建一个新实例,MessageChannel然后与生成的工作者共享它(阅读:关注点分离)。

请注意,在下面的示例中,我parentPort用来保持简单。

更多点击标题见原文。