From f5f3b51eeef05d97c3c7fbbdfc77745c9ddf2ccf Mon Sep 17 00:00:00 2001 From: mrfry Date: Fri, 18 Dec 2020 21:32:43 +0100 Subject: [PATCH] Added worker pools, and very very basic workers --- package-lock.json | 5 ++ package.json | 1 + src/server.ts | 14 +++- src/utils/classes.ts | 55 ++++++++++++++-- src/utils/workerPool.ts | 141 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 211 insertions(+), 5 deletions(-) create mode 100644 src/utils/workerPool.ts diff --git a/package-lock.json b/package-lock.json index b4b0311..fe6709a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1466,6 +1466,11 @@ "wide-align": "^1.1.0" } }, + "generic-pool": { + "version": "3.7.1", + "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.7.1.tgz", + "integrity": "sha512-ug6DAZoNgWm6q5KhPFA+hzXfBLFQu5sTXxPpv44DmE0A2g+CiHoq9LTVdkXpZMkYVMoGw83F6W+WT0h0MFMK/w==" + }, "getpass": { "version": "0.1.7", "resolved": "https://registry.npmjs.org/getpass/-/getpass-0.1.7.tgz", diff --git a/package.json b/package.json index 0ff5dce..c9b0c2c 100755 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "eslint-plugin-typescript": "^0.14.0", "express": "^4.6.1", "express-ejs-layouts": "^1.1.0", + "generic-pool": "^3.7.1", "sqlite3": "^4.1.1", "ts-node": "^9.0.0", "typescript": "^4.1.2", diff --git a/src/server.ts b/src/server.ts index 22572b0..4af8f2a 100755 --- a/src/server.ts +++ b/src/server.ts @@ -290,4 +290,16 @@ if (certsLoaded) { logger.Log('Https not avaible') } -// app.listen(port) +console.log('hai') +import { init, doALongTask } from './utils/workerPool' +init() +setTimeout(() => { + doALongTask(1, { + type: 'work', + add: 1, + index: 1, + }).then((res) => { + console.log('woohoo!') + console.log(res) + }) +}, 6000) diff --git a/src/utils/classes.ts b/src/utils/classes.ts index f07a8fa..c2a51f3 100755 --- a/src/utils/classes.ts +++ b/src/utils/classes.ts @@ -598,11 +598,58 @@ const workerTs = (file: string, wkOpts: any) => { ) } -if (!isMainThread) { - logger.DebugLog(`Starting search worker ...`, 'searchworker', 1) - const { data, subjName, question, questionData } = workerData - searchWorker(data, subjName, question, questionData) +// if (!isMainThread) { +// logger.DebugLog(`Starting search worker ...`, 'searchworker', 1) +// const { data, subjName, question, questionData } = workerData +// searchWorker(data, subjName, question, questionData) +// } + +function random(min, max) { + return Math.floor(Math.random() * (max - min) + min) } + +if (!isMainThread) { + const workerIndex = workerData.workerIndex + const timeoutMin = workerData.workerTimeoutMin + const timeoutMax = workerData.workerTimeoutMax + const data = { val: 0 } + + console.log( + `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty` + ) + console.log(`[THREAD #${workerIndex}]: data`, workerData) + // parentPort.postMessage('hello parent port') + + parentPort.on('message', (msg) => { + // console.log(`[THREAD #${workerIndex}]: onmsg`, msg) + + if (msg.type === 'work') { + const index = msg.index + console.log(`[THREAD #${workerIndex}]: staring work on ${index}`) + setTimeout(() => { + data.val = data.val + msg.add + parentPort.postMessage({ + msg: `From thread #${workerIndex}: job ${index} done`, + workerIndex: workerIndex, + result: data.val, + }) + + console.log(`[THREAD #${workerIndex}]: Work ${index} done!`) + }, random(timeoutMin, timeoutMax)) + } else if (msg.type === 'update') { + if (msg.data.workerIndex !== workerIndex) { + console.log(`[THREAD #${workerIndex}]: update`, msg.data) + console.log( + `[THREAD #${workerIndex}]: From ${data.val} to ${msg.data.result}` + ) + data.val = msg.data.result + } + } + }) +} else { + console.log('[THREAD]: Main thread!') +} + // ------------------------------------------------------------------------ export { diff --git a/src/utils/workerPool.ts b/src/utils/workerPool.ts new file mode 100644 index 0000000..095868e --- /dev/null +++ b/src/utils/workerPool.ts @@ -0,0 +1,141 @@ +import { Worker } from 'worker_threads' +import genericPool from 'generic-pool' +import os from 'os' + +// --------------------------------------------------------------------------- + +const workerFile = './src/utils/classes.ts' +const workerTimeoutMin = 5 * 1000 +const workerTimeoutMax = 10 * 1000 +let pool: any = null +let workers: any = null + +// --------------------------------------------------------------------------- + +// doALongTask(i, taskObj).then((res) => { +// msgAll(res) +// console.log('[MSGFROMCLIENT]', res) +// }) + +export function doALongTask(i: Number, obj: any): Promise { + return new Promise((resolve) => { + pool + .acquire() + .then(function(client) { + doSomething(client, obj).then((res) => { + resolve(res) + // TODO: check if result is really a result, and want to release port + pool.release(client) + console.log('[RELEASE]: #' + client.index) + }) + }) + .catch(function(err) { + console.log('resourcePromise error') + console.log(err) + // handle error - this is generally a timeout or maxWaitingClients + // error + }) + }) +} + +export function init(): void { + if (workers && pool) { + console.log('WORKER AND POOL ALREADY EXISTS') + return + } + workers = [] + const factory = { + create: function() { + const worker = getAWorker(workers.length) + workers.push(worker) + return { + worker: worker, + index: workers.length, + } + }, + destroy: function(client) { + console.log('[DESTROY]') + client.worker.terminate() + console.log('[DESTROYED] #' + client.index) + }, + } + + const opts = { + min: os.cpus().length - 1, // minimum size of the pool + max: os.cpus().length - 1, // maximum size of the pool + maxWaitingClients: 999, + } + + pool = genericPool.createPool(factory, opts) +} + +export function msgAll(data: any): void { + workers.forEach((worker) => { + worker.postMessage({ + type: 'update', + data, + }) + }) +} + +// --------------------------------------------------------------------------- + +function getAWorker(i) { + const worker = workerTs(workerFile, { + workerData: { + workerIndex: i, + workerTimeoutMin, + workerTimeoutMax, + }, + }) + + worker.setMaxListeners(50) + + // worker.on('message', (msg) => { + // console.log(`[MAIN]: Msg from worker #${i}`, msg) + // }) + + worker.on('online', () => { + console.log(`[THREAD #${i}]: Worker ${i} online`) + }) + + worker.on('error', (err) => { + console.log(err) + }) + + worker.on('exit', (code) => { + console.log(`[MAIN]: worker #${i} exit code: `, code) + }) + return worker +} + +// --------------------------------------------------------------------------- + +function doSomething(client, obj) { + const { index, worker } = client + return new Promise((resolve) => { + console.log('[ACCUIRE]: #' + index) + worker.postMessage(obj) + worker.once('message', (msg) => { + resolve(msg) + }) + }) +} + +const workerTs = (file: string, wkOpts: any) => { + wkOpts.eval = true + if (!wkOpts.workerData) { + wkOpts.workerData = {} + } + wkOpts.workerData.__filename = file + return new Worker( + ` + const wk = require('worker_threads'); + require('ts-node').register(); + let file = wk.workerData.__filename; + delete wk.workerData.__filename; + require(file); + `, + wkOpts + ) +}