Reserving specified workers is now possible

This commit is contained in:
mrfry 2021-04-16 14:13:39 +02:00
parent 999b11c3ec
commit 8ba1b01d33
5 changed files with 345 additions and 83 deletions

View file

@ -1,11 +1,9 @@
import { Worker } from 'worker_threads'
import { v4 as uuidv4 } from 'uuid'
import { EventEmitter } from 'events'
import os from 'os'
import logger from './logger'
// import { QuestionDb } from '../types/basicTypes'
// ---------------------------------------------------------------------------
interface WorkerObj {
worker: any
@ -13,17 +11,33 @@ interface WorkerObj {
free: Boolean
}
const jobCheckInterval = 500
interface PendingJob {
workData: any
doneEvent: any
targetWorkerIndex?: number
}
const alertOnPendingCount = 10
const workerFile = './src/utils/classes.ts'
let workers: Array<WorkerObj>
const pendingJobs = {}
const completedJobs = {}
const pendingJobs: {
[id: string]: PendingJob
} = {}
const jobEvents = new EventEmitter()
jobEvents.on('jobDone', () => {
processJob()
})
jobEvents.on('newJob', () => {
processJob()
})
// ---------------------------------------------------------------------------
function handleWorkerError(worker: WorkerObj, err) {
// TODO
// TODO: restart worker if exited or things like that
logger.Log('resourcePromise error', logger.GetColor('redbg'))
console.error(err)
}
@ -34,25 +48,18 @@ export function msgAllWorker(data: any): Promise<any> {
return new Promise((resolve) => {
const promises = []
workers.forEach((worker) => {
worker.worker.postMessage(data)
console.log('MSGD')
promises.push(
new Promise((resolve) => {
worker.worker.once('message', (msg) => {
console.log(worker.index, 'ONCE MESSASGE RESOLVE')
resolve(msg)
})
})
)
promises.push(doALongTask(data, worker.index))
})
Promise.all(promises).then((res) => {
console.log('MSG ALL DONE', res)
resolve(res)
})
})
}
export function doALongTask(obj: any): Promise<any> {
export function doALongTask(
obj: any,
targetWorkerIndex?: number
): Promise<any> {
if (Object.keys(pendingJobs).length > alertOnPendingCount) {
logger.Log(
`More than ${alertOnPendingCount} callers waiting for free resource! (${
@ -63,15 +70,19 @@ export function doALongTask(obj: any): Promise<any> {
}
const jobId = uuidv4()
pendingJobs[jobId] = obj
// FIXME: delete doneEvent?
const doneEvent = new EventEmitter()
pendingJobs[jobId] = {
workData: obj,
targetWorkerIndex: targetWorkerIndex,
doneEvent: doneEvent,
}
jobEvents.emit('newJob')
return new Promise((resolve) => {
const interval = setInterval(() => {
if (completedJobs[jobId] && completedJobs[jobId].done) {
resolve(completedJobs[jobId].result)
delete completedJobs[jobId]
clearInterval(interval)
}
}, jobCheckInterval)
doneEvent.once('done', (result) => {
jobEvents.emit('jobDone')
resolve(result)
})
})
}
@ -86,9 +97,7 @@ export function initWorkerPool(initData: any): void {
return getAWorker(index, initData)
},
destroy: function(client) {
// console.log('[DESTROY]')
client.worker.terminate()
// console.log('[DESTROYED] #' + client.index)
},
}
@ -107,51 +116,56 @@ export function initWorkerPool(initData: any): void {
free: true,
})
}
// TODO: stop this interval sometime
setInterval(() => {
if (Object.keys(pendingJobs).length > 0) {
const freeWorker = workers.find((worker) => {
return worker.free
})
if (!freeWorker) {
return
}
if (freeWorker.free) {
freeWorker.free = false
}
// FIXME: FIFO OR ANYTHING ELSE (JOB PROCESSING ORDER)
const workKey = Object.keys(pendingJobs)[0]
const work = pendingJobs[workKey]
completedJobs[workKey] = { done: false }
delete pendingJobs[workKey]
work.inProgress = true
processJob(work, freeWorker).then((res) => {
completedJobs[workKey].done = true
completedJobs[workKey].result = res
})
}
}, jobCheckInterval)
}
// ---------------------------------------------------------------------------
function processJob(work, worker) {
return new Promise((resolve) => {
doSomething(worker, work)
function processJob() {
if (Object.keys(pendingJobs).length > 0) {
// FIXME: FIFO OR ANYTHING ELSE (JOB PROCESSING ORDER)
const keys = Object.keys(pendingJobs)
let jobKey, freeWorker
let i = 0
while (!freeWorker && i < keys.length) {
jobKey = keys[i]
if (!isNaN(pendingJobs[jobKey].targetWorkerIndex)) {
if (workers[pendingJobs[jobKey].targetWorkerIndex].free) {
freeWorker = workers[pendingJobs[jobKey].targetWorkerIndex]
console.log(
`RESERVING WORKER ${pendingJobs[jobKey].targetWorkerIndex}`
)
}
} else {
freeWorker = workers.find((worker) => {
return worker.free
})
if (freeWorker) {
console.log(`RESERVING FIRST AVAILABLE WORKER ${freeWorker.index}`)
}
}
i++
}
if (!freeWorker) {
console.log('NO FREE WORKER')
return
}
if (freeWorker.free) {
freeWorker.free = false
}
const job = pendingJobs[jobKey]
delete pendingJobs[jobKey]
doSomething(freeWorker, job.workData)
.then((res) => {
setTimeout(() => {
resolve(res)
// console.log('[RELEASE]: #' + client.index)
worker.free = true
}, 6000)
freeWorker.free = true
job.doneEvent.emit('done', res)
})
.catch(function(err) {
handleWorkerError(worker, err)
handleWorkerError(freeWorker, err)
})
})
}
}
function getAWorker(i, initData) {
@ -184,7 +198,6 @@ function getAWorker(i, initData) {
function doSomething(currWorker, obj) {
const { /* index, */ worker } = currWorker
return new Promise((resolve) => {
// console.log('[ACCUIRE]: #' + index)
worker.postMessage(obj)
worker.once('message', (msg) => {
resolve(msg)