怎样在小程序里实现标题的更改
666
2022-11-05
Nodejs中的递归多线程工作进程
Example
The following replicates the above worker graph. See here for a JavaScript version.
// app.tsimport { spawn, Main, Thread, channel, Sender, Receiver } from '@sinclair/threadbox'@Thread() class WorkerC { run() { return Math.random() }}@Thread() class WorkerB { async run(sender: Sender) { const c_0 = spawn(WorkerC) const c_1 = spawn(WorkerC) const c_2 = spawn(WorkerC) const c_3 = spawn(WorkerC) const [a, b, c, d] = await Promise.all([ c_0.run(), c_1.run(), c_2.run(), c_3.run(), ]) await sender.send([a, b, c, d]) await sender.end() await c_0.dispose() await c_1.dispose() await c_2.dispose() await c_3.dispose() }}@Thread() class WorkerA { async run(receiver: Receiver) { for await(const [a, b, c, d] of receiver) { } }}// start here ...@Main() default class { async main() { const [sender, receiver] = channel() const a = spawn(WorkerA) const b = spawn(WorkerB) await Promise.all([ a.run(receiver), b.run(sender) ]) await a.dispose() await b.dispose() }}
Overview
ThreadBox is a threading library for JavaScript that is built on top of NodeJS worker_threads. It is written to allow for compute intensive JavaScript or WASM processes to be trivially executed in remote worker threads. ThreadBox approaches this by allowing any class decorated with Thread to be spawned and instanced in a remote worker thread. ThreadBox constructs an async interface to the remote worker class, allowing the host thread to invoke operations on the remote worker through simple async method calls.
ThreadBox uses a recursive pattern to spawn worker threads. ThreadBox will recursively call into the applications entry module (typically app.js) and instance a requested Thread class when spawning a new worker. Because each new worker is spawned from the same entry module as the application, class, function and const definitions defined by the application are also available to each subsequent thread. This approach allows for ergonomic same file threading common to other languages.
ThreadBox was built as a research project and is primarily geared towards TypeScript development. It does however provide a non-decorator based fallback API for JavaScript users. This library is offered as is to anyone who may find it of use.
Built with Node 12.16.1 LTS and TypeScript 3.8.3.
Licence MIT
Install
$ npm install @sinclair/threadbox --save
Contents
InstallOverviewSetupMainThreadSpawnChannelMarshalMutexSharedArrayBuffer
Setup
ThreadBox primarily uses decorators to denote Main, Thread and Marshal class types. It also implements [Symbol.asyncIterator] for channels. TypeScript users should configure their environment for the following.
// tsconfig.json{ "compilerOptions": { "experimentalDecorators": true, "downlevelIteration": true, ... }}
Main
The Main decorator registers a class as the programs entry point. The classes main(...) function will be automatically called by ThreadBox when the program is run.
import { Main } from '@sinclair/threadbox'@Main() class Program { main() { console.log('Hello World') }}// JavaScript users can use __Main(Program) if// decorators are not available.
Thread
The Thread decorator registers a class as threadable which allows it to be spawned in a worker thread. When spawned, the host thread will be able to execute any function available on the class. The class may additionally implement an optional dispose() function that will be invoked when the host thread terminates the worker.
import { Thread } from '@sinclair/threadbox'@Thread() class Worker { add(a: number, b: number) { return a + b } dispose() { console.log('disposed!') }}@Main() default class { async main() { const worker = spawn(Worker) const result = await worker.add(10, 20) await worker.dispose() }}// JavaScript users can use __Thread(Worker) if// decorators are not available.
All JavaScript classes can be decorated with @Thread().
Spawn
The spawn(...) function will spawn a threadable class and return a handle to caller allowing it to call methods on the remote class instance. The spawn(...) function may also accept the classes constructor arguments.
import { spawn, Main, Worker } from '@sinclair/threadbox'@Thread() class Worker { constructor(private message: string) { console.log('worker: constructor', message) } method() { console.log('worker: method') } dispose() { console.log('worker: dispose') }}@Main() default class { async main() { const worker = spawn(Worker, 'hello world') await worker.method() await worker.dispose() // important! }}
The return type of the spawn() function is a ThreadInterface object. This ThreadInterface is a promisfied version of the class interface. The ThreadInterface also provides an additional dispose() function that is available irrespective of if the class has provided an implementation. Calling dispose() on the ThreadInterface will result in the worker being terminated.
Channel
ThreadBox provides a channel API that is built upon the MessageChannel API. ThreadBox channels implement a synchronization protocol that enables a Sender to optionally await for messages to be received by a Receiver. ThreadBox channels are loosely modelled on Rust mpsc channels.
import { channel } from '@sinclair/threadbox'const [sender, receiver] = channel()
The channel Sender and Receiver types can be used to stream sequences of values between cooperating threads. The Sender will async buffer values if the caller does not await on send(). The Receiver type implements [Symbol.asyncIterator] so can be used with for-await-of.
Example 1
The following creates a channel inside the Main thread and sends the Sender to the worker thread. The worker thread will emit values to the Sender which are iterated on within the Main thread.
import { spawn, Main, Worker, channel, Sender, Receiver } from '@sinclair/threadbox'@Thread() class Worker { async execute(sender: Sender
Example 2
The following creates a channel inside the worker thread and returns a Receiver on its stream() function. The Main thread then spawns the worker thread and calls stream() and awaits for the Receiver. It then iterates on the Receiver. The into() function is a util function that allows one to move into an async context.
import { spawn, into, Main, Worker, channel, Sender, Receiver } from '@sinclair/threadbox'@Thread() class Worker { stream(): Receiver
Marshal
The Marshal decorator registers a class as marshalled. This enables instances of the class to be sent and reconstructed across thread boundaries. ThreadBox will automatically marshal all classes marked with Marshal across Thread function calls, as well as across a channel Sender.
This functionality allows class instances to be transferred to remote threads for remote invocation.
import { spawn, Main, Thread, Marshal } from '@sinclair/threadbox'// Instances of this class can be sent between threads.@Marshal() class Transferrable { method() { console.log('Hello World') }}@Thread() class Worker { execute(instance: Transferrable) { instance.method() // callable }}@Main() default class { async main() { const worker = spawn(Worker) const instance = new Transferrable() await worker.execute(instance) await worker.dispose() }}// JavaScript users can use __Marshal(Foo) if// decorators are not available.
Note: There is a serialization cost to marshaling. For performance, only Marshal when you need to move logic in and out of threads.
Mutex
ThreadBox provides a Mutex primitive that is built upon JavaScript Atomics. It is used to enter into critical sections of code.
import { Mutex } from '@sinclair/threadbox'const mutex = new Mutex()const lock = mutex.lock()// critical sectionlock.dispose()
The example below spawns 4 instances of the Worker class. A Mutex instance is passed into each worker where by the worker takes a MutexLock on the execute() method. The worker thread holds onto their respective lock for 1 second before releasing. Only 1 of the 4 workers will execute the critical section (below) at one time. The timeout is used to demonstrate the locking behavior.
import { spawn, Main, Thread, Mutex } from '@sinclair/threadbox'@Thread() class Worker { execute(mutex: Mutex) { const lock = mutex.lock() // // critical section !! // setTimeout(() => lock.dispose(), 1000) }}@Main() default class { async main() { const worker_0 = spawn(Worker) const worker_1 = spawn(Worker) const worker_2 = spawn(Worker) const worker_3 = spawn(Worker) const mutex = new Mutex() await Promise.all([ worker_0.execute(mutex), worker_1.execute(mutex), worker_2.execute(mutex), worker_3.execute(mutex) ]) // .. 4 seconds approx await worker_0.dispose() await worker_1.dispose() await worker_2.dispose() await worker_3.dispose() }}
SharedArrayBuffer
The following demonstrates using SharedArrayBuffer to parallelize operations performed across a shared Float32Array. The shared buffer is sent to 4 workers with an index to store the result.
import { spawn, Main, Worker } from '@sinclair/threadbox'@Thread() class ComputeForIndex { execute(buffer: Float32Array, index: number) { // sleep 5 seconds const started = Date.now() while((Date.now() - started) < 5000) {} buffer[index] = Math.random() }}@Main() default class { async main() { // 4 x 32bit floats const shared = new SharedArrayBuffer(4 * Float32Array.BYTES_PER_ELEMENT) const buffer = new Float32Array(shared) // spin up 4 workers const c_0 = spawn(ComputeForIndex) const c_1 = spawn(ComputeForIndex) const c_2 = spawn(ComputeForIndex) const c_3 = spawn(ComputeForIndex) // process in parallel await Promise.all([ c_0.execute(buffer, 0), c_1.execute(buffer, 1), c_2.execute(buffer, 2), c_3.execute(buffer, 3) ]) // clean up await c_0.dispose() await c_1.dispose() await c_2.dispose() await c_3.dispose() // result console.log('result', buffer) }}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~