From 68dcbff84668a3b1636236e24c602e6c59689001 Mon Sep 17 00:00:00 2001 From: mrfry Date: Sat, 19 Dec 2020 11:31:12 +0100 Subject: [PATCH] Threads store their own qdb. It gets updated when new questions are added to reduce message sizes between threads --- src/modules/api/api.ts | 32 +++++----- src/types/basicTypes.ts | 1 + src/utils/actions.ts | 17 +++++- src/utils/classes.ts | 128 +++++----------------------------------- src/utils/workerPool.ts | 19 +++--- 5 files changed, 56 insertions(+), 141 deletions(-) diff --git a/src/modules/api/api.ts b/src/modules/api/api.ts index e3aac02..3ba48a5 100644 --- a/src/modules/api/api.ts +++ b/src/modules/api/api.ts @@ -35,11 +35,16 @@ import { backupData, loadJSON, RecievedData, + getQuestionDbsWithoutFunct, } from '../../utils/actions' import dbtools from '../../utils/dbtools' import auth from '../../middlewares/auth.middleware' import { dataToString } from '../../utils/classes' -import { initWorkerPool, doALongTask } from '../../utils/workerPool' +import { + initWorkerPool, + doALongTask, + msgAllWorker, +} from '../../utils/workerPool' import { SetupData } from '../../server' import { ModuleType, User, DataFile, Request } from '../../types/basicTypes' @@ -76,8 +81,6 @@ let publicdirs = [] function GetApp(): ModuleType { const app = express() - initWorkerPool() - const publicDir = publicdirs[0] if (!publicDir) { throw new Error(`No public dir! ( API )`) @@ -166,6 +169,8 @@ function GetApp(): ModuleType { // FIXME: check type from file let testUsers: any = [] + initWorkerPool(getQuestionDbsWithoutFunct(questionDbs)) + function mergeObjSum(a, b) { const res = { ...b } Object.keys(a).forEach((key) => { @@ -1031,6 +1036,10 @@ function GetApp(): ModuleType { success: resultArray.length > 0, newQuestions: resultArray, }) + msgAllWorker({ + qdbs: getQuestionDbsWithoutFunct(questionDbs), + type: 'update', + }) }) .catch((err) => { logger.Log( @@ -1067,18 +1076,7 @@ function GetApp(): ModuleType { if (req.query.q && req.query.data) { const subj: any = req.query.subj || '' const question = req.query.q - let recData: any = req.query.data - if (typeof recData === 'string') { - try { - recData = JSON.parse(recData) - } catch (err) { - logger.Log( - 'Error parsing recData in /ask!', - logger.GetColor('redbg') - ) - console.error(err) - } - } + const recData: any = req.query.data const promises = [] @@ -1087,8 +1085,8 @@ function GetApp(): ModuleType { doALongTask({ type: 'work', data: { - qdb: qdb.data, - question, + searchIn: [qdb.index], // TODO: search in all + question: question, subjName: subj, questionData: recData, }, diff --git a/src/types/basicTypes.ts b/src/types/basicTypes.ts index df45cb7..19f0d9c 100644 --- a/src/types/basicTypes.ts +++ b/src/types/basicTypes.ts @@ -25,6 +25,7 @@ export interface DataFile { export interface QuestionDb extends DataFile { data: Array + index: Number } export interface User { diff --git a/src/utils/actions.ts b/src/utils/actions.ts index 4d118a9..dc01021 100755 --- a/src/utils/actions.ts +++ b/src/utils/actions.ts @@ -22,7 +22,7 @@ const dataLockFile = './data/lockData' import logger from '../utils/logger' import { createQuestion } from '../utils/classes' -import { doALongTask } from './workerPool' +import { doALongTask } from './workerPool' import idStats from '../utils/ids' import utils from '../utils/utils' import { SearchResult, addQuestion, getSubjNameWithoutYear } from './classes' @@ -170,6 +170,7 @@ function processIncomingRequestUsingDb( doALongTask({ type: 'work', data: { + searchIn: [qdb.index], qdb: qdb.data, question: currentQuestion, subjName: recievedData.subj, @@ -201,6 +202,8 @@ function processIncomingRequestUsingDb( ) logger.DebugLog(currentQuestion, 'actions', 3) addQuestion(qdb.data, sName, currentQuestion) + // TODO: check if it really adds it, not only just some clone (questionDbs in api.ts + // modifies too) }) currWrites++ @@ -261,7 +264,7 @@ function processIncomingRequestUsingDb( } export function loadJSON(dataFiles: Array): Array { - return dataFiles.reduce((acc, dataFile) => { + return dataFiles.reduce((acc, dataFile, index) => { if (!utils.FileExists(dataFile.path)) { utils.WriteFile(JSON.stringify([]), dataFile.path) } @@ -269,6 +272,7 @@ export function loadJSON(dataFiles: Array): Array { try { acc.push({ ...dataFile, + index: index, data: JSON.parse(utils.ReadFile(dataFile.path)), }) } catch (err) { @@ -302,3 +306,12 @@ export function backupData(questionDbs: Array): void { } }) } + +export function getQuestionDbsWithoutFunct( + questionDbs: Array // FIXME: type for dis +): Array { + return questionDbs.map((qdb) => { + const { shouldSave, ...res } = qdb // eslint-disable-line + return res + }) +} diff --git a/src/utils/classes.ts b/src/utils/classes.ts index 7f03f17..17e6ca7 100755 --- a/src/utils/classes.ts +++ b/src/utils/classes.ts @@ -1,11 +1,6 @@ -import { Worker, isMainThread, parentPort, workerData } from 'worker_threads' +import { isMainThread, parentPort, workerData } from 'worker_threads' import logger from './logger' -import { - Question, - QuestionDb, - QuestionData, - Subject, -} from '../types/basicTypes' +import { Question, QuestionData, Subject } from '../types/basicTypes' interface SearchResultQuestion extends Question { match: number @@ -16,8 +11,6 @@ export interface SearchResult { dbName: string } -const searchDataWorkerFile = './src/utils/classes.ts' - const assert = (val) => { if (!val) { throw new Error('Assertion failed') @@ -216,7 +209,7 @@ function createQuestion( return { Q: simplifyQuestion(question), A: answer ? simplifyAnswer(answer) : null, - data, + data: data, } } @@ -414,83 +407,13 @@ function prepareQuestion( preparedQuestion = question } else { // FIXME data was checkedif its null, it should be never null. check if its really never null - const parsedData = typeof data === 'object' ? data : JSON.parse(data) + const parsedData = typeof data === 'object' ? data : JSON.parse(data) // TODO: put in try? preparedQuestion = createQuestion(question, null, parsedData) } return simplifyQuestion(preparedQuestion) } -function searchData( - qdb: QuestionDb, - question: Question | string, - subjName: string, - questionData?: QuestionData | string -): Promise { - // FIXME subjName was checkedif its null, it should be never null. check if its really never null - return new Promise((resolve, reject) => { - assert(question) - logger.DebugLog('Searching for question', 'qdb search', 1) - - const preparedQuestion = prepareQuestion(question, questionData) - - logger.DebugLog('Question:', 'qdb search', 2) - logger.DebugLog(preparedQuestion, 'qdb search', 2) - logger.DebugLog(`Subject name: ${subjName}`, 'qdb search', 2) - - const worker = workerTs(searchDataWorkerFile, { - workerData: { - data: qdb.data, - subjName, - question: preparedQuestion, - questionData, - }, - }) - - worker.on('error', (err) => { - logger.Log('Search Data Worker error!', logger.GetColor('redbg')) - console.error(err) - reject(err) - }) - - worker.on('exit', (code) => { - logger.DebugLog('Search Data exit, code: ' + code, 'actions', 1) - if (code !== 0) { - logger.Log( - 'Search Data Worker error! Exit code is not 0', - logger.GetColor('redbg') - ) - reject(new Error('Search Data Worker error! Exit code is not 0')) - } - }) - - worker.on('message', (result) => { - // TODO: remove (?) - if (typeof result === 'string') { - try { - console.log(JSON.parse(result)) - } catch (err) { - console.log(result) - } - } - logger.DebugLog(`Worker message arrived`, 'worker', 2) - logger.DebugLog(result, 'worker', 3) - logger.DebugLog(`Question result length: ${result.length}`, 'ask', 1) - logger.DebugLog(result, 'ask', 2) - - logger.DebugLog( - `QDB search result length: ${result.length}`, - 'qdb search', - 1 - ) - resolve({ - result: result, - dbName: qdb.name, - }) - }) - }) -} - function dataToString(data: Array): string { const result = [] data.forEach((subj) => { @@ -509,10 +432,7 @@ function doSearch( ): any { let result = [] - const questionToSearch = - typeof question === 'string' - ? createQuestion(question, null, questionData || { type: 'simple' }) - : question + const questionToSearch = prepareQuestion(question, questionData) assert(questionToSearch.data) @@ -565,26 +485,9 @@ function doSearch( return result } -const workerTs = (file: string, wkOpts: any) => { - wkOpts.eval = true - if (!wkOpts.workerData) { - wkOpts.workerData = {} - } - wkOpts.workerData.__filename = file - return new Worker( - ` - const wk = require('worker_threads'); - require('ts-node').register(); - let file = wk.workerData.__filename; - delete wk.workerData.__filename; - require(file); - `, - wkOpts - ) -} - if (!isMainThread) { - const workerIndex = workerData.workerIndex + const { workerIndex } = workerData + let qdbs: Array = workerData.initData logger.Log( `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty` @@ -592,7 +495,7 @@ if (!isMainThread) { parentPort.on('message', (msg) => { if (msg.type === 'work') { - const { qdb, subjName, question, questionData } = msg.data + const { searchIn, subjName, question, questionData } = msg.data const index = msg.index console.log( `[THREAD #${workerIndex}]: staring work${ @@ -601,8 +504,12 @@ if (!isMainThread) { ) let searchResult = null + const currQdb = qdbs.find((qdb) => { + return searchIn[0] === qdb.index + }) + try { - searchResult = doSearch(qdb, subjName, question, questionData) + searchResult = doSearch(currQdb.data, subjName, question, questionData) } catch (err) { logger.Log('Error in worker thread!', logger.GetColor('redbg')) console.error(err) @@ -623,12 +530,8 @@ if (!isMainThread) { }done!` ) } else if (msg.type === 'update') { - if (msg.data.workerIndex !== workerIndex) { - // TODO - // qdbs = msg.qdb - console.log(`[THREAD #${workerIndex}]: update`, msg.data) - console.log(`[THREAD #${workerIndex}]: From ... to ${msg.data.result}`) - } + qdbs = msg.qdbs + console.log(`[THREAD #${workerIndex}]: update`) } }) } else { @@ -642,6 +545,5 @@ export { getSubjNameWithoutYear, createQuestion, addQuestion, - searchData, dataToString, } diff --git a/src/utils/workerPool.ts b/src/utils/workerPool.ts index b7098cc..2ea84ac 100644 --- a/src/utils/workerPool.ts +++ b/src/utils/workerPool.ts @@ -39,7 +39,7 @@ export function doALongTask(obj: any): Promise { }) } -export function initWorkerPool(): void { +export function initWorkerPool(initData: any): void { if (workers && pool) { logger.Log('WORKER AND POOL ALREADY EXISTS', logger.GetColor('redbg')) return @@ -48,7 +48,7 @@ export function initWorkerPool(): void { const factory = { create: function() { const currInd = workers.length - const worker = getAWorker(currInd) + const worker = getAWorker(currInd, initData) workers.push(worker) return { worker: worker, @@ -71,21 +71,19 @@ export function initWorkerPool(): void { pool = genericPool.createPool(factory, opts) } -export function msgAll(data: any): void { +export function msgAllWorker(data: any): void { workers.forEach((worker) => { - worker.postMessage({ - type: 'update', - data, - }) + worker.postMessage(data) }) } // --------------------------------------------------------------------------- -function getAWorker(i) { +function getAWorker(i, initData) { const worker = workerTs(workerFile, { workerData: { workerIndex: i, + initData: initData, }, }) @@ -105,7 +103,10 @@ function getAWorker(i) { }) worker.on('exit', (code) => { - logger.Log(`[MAIN]: worker #${i} exit code: ${code}`) + logger.Log( + `[MAIN]: worker #${i} exit code: ${code}`, + code === 0 ? logger.GetColor('redbg') : logger.GetColor('green') + ) }) return worker }