From d8695682f787f5d38241dd9ace525ee9ca36362f Mon Sep 17 00:00:00 2001 From: mrfry Date: Sat, 19 Dec 2020 09:44:07 +0100 Subject: [PATCH] Answering questions using worker poolz --- src/modules/api/api.ts | 19 +++++++- src/server.ts | 14 ------ src/utils/classes.ts | 101 +++++++++++++++------------------------- src/utils/workerPool.ts | 24 +++++----- 4 files changed, 67 insertions(+), 91 deletions(-) diff --git a/src/modules/api/api.ts b/src/modules/api/api.ts index e6f635f..e71e0cf 100644 --- a/src/modules/api/api.ts +++ b/src/modules/api/api.ts @@ -38,7 +38,8 @@ import { } from '../../utils/actions' import dbtools from '../../utils/dbtools' import auth from '../../middlewares/auth.middleware' -import { dataToString, searchDatas } from '../../utils/classes' +import { dataToString } from '../../utils/classes' +import { initWorkerPool, doALongTask } from '../../utils/workerPool' import { SetupData } from '../../server' import { ModuleType, User, DataFile, Request } from '../../types/basicTypes' @@ -75,6 +76,8 @@ let publicdirs = [] function GetApp(): ModuleType { const app = express() + initWorkerPool() + const publicDir = publicdirs[0] if (!publicDir) { throw new Error(`No public dir! ( API )`) @@ -1066,7 +1069,19 @@ function GetApp(): ModuleType { const question = req.query.q const recData: any = req.query.data - searchDatas(questionDbs, question, subj, recData) + const promises = [] + + questionDbs.map((qdb, i) => { + promises.push( + doALongTask(i, { + type: 'work', + index: i, + data: { qdb: qdb.data, question, subjName: subj, recData }, + }) + ) + }) + + Promise.all(promises) .then((result) => { try { const mergedResult = result.reduce((acc, dbRes) => { diff --git a/src/server.ts b/src/server.ts index 4af8f2a..132d536 100755 --- a/src/server.ts +++ b/src/server.ts @@ -289,17 +289,3 @@ if (certsLoaded) { } else { logger.Log('Https not avaible') } - -console.log('hai') -import { init, doALongTask } from './utils/workerPool' -init() -setTimeout(() => { - doALongTask(1, { - type: 'work', - add: 1, - index: 1, - }).then((res) => { - console.log('woohoo!') - console.log(res) - }) -}, 6000) diff --git a/src/utils/classes.ts b/src/utils/classes.ts index c2a51f3..8626a8d 100755 --- a/src/utils/classes.ts +++ b/src/utils/classes.ts @@ -254,6 +254,7 @@ function compareData(q1: Question, q2: Question) { logger.DebugLog('Error comparing data', 'Compare question data', 1) logger.DebugLog(error.message, 'Compare question data', 1) logger.DebugLog(error, 'Compare question data', 2) + console.error(error) } return 0 } @@ -273,7 +274,7 @@ function compareQuestionObj( q2subjName: string, data: QuestionData ) { - assert(data) + assert(data !== undefined || data !== null) assert(q1) assert(typeof q1 === 'object') assert(q2) @@ -321,12 +322,7 @@ function questionToString(question: Question) { // --------------------------------------------------------------------------------------------------------- // Subject // --------------------------------------------------------------------------------------------------------- -function searchQuestion( - subj: Subject, - question: Question, - questionData: QuestionData, - subjName: string -) { +function searchQuestion(subj: Subject, question: Question, subjName: string) { assert(question) let result = [] @@ -336,7 +332,7 @@ function searchQuestion( subjName, question, subj.Name, - questionData + question.data ) if (percent.avg > minMatchAmmount) { @@ -409,19 +405,6 @@ function addQuestion( } } -function searchDatas( - data: Array, - question: any, - subjName: string, - questionData?: QuestionData -): Promise> { - return Promise.all( - data.map((db: QuestionDb) => { - return searchData(db, question, subjName, questionData) - }) - ) -} - function prepareQuestion( question: string | Question, data: string | QuestionData @@ -518,14 +501,21 @@ function dataToString(data: Array): string { // ------------------------------------------------------------------------ -function searchWorker( +function doSearch( data: Array, subjName: string, - question: Question, + question: Question | string, questionData?: QuestionData ): any { let result = [] + const questionToSearch = + typeof question === 'string' + ? createQuestion(question, null, questionData || { type: 'simple' }) + : question + + assert(questionToSearch.data) + data.forEach((subj) => { if ( subjName @@ -533,9 +523,7 @@ function searchWorker( .includes(getSubjNameWithoutYear(subj.Name).toLowerCase()) ) { logger.DebugLog(`Searching in ${subj.Name} `, 'searchworker', 2) - result = result.concat( - searchQuestion(subj, question, questionData, subjName) - ) + result = result.concat(searchQuestion(subj, questionToSearch, subjName)) } }) @@ -552,9 +540,7 @@ function searchWorker( 1 ) data.forEach((subj) => { - result = result.concat( - searchQuestion(subj, question, questionData, subjName) - ) + result = result.concat(searchQuestion(subj, questionToSearch, subjName)) }) if (result.length > 0) { logger.DebugLog( @@ -576,8 +562,7 @@ function searchWorker( } }) - parentPort.postMessage(result) - process.exit(0) + return result } const workerTs = (file: string, wkOpts: any) => { @@ -598,51 +583,42 @@ const workerTs = (file: string, wkOpts: any) => { ) } -// if (!isMainThread) { -// logger.DebugLog(`Starting search worker ...`, 'searchworker', 1) -// const { data, subjName, question, questionData } = workerData -// searchWorker(data, subjName, question, questionData) -// } - -function random(min, max) { - return Math.floor(Math.random() * (max - min) + min) -} - if (!isMainThread) { const workerIndex = workerData.workerIndex - const timeoutMin = workerData.workerTimeoutMin - const timeoutMax = workerData.workerTimeoutMax - const data = { val: 0 } + // TODO: check if thread independent - console.log( + logger.Log( `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty` ) - console.log(`[THREAD #${workerIndex}]: data`, workerData) - // parentPort.postMessage('hello parent port') parentPort.on('message', (msg) => { - // console.log(`[THREAD #${workerIndex}]: onmsg`, msg) - if (msg.type === 'work') { + const { qdb, subjName, question, questionData } = msg.data const index = msg.index console.log(`[THREAD #${workerIndex}]: staring work on ${index}`) - setTimeout(() => { - data.val = data.val + msg.add - parentPort.postMessage({ - msg: `From thread #${workerIndex}: job ${index} done`, - workerIndex: workerIndex, - result: data.val, - }) - console.log(`[THREAD #${workerIndex}]: Work ${index} done!`) - }, random(timeoutMin, timeoutMax)) + let searchResult = null + try { + searchResult = doSearch(qdb, subjName, question, questionData) + } catch (err) { + logger.Log('Error in worker thread!', logger.GetColor('redbg')) + console.error(err) + } + + // ONDONE: + parentPort.postMessage({ + msg: `From thread #${workerIndex}: job ${index} done`, + workerIndex: workerIndex, + result: searchResult, + }) + + console.log(`[THREAD #${workerIndex}]: Work ${index} done!`) } else if (msg.type === 'update') { if (msg.data.workerIndex !== workerIndex) { + // TODO + // qdbs = msg.qdb console.log(`[THREAD #${workerIndex}]: update`, msg.data) - console.log( - `[THREAD #${workerIndex}]: From ${data.val} to ${msg.data.result}` - ) - data.val = msg.data.result + console.log(`[THREAD #${workerIndex}]: From ... to ${msg.data.result}`) } } }) @@ -658,6 +634,5 @@ export { createQuestion, addQuestion, searchData, - searchDatas, dataToString, } diff --git a/src/utils/workerPool.ts b/src/utils/workerPool.ts index 095868e..e68888a 100644 --- a/src/utils/workerPool.ts +++ b/src/utils/workerPool.ts @@ -2,11 +2,12 @@ import { Worker } from 'worker_threads' import genericPool from 'generic-pool' import os from 'os' +import logger from './logger' +// import { QuestionDb } from '../types/basicTypes' + // --------------------------------------------------------------------------- const workerFile = './src/utils/classes.ts' -const workerTimeoutMin = 5 * 1000 -const workerTimeoutMax = 10 * 1000 let pool: any = null let workers: any = null @@ -30,17 +31,17 @@ export function doALongTask(i: Number, obj: any): Promise { }) }) .catch(function(err) { - console.log('resourcePromise error') - console.log(err) + logger.Log('resourcePromise error', logger.GetColor('redbg')) + console.error(err) // handle error - this is generally a timeout or maxWaitingClients // error }) }) } -export function init(): void { +export function initWorkerPool(): void { if (workers && pool) { - console.log('WORKER AND POOL ALREADY EXISTS') + logger.Log('WORKER AND POOL ALREADY EXISTS', logger.GetColor('redbg')) return } workers = [] @@ -84,27 +85,26 @@ function getAWorker(i) { const worker = workerTs(workerFile, { workerData: { workerIndex: i, - workerTimeoutMin, - workerTimeoutMax, }, }) worker.setMaxListeners(50) // worker.on('message', (msg) => { - // console.log(`[MAIN]: Msg from worker #${i}`, msg) + // logger.Log(`[MAIN]: Msg from worker #${i}`, msg) // }) worker.on('online', () => { - console.log(`[THREAD #${i}]: Worker ${i} online`) + logger.Log(`[THREAD #${i}]: Worker ${i} online`) }) worker.on('error', (err) => { - console.log(err) + logger.Log('Worker error!', logger.GetColor('redbg')) + console.error(err) }) worker.on('exit', (code) => { - console.log(`[MAIN]: worker #${i} exit code: `, code) + logger.Log(`[MAIN]: worker #${i} exit code: ${code}`) }) return worker }