Answering questions using worker poolz

This commit is contained in:
mrfry 2020-12-19 09:44:07 +01:00
parent f5f3b51eee
commit d8695682f7
4 changed files with 67 additions and 91 deletions

View file

@ -38,7 +38,8 @@ import {
} from '../../utils/actions'
import dbtools from '../../utils/dbtools'
import auth from '../../middlewares/auth.middleware'
import { dataToString, searchDatas } from '../../utils/classes'
import { dataToString } from '../../utils/classes'
import { initWorkerPool, doALongTask } from '../../utils/workerPool'
import { SetupData } from '../../server'
import { ModuleType, User, DataFile, Request } from '../../types/basicTypes'
@ -75,6 +76,8 @@ let publicdirs = []
function GetApp(): ModuleType {
const app = express()
initWorkerPool()
const publicDir = publicdirs[0]
if (!publicDir) {
throw new Error(`No public dir! ( API )`)
@ -1066,7 +1069,19 @@ function GetApp(): ModuleType {
const question = req.query.q
const recData: any = req.query.data
searchDatas(questionDbs, question, subj, recData)
const promises = []
questionDbs.map((qdb, i) => {
promises.push(
doALongTask(i, {
type: 'work',
index: i,
data: { qdb: qdb.data, question, subjName: subj, recData },
})
)
})
Promise.all(promises)
.then((result) => {
try {
const mergedResult = result.reduce((acc, dbRes) => {

View file

@ -289,17 +289,3 @@ if (certsLoaded) {
} else {
logger.Log('Https not avaible')
}
console.log('hai')
import { init, doALongTask } from './utils/workerPool'
init()
setTimeout(() => {
doALongTask(1, {
type: 'work',
add: 1,
index: 1,
}).then((res) => {
console.log('woohoo!')
console.log(res)
})
}, 6000)

View file

@ -254,6 +254,7 @@ function compareData(q1: Question, q2: Question) {
logger.DebugLog('Error comparing data', 'Compare question data', 1)
logger.DebugLog(error.message, 'Compare question data', 1)
logger.DebugLog(error, 'Compare question data', 2)
console.error(error)
}
return 0
}
@ -273,7 +274,7 @@ function compareQuestionObj(
q2subjName: string,
data: QuestionData
) {
assert(data)
assert(data !== undefined || data !== null)
assert(q1)
assert(typeof q1 === 'object')
assert(q2)
@ -321,12 +322,7 @@ function questionToString(question: Question) {
// ---------------------------------------------------------------------------------------------------------
// Subject
// ---------------------------------------------------------------------------------------------------------
function searchQuestion(
subj: Subject,
question: Question,
questionData: QuestionData,
subjName: string
) {
function searchQuestion(subj: Subject, question: Question, subjName: string) {
assert(question)
let result = []
@ -336,7 +332,7 @@ function searchQuestion(
subjName,
question,
subj.Name,
questionData
question.data
)
if (percent.avg > minMatchAmmount) {
@ -409,19 +405,6 @@ function addQuestion(
}
}
function searchDatas(
data: Array<QuestionDb>,
question: any,
subjName: string,
questionData?: QuestionData
): Promise<Array<SearchResult>> {
return Promise.all(
data.map((db: QuestionDb) => {
return searchData(db, question, subjName, questionData)
})
)
}
function prepareQuestion(
question: string | Question,
data: string | QuestionData
@ -518,14 +501,21 @@ function dataToString(data: Array<Subject>): string {
// ------------------------------------------------------------------------
function searchWorker(
function doSearch(
data: Array<Subject>,
subjName: string,
question: Question,
question: Question | string,
questionData?: QuestionData
): any {
let result = []
const questionToSearch =
typeof question === 'string'
? createQuestion(question, null, questionData || { type: 'simple' })
: question
assert(questionToSearch.data)
data.forEach((subj) => {
if (
subjName
@ -533,9 +523,7 @@ function searchWorker(
.includes(getSubjNameWithoutYear(subj.Name).toLowerCase())
) {
logger.DebugLog(`Searching in ${subj.Name} `, 'searchworker', 2)
result = result.concat(
searchQuestion(subj, question, questionData, subjName)
)
result = result.concat(searchQuestion(subj, questionToSearch, subjName))
}
})
@ -552,9 +540,7 @@ function searchWorker(
1
)
data.forEach((subj) => {
result = result.concat(
searchQuestion(subj, question, questionData, subjName)
)
result = result.concat(searchQuestion(subj, questionToSearch, subjName))
})
if (result.length > 0) {
logger.DebugLog(
@ -576,8 +562,7 @@ function searchWorker(
}
})
parentPort.postMessage(result)
process.exit(0)
return result
}
const workerTs = (file: string, wkOpts: any) => {
@ -598,51 +583,42 @@ const workerTs = (file: string, wkOpts: any) => {
)
}
// if (!isMainThread) {
// logger.DebugLog(`Starting search worker ...`, 'searchworker', 1)
// const { data, subjName, question, questionData } = workerData
// searchWorker(data, subjName, question, questionData)
// }
function random(min, max) {
return Math.floor(Math.random() * (max - min) + min)
}
if (!isMainThread) {
const workerIndex = workerData.workerIndex
const timeoutMin = workerData.workerTimeoutMin
const timeoutMax = workerData.workerTimeoutMax
const data = { val: 0 }
// TODO: check if thread independent
console.log(
logger.Log(
`[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty`
)
console.log(`[THREAD #${workerIndex}]: data`, workerData)
// parentPort.postMessage('hello parent port')
parentPort.on('message', (msg) => {
// console.log(`[THREAD #${workerIndex}]: onmsg`, msg)
if (msg.type === 'work') {
const { qdb, subjName, question, questionData } = msg.data
const index = msg.index
console.log(`[THREAD #${workerIndex}]: staring work on ${index}`)
setTimeout(() => {
data.val = data.val + msg.add
parentPort.postMessage({
msg: `From thread #${workerIndex}: job ${index} done`,
workerIndex: workerIndex,
result: data.val,
})
console.log(`[THREAD #${workerIndex}]: Work ${index} done!`)
}, random(timeoutMin, timeoutMax))
let searchResult = null
try {
searchResult = doSearch(qdb, subjName, question, questionData)
} catch (err) {
logger.Log('Error in worker thread!', logger.GetColor('redbg'))
console.error(err)
}
// ONDONE:
parentPort.postMessage({
msg: `From thread #${workerIndex}: job ${index} done`,
workerIndex: workerIndex,
result: searchResult,
})
console.log(`[THREAD #${workerIndex}]: Work ${index} done!`)
} else if (msg.type === 'update') {
if (msg.data.workerIndex !== workerIndex) {
// TODO
// qdbs = msg.qdb
console.log(`[THREAD #${workerIndex}]: update`, msg.data)
console.log(
`[THREAD #${workerIndex}]: From ${data.val} to ${msg.data.result}`
)
data.val = msg.data.result
console.log(`[THREAD #${workerIndex}]: From ... to ${msg.data.result}`)
}
}
})
@ -658,6 +634,5 @@ export {
createQuestion,
addQuestion,
searchData,
searchDatas,
dataToString,
}

View file

@ -2,11 +2,12 @@ import { Worker } from 'worker_threads'
import genericPool from 'generic-pool'
import os from 'os'
import logger from './logger'
// import { QuestionDb } from '../types/basicTypes'
// ---------------------------------------------------------------------------
const workerFile = './src/utils/classes.ts'
const workerTimeoutMin = 5 * 1000
const workerTimeoutMax = 10 * 1000
let pool: any = null
let workers: any = null
@ -30,17 +31,17 @@ export function doALongTask(i: Number, obj: any): Promise<any> {
})
})
.catch(function(err) {
console.log('resourcePromise error')
console.log(err)
logger.Log('resourcePromise error', logger.GetColor('redbg'))
console.error(err)
// handle error - this is generally a timeout or maxWaitingClients
// error
})
})
}
export function init(): void {
export function initWorkerPool(): void {
if (workers && pool) {
console.log('WORKER AND POOL ALREADY EXISTS')
logger.Log('WORKER AND POOL ALREADY EXISTS', logger.GetColor('redbg'))
return
}
workers = []
@ -84,27 +85,26 @@ function getAWorker(i) {
const worker = workerTs(workerFile, {
workerData: {
workerIndex: i,
workerTimeoutMin,
workerTimeoutMax,
},
})
worker.setMaxListeners(50)
// worker.on('message', (msg) => {
// console.log(`[MAIN]: Msg from worker #${i}`, msg)
// logger.Log(`[MAIN]: Msg from worker #${i}`, msg)
// })
worker.on('online', () => {
console.log(`[THREAD #${i}]: Worker ${i} online`)
logger.Log(`[THREAD #${i}]: Worker ${i} online`)
})
worker.on('error', (err) => {
console.log(err)
logger.Log('Worker error!', logger.GetColor('redbg'))
console.error(err)
})
worker.on('exit', (code) => {
console.log(`[MAIN]: worker #${i} exit code: `, code)
logger.Log(`[MAIN]: worker #${i} exit code: ${code}`)
})
return worker
}