Files
mrfrys-node-server/src/utils/workerPool.ts
T
2020-12-19 10:32:15 +01:00

143 lines
3.4 KiB
TypeScript

import { Worker } from 'worker_threads'
import genericPool from 'generic-pool'
import os from 'os'
import logger from './logger'
// import { QuestionDb } from '../types/basicTypes'
// ---------------------------------------------------------------------------
const workerFile = './src/utils/classes.ts'
let pool: any = null
let workers: any = null
// ---------------------------------------------------------------------------
// doALongTask(i, taskObj).then((res) => {
// msgAll(res)
// console.log('[MSGFROMCLIENT]', res)
// })
export function doALongTask(obj: any): Promise<any> {
return new Promise((resolve) => {
pool
.acquire()
.then(function(client) {
doSomething(client, obj).then((res) => {
resolve(res)
// TODO: check if result is really a result, and want to release port
pool.release(client)
console.log('[RELEASE]: #' + client.index)
})
})
.catch(function(err) {
logger.Log('resourcePromise error', logger.GetColor('redbg'))
console.error(err)
// handle error - this is generally a timeout or maxWaitingClients
// error
})
})
}
export function initWorkerPool(): void {
if (workers && pool) {
logger.Log('WORKER AND POOL ALREADY EXISTS', logger.GetColor('redbg'))
return
}
workers = []
const factory = {
create: function() {
const currInd = workers.length
const worker = getAWorker(currInd)
workers.push(worker)
return {
worker: worker,
index: currInd,
}
},
destroy: function(client) {
console.log('[DESTROY]')
client.worker.terminate()
console.log('[DESTROYED] #' + client.index)
},
}
const opts = {
min: os.cpus().length - 1, // minimum size of the pool
max: os.cpus().length - 1, // maximum size of the pool
maxWaitingClients: 999,
}
pool = genericPool.createPool(factory, opts)
}
export function msgAll(data: any): void {
workers.forEach((worker) => {
worker.postMessage({
type: 'update',
data,
})
})
}
// ---------------------------------------------------------------------------
function getAWorker(i) {
const worker = workerTs(workerFile, {
workerData: {
workerIndex: i,
},
})
worker.setMaxListeners(50)
// worker.on('message', (msg) => {
// logger.Log(`[MAIN]: Msg from worker #${i}`, msg)
// })
worker.on('online', () => {
logger.Log(`[THREAD #${i}]: Worker ${i} online`)
})
worker.on('error', (err) => {
logger.Log('Worker error!', logger.GetColor('redbg'))
console.error(err)
})
worker.on('exit', (code) => {
logger.Log(`[MAIN]: worker #${i} exit code: ${code}`)
})
return worker
}
// ---------------------------------------------------------------------------
function doSomething(client, obj) {
const { index, worker } = client
return new Promise((resolve) => {
console.log('[ACCUIRE]: #' + index)
worker.postMessage(obj)
worker.once('message', (msg) => {
resolve(msg)
})
})
}
const workerTs = (file: string, wkOpts: any) => {
wkOpts.eval = true
if (!wkOpts.workerData) {
wkOpts.workerData = {}
}
wkOpts.workerData.__filename = file
return new Worker(
`
const wk = require('worker_threads');
require('ts-node').register();
let file = wk.workerData.__filename;
delete wk.workerData.__filename;
require(file);
`,
wkOpts
)
}