Removed generic-pool, implemented own

This commit is contained in:
mrfry 2021-04-16 11:52:33 +02:00
parent a6ee415c5b
commit 999b11c3ec
3 changed files with 3653 additions and 73 deletions

3581
package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -13,7 +13,6 @@
"eslint-plugin-typescript": "^0.14.0", "eslint-plugin-typescript": "^0.14.0",
"express": "^4.6.1", "express": "^4.6.1",
"express-ejs-layouts": "^1.1.0", "express-ejs-layouts": "^1.1.0",
"generic-pool": "^3.7.1",
"sqlite3": "^4.1.1", "sqlite3": "^4.1.1",
"ts-node": "^9.0.0", "ts-node": "^9.0.0",
"typescript": "^4.1.2", "typescript": "^4.1.2",

View file

@ -1,5 +1,5 @@
import { Worker } from 'worker_threads' import { Worker } from 'worker_threads'
import genericPool from 'generic-pool' import { v4 as uuidv4 } from 'uuid'
import os from 'os' import os from 'os'
import logger from './logger' import logger from './logger'
@ -7,17 +7,25 @@ import logger from './logger'
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
interface WorkerObj {
worker: any
index: number
free: Boolean
}
const jobCheckInterval = 500
const alertOnPendingCount = 10 const alertOnPendingCount = 10
const workerFile = './src/utils/classes.ts' const workerFile = './src/utils/classes.ts'
let pool: any = null let workers: Array<WorkerObj>
let workers: any = null const pendingJobs = {}
const completedJobs = {}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
function handleWorkerError(client, err) { function handleWorkerError(worker: WorkerObj, err) {
// TODO
logger.Log('resourcePromise error', logger.GetColor('redbg')) logger.Log('resourcePromise error', logger.GetColor('redbg'))
console.error(err) console.error(err)
pool.destroy(client)
} }
// TODO: accuire all workers here, and handle errors so they can be removed if threads exit // TODO: accuire all workers here, and handle errors so they can be removed if threads exit
@ -26,11 +34,11 @@ export function msgAllWorker(data: any): Promise<any> {
return new Promise((resolve) => { return new Promise((resolve) => {
const promises = [] const promises = []
workers.forEach((worker) => { workers.forEach((worker) => {
worker.postMessage(data) worker.worker.postMessage(data)
console.log('MSGD') console.log('MSGD')
promises.push( promises.push(
new Promise((resolve) => { new Promise((resolve) => {
worker.once('message', (msg) => { worker.worker.once('message', (msg) => {
console.log(worker.index, 'ONCE MESSASGE RESOLVE') console.log(worker.index, 'ONCE MESSASGE RESOLVE')
resolve(msg) resolve(msg)
}) })
@ -45,51 +53,37 @@ export function msgAllWorker(data: any): Promise<any> {
} }
export function doALongTask(obj: any): Promise<any> { export function doALongTask(obj: any): Promise<any> {
if (pool.pending > alertOnPendingCount) { if (Object.keys(pendingJobs).length > alertOnPendingCount) {
logger.Log( logger.Log(
`More than ${alertOnPendingCount} callers waiting for free resource! (${pool.pending})`, `More than ${alertOnPendingCount} callers waiting for free resource! (${
Object.keys(pendingJobs).length
})`,
logger.GetColor('redbg') logger.GetColor('redbg')
) )
} }
const jobId = uuidv4()
pendingJobs[jobId] = obj
return new Promise((resolve) => { return new Promise((resolve) => {
let currClient const interval = setInterval(() => {
pool if (completedJobs[jobId] && completedJobs[jobId].done) {
.acquire() resolve(completedJobs[jobId].result)
.then(function(client) { delete completedJobs[jobId]
currClient = client clearInterval(interval)
doSomething(client, obj) }
.then((res) => { }, jobCheckInterval)
resolve(res)
// TODO: check if result is really a result, and want to release port
pool.release(client)
// console.log('[RELEASE]: #' + client.index)
})
.catch(function(err) {
handleWorkerError(currClient, err)
})
})
.catch(function(err) {
handleWorkerError(currClient, err)
})
}) })
} }
export function initWorkerPool(initData: any): void { export function initWorkerPool(initData: any): void {
if (workers && pool) { if (workers) {
logger.Log('WORKER AND POOL ALREADY EXISTS', logger.GetColor('redbg')) logger.Log('WORKERS ALREADY EXISTS', logger.GetColor('redbg'))
return return
} }
workers = [] workers = []
const factory = { const factory = {
create: function() { create: function(index) {
const currInd = workers.length return getAWorker(index, initData)
const worker = getAWorker(currInd, initData)
workers.push(worker)
return {
worker: worker,
index: currInd,
}
}, },
destroy: function(client) { destroy: function(client) {
// console.log('[DESTROY]') // console.log('[DESTROY]')
@ -98,25 +92,68 @@ export function initWorkerPool(initData: any): void {
}, },
} }
const threadCount = process.env.NS_THREAD_COUNT || os.cpus().length - 1 const workerCount = process.env.NS_THREAD_COUNT || os.cpus().length - 1
if (process.env.NS_THREAD_COUNT) { if (process.env.NS_WORKER_COUNT) {
logger.Log( logger.Log(
`Setting thread count from enviroment variable NS_THREAD_COUNT: '${threadCount}'`, `Setting thread count from enviroment variable NS_WORKER_COUNT: '${workerCount}'`,
logger.GetColor('red') logger.GetColor('red')
) )
} }
const opts = { for (let i = 0; i < workerCount; i++) {
min: threadCount, // minimum size of the pool workers.push({
max: threadCount, // maximum size of the pool worker: factory.create(i),
maxWaitingClients: 999, index: i,
free: true,
})
} }
pool = genericPool.createPool(factory, opts) // TODO: stop this interval sometime
setInterval(() => {
if (Object.keys(pendingJobs).length > 0) {
const freeWorker = workers.find((worker) => {
return worker.free
})
if (!freeWorker) {
return
}
if (freeWorker.free) {
freeWorker.free = false
}
// FIXME: FIFO OR ANYTHING ELSE (JOB PROCESSING ORDER)
const workKey = Object.keys(pendingJobs)[0]
const work = pendingJobs[workKey]
completedJobs[workKey] = { done: false }
delete pendingJobs[workKey]
work.inProgress = true
processJob(work, freeWorker).then((res) => {
completedJobs[workKey].done = true
completedJobs[workKey].result = res
})
}
}, jobCheckInterval)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
function processJob(work, worker) {
return new Promise((resolve) => {
doSomething(worker, work)
.then((res) => {
setTimeout(() => {
resolve(res)
// console.log('[RELEASE]: #' + client.index)
worker.free = true
}, 6000)
})
.catch(function(err) {
handleWorkerError(worker, err)
})
})
}
function getAWorker(i, initData) { function getAWorker(i, initData) {
const worker = workerTs(workerFile, { const worker = workerTs(workerFile, {
workerData: { workerData: {
@ -127,15 +164,6 @@ function getAWorker(i, initData) {
worker.setMaxListeners(50) worker.setMaxListeners(50)
// worker.on('message', (msg) => {
// logger.Log(`[MAIN]: Msg from worker #${i}`)
// logger.Log(msg)
// })
// worker.on('online', () => {
// logger.Log(`[THREAD #${i}]: Worker ${i} online`)
// })
worker.on('error', (err) => { worker.on('error', (err) => {
logger.Log('Worker error!', logger.GetColor('redbg')) logger.Log('Worker error!', logger.GetColor('redbg'))
console.error(err) console.error(err)
@ -153,14 +181,14 @@ function getAWorker(i, initData) {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
function doSomething(client, obj) { function doSomething(currWorker, obj) {
const { /* index, */ worker } = client const { /* index, */ worker } = currWorker
return new Promise((resolve) => { return new Promise((resolve) => {
// console.log('[ACCUIRE]: #' + index) // console.log('[ACCUIRE]: #' + index)
worker.postMessage(obj)
worker.once('message', (msg) => { worker.once('message', (msg) => {
resolve(msg) resolve(msg)
}) })
worker.postMessage(obj)
}) })
} }