Threads store their own qdb. It gets updated when new questions are added to reduce message sizes between threads

This commit is contained in:
mrfry 2020-12-19 11:31:12 +01:00
parent 906ab8ee62
commit 68dcbff846
5 changed files with 56 additions and 141 deletions

View file

@ -35,11 +35,16 @@ import {
backupData, backupData,
loadJSON, loadJSON,
RecievedData, RecievedData,
getQuestionDbsWithoutFunct,
} from '../../utils/actions' } from '../../utils/actions'
import dbtools from '../../utils/dbtools' import dbtools from '../../utils/dbtools'
import auth from '../../middlewares/auth.middleware' import auth from '../../middlewares/auth.middleware'
import { dataToString } from '../../utils/classes' import { dataToString } from '../../utils/classes'
import { initWorkerPool, doALongTask } from '../../utils/workerPool' import {
initWorkerPool,
doALongTask,
msgAllWorker,
} from '../../utils/workerPool'
import { SetupData } from '../../server' import { SetupData } from '../../server'
import { ModuleType, User, DataFile, Request } from '../../types/basicTypes' import { ModuleType, User, DataFile, Request } from '../../types/basicTypes'
@ -76,8 +81,6 @@ let publicdirs = []
function GetApp(): ModuleType { function GetApp(): ModuleType {
const app = express() const app = express()
initWorkerPool()
const publicDir = publicdirs[0] const publicDir = publicdirs[0]
if (!publicDir) { if (!publicDir) {
throw new Error(`No public dir! ( API )`) throw new Error(`No public dir! ( API )`)
@ -166,6 +169,8 @@ function GetApp(): ModuleType {
// FIXME: check type from file // FIXME: check type from file
let testUsers: any = [] let testUsers: any = []
initWorkerPool(getQuestionDbsWithoutFunct(questionDbs))
function mergeObjSum(a, b) { function mergeObjSum(a, b) {
const res = { ...b } const res = { ...b }
Object.keys(a).forEach((key) => { Object.keys(a).forEach((key) => {
@ -1031,6 +1036,10 @@ function GetApp(): ModuleType {
success: resultArray.length > 0, success: resultArray.length > 0,
newQuestions: resultArray, newQuestions: resultArray,
}) })
msgAllWorker({
qdbs: getQuestionDbsWithoutFunct(questionDbs),
type: 'update',
})
}) })
.catch((err) => { .catch((err) => {
logger.Log( logger.Log(
@ -1067,18 +1076,7 @@ function GetApp(): ModuleType {
if (req.query.q && req.query.data) { if (req.query.q && req.query.data) {
const subj: any = req.query.subj || '' const subj: any = req.query.subj || ''
const question = req.query.q const question = req.query.q
let recData: any = req.query.data const recData: any = req.query.data
if (typeof recData === 'string') {
try {
recData = JSON.parse(recData)
} catch (err) {
logger.Log(
'Error parsing recData in /ask!',
logger.GetColor('redbg')
)
console.error(err)
}
}
const promises = [] const promises = []
@ -1087,8 +1085,8 @@ function GetApp(): ModuleType {
doALongTask({ doALongTask({
type: 'work', type: 'work',
data: { data: {
qdb: qdb.data, searchIn: [qdb.index], // TODO: search in all
question, question: question,
subjName: subj, subjName: subj,
questionData: recData, questionData: recData,
}, },

View file

@ -25,6 +25,7 @@ export interface DataFile {
export interface QuestionDb extends DataFile { export interface QuestionDb extends DataFile {
data: Array<Subject> data: Array<Subject>
index: Number
} }
export interface User { export interface User {

View file

@ -22,7 +22,7 @@ const dataLockFile = './data/lockData'
import logger from '../utils/logger' import logger from '../utils/logger'
import { createQuestion } from '../utils/classes' import { createQuestion } from '../utils/classes'
import { doALongTask } from './workerPool' import { doALongTask } from './workerPool'
import idStats from '../utils/ids' import idStats from '../utils/ids'
import utils from '../utils/utils' import utils from '../utils/utils'
import { SearchResult, addQuestion, getSubjNameWithoutYear } from './classes' import { SearchResult, addQuestion, getSubjNameWithoutYear } from './classes'
@ -170,6 +170,7 @@ function processIncomingRequestUsingDb(
doALongTask({ doALongTask({
type: 'work', type: 'work',
data: { data: {
searchIn: [qdb.index],
qdb: qdb.data, qdb: qdb.data,
question: currentQuestion, question: currentQuestion,
subjName: recievedData.subj, subjName: recievedData.subj,
@ -201,6 +202,8 @@ function processIncomingRequestUsingDb(
) )
logger.DebugLog(currentQuestion, 'actions', 3) logger.DebugLog(currentQuestion, 'actions', 3)
addQuestion(qdb.data, sName, currentQuestion) addQuestion(qdb.data, sName, currentQuestion)
// TODO: check if it really adds it, not only just some clone (questionDbs in api.ts
// modifies too)
}) })
currWrites++ currWrites++
@ -261,7 +264,7 @@ function processIncomingRequestUsingDb(
} }
export function loadJSON(dataFiles: Array<DataFile>): Array<QuestionDb> { export function loadJSON(dataFiles: Array<DataFile>): Array<QuestionDb> {
return dataFiles.reduce((acc, dataFile) => { return dataFiles.reduce((acc, dataFile, index) => {
if (!utils.FileExists(dataFile.path)) { if (!utils.FileExists(dataFile.path)) {
utils.WriteFile(JSON.stringify([]), dataFile.path) utils.WriteFile(JSON.stringify([]), dataFile.path)
} }
@ -269,6 +272,7 @@ export function loadJSON(dataFiles: Array<DataFile>): Array<QuestionDb> {
try { try {
acc.push({ acc.push({
...dataFile, ...dataFile,
index: index,
data: JSON.parse(utils.ReadFile(dataFile.path)), data: JSON.parse(utils.ReadFile(dataFile.path)),
}) })
} catch (err) { } catch (err) {
@ -302,3 +306,12 @@ export function backupData(questionDbs: Array<QuestionDb>): void {
} }
}) })
} }
export function getQuestionDbsWithoutFunct(
questionDbs: Array<QuestionDb> // FIXME: type for dis
): Array<any> {
return questionDbs.map((qdb) => {
const { shouldSave, ...res } = qdb // eslint-disable-line
return res
})
}

View file

@ -1,11 +1,6 @@
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads' import { isMainThread, parentPort, workerData } from 'worker_threads'
import logger from './logger' import logger from './logger'
import { import { Question, QuestionData, Subject } from '../types/basicTypes'
Question,
QuestionDb,
QuestionData,
Subject,
} from '../types/basicTypes'
interface SearchResultQuestion extends Question { interface SearchResultQuestion extends Question {
match: number match: number
@ -16,8 +11,6 @@ export interface SearchResult {
dbName: string dbName: string
} }
const searchDataWorkerFile = './src/utils/classes.ts'
const assert = (val) => { const assert = (val) => {
if (!val) { if (!val) {
throw new Error('Assertion failed') throw new Error('Assertion failed')
@ -216,7 +209,7 @@ function createQuestion(
return { return {
Q: simplifyQuestion(question), Q: simplifyQuestion(question),
A: answer ? simplifyAnswer(answer) : null, A: answer ? simplifyAnswer(answer) : null,
data, data: data,
} }
} }
@ -414,83 +407,13 @@ function prepareQuestion(
preparedQuestion = question preparedQuestion = question
} else { } else {
// FIXME data was checkedif its null, it should be never null. check if its really never null // FIXME data was checkedif its null, it should be never null. check if its really never null
const parsedData = typeof data === 'object' ? data : JSON.parse(data) const parsedData = typeof data === 'object' ? data : JSON.parse(data) // TODO: put in try?
preparedQuestion = createQuestion(question, null, parsedData) preparedQuestion = createQuestion(question, null, parsedData)
} }
return simplifyQuestion(preparedQuestion) return simplifyQuestion(preparedQuestion)
} }
function searchData(
qdb: QuestionDb,
question: Question | string,
subjName: string,
questionData?: QuestionData | string
): Promise<SearchResult> {
// FIXME subjName was checkedif its null, it should be never null. check if its really never null
return new Promise((resolve, reject) => {
assert(question)
logger.DebugLog('Searching for question', 'qdb search', 1)
const preparedQuestion = prepareQuestion(question, questionData)
logger.DebugLog('Question:', 'qdb search', 2)
logger.DebugLog(preparedQuestion, 'qdb search', 2)
logger.DebugLog(`Subject name: ${subjName}`, 'qdb search', 2)
const worker = workerTs(searchDataWorkerFile, {
workerData: {
data: qdb.data,
subjName,
question: preparedQuestion,
questionData,
},
})
worker.on('error', (err) => {
logger.Log('Search Data Worker error!', logger.GetColor('redbg'))
console.error(err)
reject(err)
})
worker.on('exit', (code) => {
logger.DebugLog('Search Data exit, code: ' + code, 'actions', 1)
if (code !== 0) {
logger.Log(
'Search Data Worker error! Exit code is not 0',
logger.GetColor('redbg')
)
reject(new Error('Search Data Worker error! Exit code is not 0'))
}
})
worker.on('message', (result) => {
// TODO: remove (?)
if (typeof result === 'string') {
try {
console.log(JSON.parse(result))
} catch (err) {
console.log(result)
}
}
logger.DebugLog(`Worker message arrived`, 'worker', 2)
logger.DebugLog(result, 'worker', 3)
logger.DebugLog(`Question result length: ${result.length}`, 'ask', 1)
logger.DebugLog(result, 'ask', 2)
logger.DebugLog(
`QDB search result length: ${result.length}`,
'qdb search',
1
)
resolve({
result: result,
dbName: qdb.name,
})
})
})
}
function dataToString(data: Array<Subject>): string { function dataToString(data: Array<Subject>): string {
const result = [] const result = []
data.forEach((subj) => { data.forEach((subj) => {
@ -509,10 +432,7 @@ function doSearch(
): any { ): any {
let result = [] let result = []
const questionToSearch = const questionToSearch = prepareQuestion(question, questionData)
typeof question === 'string'
? createQuestion(question, null, questionData || { type: 'simple' })
: question
assert(questionToSearch.data) assert(questionToSearch.data)
@ -565,26 +485,9 @@ function doSearch(
return result return result
} }
const workerTs = (file: string, wkOpts: any) => {
wkOpts.eval = true
if (!wkOpts.workerData) {
wkOpts.workerData = {}
}
wkOpts.workerData.__filename = file
return new Worker(
`
const wk = require('worker_threads');
require('ts-node').register();
let file = wk.workerData.__filename;
delete wk.workerData.__filename;
require(file);
`,
wkOpts
)
}
if (!isMainThread) { if (!isMainThread) {
const workerIndex = workerData.workerIndex const { workerIndex } = workerData
let qdbs: Array<any> = workerData.initData
logger.Log( logger.Log(
`[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty` `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty`
@ -592,7 +495,7 @@ if (!isMainThread) {
parentPort.on('message', (msg) => { parentPort.on('message', (msg) => {
if (msg.type === 'work') { if (msg.type === 'work') {
const { qdb, subjName, question, questionData } = msg.data const { searchIn, subjName, question, questionData } = msg.data
const index = msg.index const index = msg.index
console.log( console.log(
`[THREAD #${workerIndex}]: staring work${ `[THREAD #${workerIndex}]: staring work${
@ -601,8 +504,12 @@ if (!isMainThread) {
) )
let searchResult = null let searchResult = null
const currQdb = qdbs.find((qdb) => {
return searchIn[0] === qdb.index
})
try { try {
searchResult = doSearch(qdb, subjName, question, questionData) searchResult = doSearch(currQdb.data, subjName, question, questionData)
} catch (err) { } catch (err) {
logger.Log('Error in worker thread!', logger.GetColor('redbg')) logger.Log('Error in worker thread!', logger.GetColor('redbg'))
console.error(err) console.error(err)
@ -623,12 +530,8 @@ if (!isMainThread) {
}done!` }done!`
) )
} else if (msg.type === 'update') { } else if (msg.type === 'update') {
if (msg.data.workerIndex !== workerIndex) { qdbs = msg.qdbs
// TODO console.log(`[THREAD #${workerIndex}]: update`)
// qdbs = msg.qdb
console.log(`[THREAD #${workerIndex}]: update`, msg.data)
console.log(`[THREAD #${workerIndex}]: From ... to ${msg.data.result}`)
}
} }
}) })
} else { } else {
@ -642,6 +545,5 @@ export {
getSubjNameWithoutYear, getSubjNameWithoutYear,
createQuestion, createQuestion,
addQuestion, addQuestion,
searchData,
dataToString, dataToString,
} }

View file

@ -39,7 +39,7 @@ export function doALongTask(obj: any): Promise<any> {
}) })
} }
export function initWorkerPool(): void { export function initWorkerPool(initData: any): void {
if (workers && pool) { if (workers && pool) {
logger.Log('WORKER AND POOL ALREADY EXISTS', logger.GetColor('redbg')) logger.Log('WORKER AND POOL ALREADY EXISTS', logger.GetColor('redbg'))
return return
@ -48,7 +48,7 @@ export function initWorkerPool(): void {
const factory = { const factory = {
create: function() { create: function() {
const currInd = workers.length const currInd = workers.length
const worker = getAWorker(currInd) const worker = getAWorker(currInd, initData)
workers.push(worker) workers.push(worker)
return { return {
worker: worker, worker: worker,
@ -71,21 +71,19 @@ export function initWorkerPool(): void {
pool = genericPool.createPool(factory, opts) pool = genericPool.createPool(factory, opts)
} }
export function msgAll(data: any): void { export function msgAllWorker(data: any): void {
workers.forEach((worker) => { workers.forEach((worker) => {
worker.postMessage({ worker.postMessage(data)
type: 'update',
data,
})
}) })
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
function getAWorker(i) { function getAWorker(i, initData) {
const worker = workerTs(workerFile, { const worker = workerTs(workerFile, {
workerData: { workerData: {
workerIndex: i, workerIndex: i,
initData: initData,
}, },
}) })
@ -105,7 +103,10 @@ function getAWorker(i) {
}) })
worker.on('exit', (code) => { worker.on('exit', (code) => {
logger.Log(`[MAIN]: worker #${i} exit code: ${code}`) logger.Log(
`[MAIN]: worker #${i} exit code: ${code}`,
code === 0 ? logger.GetColor('redbg') : logger.GetColor('green')
)
}) })
return worker return worker
} }