mirror of
https://gitlab.com/MrFry/mrfrys-node-server
synced 2025-04-01 20:24:18 +02:00
Added worker pools, and very very basic workers
This commit is contained in:
parent
771959ec2b
commit
f5f3b51eee
5 changed files with 211 additions and 5 deletions
5
package-lock.json
generated
5
package-lock.json
generated
|
@ -1466,6 +1466,11 @@
|
||||||
"wide-align": "^1.1.0"
|
"wide-align": "^1.1.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"generic-pool": {
|
||||||
|
"version": "3.7.1",
|
||||||
|
"resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.7.1.tgz",
|
||||||
|
"integrity": "sha512-ug6DAZoNgWm6q5KhPFA+hzXfBLFQu5sTXxPpv44DmE0A2g+CiHoq9LTVdkXpZMkYVMoGw83F6W+WT0h0MFMK/w=="
|
||||||
|
},
|
||||||
"getpass": {
|
"getpass": {
|
||||||
"version": "0.1.7",
|
"version": "0.1.7",
|
||||||
"resolved": "https://registry.npmjs.org/getpass/-/getpass-0.1.7.tgz",
|
"resolved": "https://registry.npmjs.org/getpass/-/getpass-0.1.7.tgz",
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
"eslint-plugin-typescript": "^0.14.0",
|
"eslint-plugin-typescript": "^0.14.0",
|
||||||
"express": "^4.6.1",
|
"express": "^4.6.1",
|
||||||
"express-ejs-layouts": "^1.1.0",
|
"express-ejs-layouts": "^1.1.0",
|
||||||
|
"generic-pool": "^3.7.1",
|
||||||
"sqlite3": "^4.1.1",
|
"sqlite3": "^4.1.1",
|
||||||
"ts-node": "^9.0.0",
|
"ts-node": "^9.0.0",
|
||||||
"typescript": "^4.1.2",
|
"typescript": "^4.1.2",
|
||||||
|
|
|
@ -290,4 +290,16 @@ if (certsLoaded) {
|
||||||
logger.Log('Https not avaible')
|
logger.Log('Https not avaible')
|
||||||
}
|
}
|
||||||
|
|
||||||
// app.listen(port)
|
console.log('hai')
|
||||||
|
import { init, doALongTask } from './utils/workerPool'
|
||||||
|
init()
|
||||||
|
setTimeout(() => {
|
||||||
|
doALongTask(1, {
|
||||||
|
type: 'work',
|
||||||
|
add: 1,
|
||||||
|
index: 1,
|
||||||
|
}).then((res) => {
|
||||||
|
console.log('woohoo!')
|
||||||
|
console.log(res)
|
||||||
|
})
|
||||||
|
}, 6000)
|
||||||
|
|
|
@ -598,11 +598,58 @@ const workerTs = (file: string, wkOpts: any) => {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isMainThread) {
|
// if (!isMainThread) {
|
||||||
logger.DebugLog(`Starting search worker ...`, 'searchworker', 1)
|
// logger.DebugLog(`Starting search worker ...`, 'searchworker', 1)
|
||||||
const { data, subjName, question, questionData } = workerData
|
// const { data, subjName, question, questionData } = workerData
|
||||||
searchWorker(data, subjName, question, questionData)
|
// searchWorker(data, subjName, question, questionData)
|
||||||
|
// }
|
||||||
|
|
||||||
|
function random(min, max) {
|
||||||
|
return Math.floor(Math.random() * (max - min) + min)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!isMainThread) {
|
||||||
|
const workerIndex = workerData.workerIndex
|
||||||
|
const timeoutMin = workerData.workerTimeoutMin
|
||||||
|
const timeoutMax = workerData.workerTimeoutMax
|
||||||
|
const data = { val: 0 }
|
||||||
|
|
||||||
|
console.log(
|
||||||
|
`[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty`
|
||||||
|
)
|
||||||
|
console.log(`[THREAD #${workerIndex}]: data`, workerData)
|
||||||
|
// parentPort.postMessage('hello parent port')
|
||||||
|
|
||||||
|
parentPort.on('message', (msg) => {
|
||||||
|
// console.log(`[THREAD #${workerIndex}]: onmsg`, msg)
|
||||||
|
|
||||||
|
if (msg.type === 'work') {
|
||||||
|
const index = msg.index
|
||||||
|
console.log(`[THREAD #${workerIndex}]: staring work on ${index}`)
|
||||||
|
setTimeout(() => {
|
||||||
|
data.val = data.val + msg.add
|
||||||
|
parentPort.postMessage({
|
||||||
|
msg: `From thread #${workerIndex}: job ${index} done`,
|
||||||
|
workerIndex: workerIndex,
|
||||||
|
result: data.val,
|
||||||
|
})
|
||||||
|
|
||||||
|
console.log(`[THREAD #${workerIndex}]: Work ${index} done!`)
|
||||||
|
}, random(timeoutMin, timeoutMax))
|
||||||
|
} else if (msg.type === 'update') {
|
||||||
|
if (msg.data.workerIndex !== workerIndex) {
|
||||||
|
console.log(`[THREAD #${workerIndex}]: update`, msg.data)
|
||||||
|
console.log(
|
||||||
|
`[THREAD #${workerIndex}]: From ${data.val} to ${msg.data.result}`
|
||||||
|
)
|
||||||
|
data.val = msg.data.result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
console.log('[THREAD]: Main thread!')
|
||||||
|
}
|
||||||
|
|
||||||
// ------------------------------------------------------------------------
|
// ------------------------------------------------------------------------
|
||||||
|
|
||||||
export {
|
export {
|
||||||
|
|
141
src/utils/workerPool.ts
Normal file
141
src/utils/workerPool.ts
Normal file
|
@ -0,0 +1,141 @@
|
||||||
|
import { Worker } from 'worker_threads'
|
||||||
|
import genericPool from 'generic-pool'
|
||||||
|
import os from 'os'
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
const workerFile = './src/utils/classes.ts'
|
||||||
|
const workerTimeoutMin = 5 * 1000
|
||||||
|
const workerTimeoutMax = 10 * 1000
|
||||||
|
let pool: any = null
|
||||||
|
let workers: any = null
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// doALongTask(i, taskObj).then((res) => {
|
||||||
|
// msgAll(res)
|
||||||
|
// console.log('[MSGFROMCLIENT]', res)
|
||||||
|
// })
|
||||||
|
|
||||||
|
export function doALongTask(i: Number, obj: any): Promise<any> {
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
pool
|
||||||
|
.acquire()
|
||||||
|
.then(function(client) {
|
||||||
|
doSomething(client, obj).then((res) => {
|
||||||
|
resolve(res)
|
||||||
|
// TODO: check if result is really a result, and want to release port
|
||||||
|
pool.release(client)
|
||||||
|
console.log('[RELEASE]: #' + client.index)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.catch(function(err) {
|
||||||
|
console.log('resourcePromise error')
|
||||||
|
console.log(err)
|
||||||
|
// handle error - this is generally a timeout or maxWaitingClients
|
||||||
|
// error
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
export function init(): void {
|
||||||
|
if (workers && pool) {
|
||||||
|
console.log('WORKER AND POOL ALREADY EXISTS')
|
||||||
|
return
|
||||||
|
}
|
||||||
|
workers = []
|
||||||
|
const factory = {
|
||||||
|
create: function() {
|
||||||
|
const worker = getAWorker(workers.length)
|
||||||
|
workers.push(worker)
|
||||||
|
return {
|
||||||
|
worker: worker,
|
||||||
|
index: workers.length,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
destroy: function(client) {
|
||||||
|
console.log('[DESTROY]')
|
||||||
|
client.worker.terminate()
|
||||||
|
console.log('[DESTROYED] #' + client.index)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
const opts = {
|
||||||
|
min: os.cpus().length - 1, // minimum size of the pool
|
||||||
|
max: os.cpus().length - 1, // maximum size of the pool
|
||||||
|
maxWaitingClients: 999,
|
||||||
|
}
|
||||||
|
|
||||||
|
pool = genericPool.createPool(factory, opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
export function msgAll(data: any): void {
|
||||||
|
workers.forEach((worker) => {
|
||||||
|
worker.postMessage({
|
||||||
|
type: 'update',
|
||||||
|
data,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
function getAWorker(i) {
|
||||||
|
const worker = workerTs(workerFile, {
|
||||||
|
workerData: {
|
||||||
|
workerIndex: i,
|
||||||
|
workerTimeoutMin,
|
||||||
|
workerTimeoutMax,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
worker.setMaxListeners(50)
|
||||||
|
|
||||||
|
// worker.on('message', (msg) => {
|
||||||
|
// console.log(`[MAIN]: Msg from worker #${i}`, msg)
|
||||||
|
// })
|
||||||
|
|
||||||
|
worker.on('online', () => {
|
||||||
|
console.log(`[THREAD #${i}]: Worker ${i} online`)
|
||||||
|
})
|
||||||
|
|
||||||
|
worker.on('error', (err) => {
|
||||||
|
console.log(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
worker.on('exit', (code) => {
|
||||||
|
console.log(`[MAIN]: worker #${i} exit code: `, code)
|
||||||
|
})
|
||||||
|
return worker
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
function doSomething(client, obj) {
|
||||||
|
const { index, worker } = client
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
console.log('[ACCUIRE]: #' + index)
|
||||||
|
worker.postMessage(obj)
|
||||||
|
worker.once('message', (msg) => {
|
||||||
|
resolve(msg)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
const workerTs = (file: string, wkOpts: any) => {
|
||||||
|
wkOpts.eval = true
|
||||||
|
if (!wkOpts.workerData) {
|
||||||
|
wkOpts.workerData = {}
|
||||||
|
}
|
||||||
|
wkOpts.workerData.__filename = file
|
||||||
|
return new Worker(
|
||||||
|
`
|
||||||
|
const wk = require('worker_threads');
|
||||||
|
require('ts-node').register();
|
||||||
|
let file = wk.workerData.__filename;
|
||||||
|
delete wk.workerData.__filename;
|
||||||
|
require(file);
|
||||||
|
`,
|
||||||
|
wkOpts
|
||||||
|
)
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue