diff --git a/package-lock.json b/package-lock.json index a703640..4809163 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "express": "^4.17.3", "express-fileupload": "^1.3.1", "hybrid-crypto-js": "^0.2.4", + "jsonschema": "^1.4.1", "socket.io": "^4.4.1", "tesseract.js": "^3.0.3", "ts-node": "^10.7.0", @@ -4622,6 +4623,14 @@ "node": ">=6" } }, + "node_modules/jsonschema": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/jsonschema/-/jsonschema-1.4.1.tgz", + "integrity": "sha512-S6cATIPVv1z0IlxdN+zUk5EPjkGCdnhN4wVSBlvoUO1tOLJootbo9CquNJmbIh4yikWHiUedhRYrNPn1arpEmQ==", + "engines": { + "node": "*" + } + }, "node_modules/kleur": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/kleur/-/kleur-3.0.3.tgz", @@ -10252,6 +10261,11 @@ "minimist": "^1.2.5" } }, + "jsonschema": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/jsonschema/-/jsonschema-1.4.1.tgz", + "integrity": "sha512-S6cATIPVv1z0IlxdN+zUk5EPjkGCdnhN4wVSBlvoUO1tOLJootbo9CquNJmbIh4yikWHiUedhRYrNPn1arpEmQ==" + }, "kleur": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/kleur/-/kleur-3.0.3.tgz", diff --git a/package.json b/package.json index 1a115a0..ef5360b 100755 --- a/package.json +++ b/package.json @@ -10,6 +10,7 @@ "express": "^4.17.3", "express-fileupload": "^1.3.1", "hybrid-crypto-js": "^0.2.4", + "jsonschema": "^1.4.1", "socket.io": "^4.4.1", "tesseract.js": "^3.0.3", "ts-node": "^10.7.0", diff --git a/src/modules/api/submodules/p2p.ts b/src/modules/api/submodules/p2p.ts index dfcfeb2..69b749d 100644 --- a/src/modules/api/submodules/p2p.ts +++ b/src/modules/api/submodules/p2p.ts @@ -42,7 +42,11 @@ import { encrypt, isKeypairValid, } from '../../../utils/encryption' -import { doALongTask, msgAllWorker } from '../../../utils/workerPool' +import { + doALongTask, + msgAllWorker, + setPendingJobsAlertCount, +} from '../../../utils/workerPool' import { countOfQdb, countOfQdbs, @@ -50,9 +54,13 @@ import { getAvailableQdbIndexes, removeCacheFromQuestion, } from '../../../utils/qdbUtils' +import { + PeersInfoSchema, + SelfInfoSchema, + validateJSON, +} from '../../../types/typeSchemas' // TODO: remove FINALIZE-s and TOTEST-s -// TODO: script to remove from date from certain host (questions / users) interface MergeResult { newData: Subject[] @@ -300,11 +308,11 @@ function setupQuestionsForMerge(qdb: QuestionDb, peer: PeerInfo) { // files const peersPath = 'data/p2p/' -const peersFile = peersPath + '/peers.json' +const peersFile = peersPath + 'peers.json' // writes it) -const selfInfoFile = peersPath + '/selfInfo.json' -const thirdPartyPeersFile = peersPath + '/thirdPartyPeers.json' -const keyFile = peersPath + '/key' // key.pub key.priv +const selfInfoFile = peersPath + 'selfInfo.json' +const thirdPartyPeersFile = peersPath + 'thirdPartyPeers.json' +const keyFile = peersPath + 'key' // key.pub key.priv function setup(data: SubmoduleData): Submodule { const { @@ -371,9 +379,29 @@ function setup(data: SubmoduleData): Submodule { } } - // TODO: validate peers let peers: PeerInfo[] = utils.ReadJSON(peersFile) let selfInfo: PeerInfo = utils.ReadJSON(selfInfoFile) + const { isValid: isPeersValid, errorMsg: peersErrorMsg } = validateJSON( + peers, + PeersInfoSchema + ) + if (!isPeersValid) { + logger.Log(`Peers file (${peersFile}) has invalid contents!`, 'redbg') + peersErrorMsg.forEach((x) => logger.Log(x, 'red')) + + throw new Error('Invalid peers file') + } + const { isValid: isSelfInfoValid, errorMsg: selfInfoErrorMsg } = + validateJSON(selfInfo, 
SelfInfoSchema)
+    if (!isSelfInfoValid) {
+        logger.Log(
+            `Self info file (${selfInfoFile}) has invalid contents!`,
+            'redbg'
+        )
+        selfInfoErrorMsg.forEach((x) => logger.Log(x, 'red'))
+
+        throw new Error('Invalid self info file')
+    }
 
     // self info file is not required to have the publicKey, as it is always added on init
     selfInfo.publicKey = publicKey
@@ -432,7 +460,10 @@ function setup(data: SubmoduleData): Submodule {
         try {
             // FIXME: dont log if fails
             result.revision = child_process
-                .execSync('git rev-parse HEAD', { cwd: __dirname })
+                .execSync('git rev-parse HEAD', {
+                    cwd: __dirname,
+                    stdio: [0, 'pipe', null],
+                })
                 .toString()
                 .trim()
         } catch (e) {
@@ -484,7 +515,6 @@ function setup(data: SubmoduleData): Submodule {
         const mergeJobs: Promise[] = []
         const rawNewQuestionDbs: QuestionDb[] = []
         remoteQuestionDbs.forEach((remoteQdb) => {
-            // TODO: warn on qdb differences like shouldSave
             const localQdb = getQuestionDbs().find(
                 (lqdb) => lqdb.name === remoteQdb.name
             )
@@ -530,7 +560,7 @@ function setup(data: SubmoduleData): Submodule {
             }${logger.C()} peers`
         )
 
-        const lastSync = new Date('2023-01-01').getTime() // FINALIZE date: this is only for testing // selfInfo.lastSync
+        const lastSync = new Date('2022-03-12').getTime() // FINALIZE date: this is only for testing // selfInfo.lastSync
         logger.Log(
             `\tLast sync date: ${logger.C('blue')}${new Date(
                 lastSync
         )
         const syncStart = new Date().getTime()
         const requests = peers.map((peer) => {
-            const lastSyncWithPeer = new Date('2023-01-01').getTime() // FINALIZE same as above // peer.lastSync || 0
+            const lastSyncWithPeer = new Date('2022-03-12').getTime() // FINALIZE same as above // peer.lastSync || 0
 
             logger.Log(
                 `\tLast sync with ${logger.C('blue')}${peerToString(
@@ -614,8 +644,6 @@ function setup(data: SubmoduleData): Submodule {
             }
         )
 
-        // TOTEST: even on new subjet and new qdb add! TEST
-        // add new quesstions to db (QuestionData.source = true)
         const resultData = resultDataWithoutEmptyDbs.map((res) => {
             return {
                 ...res,
@@ -652,10 +680,19 @@ function setup(data: SubmoduleData): Submodule {
                 )}${thirdPartyPeersFile}${logger.C()} for details`
             )
         }
+
+        // all results statistics
+        const resultsCount: {
+            [key: string]: {
+                newUsers?: number
+                newQuestionDbs?: number
+                newSubjects?: number
+                newQuestions?: number
+            }
+        } = {}
         // -------------------------------------------------------------------------------------------------------
         // new users handlin TOTEST: test
         // -------------------------------------------------------------------------------------------------------
-        let newUsers = 0
         const oldUserCount = dbtools.SelectAll(userDB, 'users').length
         try {
             resultData.forEach((res) => {
@@ -680,15 +717,8 @@ function setup(data: SubmoduleData): Submodule {
                         newUserCount += 1
                     }
                 })
-                if (newUserCount > 0) {
-                    newUsers += newUserCount
-                    logger.Log(
-                        `\tAdded ${logger.C(
-                            'green'
-                        )}${newUserCount}${logger.C()} users from "${logger.C(
-                            'blue'
-                        )}${peerToString(res.peer)}${logger.C()}"`
-                    )
+                resultsCount[peerToString(res.peer)] = {
+                    newUsers: newUserCount,
                 }
             })
@@ -721,18 +751,22 @@ function setup(data: SubmoduleData): Submodule {
             countOfQdbs(getQuestionDbs())
         const oldQuestionDbCount = getQuestionDbs().length
         // TOTEST: test if backup wrks
-        logger.Log('\tBacking up old data ...')
         backupData(getQuestionDbs())
+        logger.Log('\tOld data backed up!')
         // -------------------------------------------------------------------------------------------------------
         // adding questions to db
         // -------------------------------------------------------------------------------------------------------
-        let totalNewQuestions = 0
-        let totalNewSubjects = 0
-        let totalNewQdbs = 0
 
         for (let i = 0; i < resultData.length; i++) {
             const { questionDbs: remoteQuestionDbs, peer } = resultData[i]
+            logger.Log(
+                `\tProcessing result from "${logger.C('blue')}${peerToString(
+                    peer
+                )}${logger.C()}" (${logger.C('green')}${
+                    i + 1
+                }${logger.C()}/${logger.C('green')}${resultData.length}${logger.C()})`
+            )
             // FIXME: if remoteQuestionDbs contain multiple dbs with the same name, then the merging
             // process could get wonky. Ideally it should not contain, but we will see
@@ -760,22 +794,14 @@ function setup(data: SubmoduleData): Submodule {
             const { newQuestionDbCount, newSubjectCount, newQuestionCount } =
                 await sendNewDataToWorkers(mergeResults, newQuestionDbs)
 
-            if (newQuestionCount > 0) {
-                logger.Log(
-                    `\tAdded ${logger.C(
-                        'green'
-                    )}${newQuestionDbCount.toLocaleString()}${logger.C()} new question DB-s, ${logger.C(
-                        'green'
-                    )}${newSubjectCount.toLocaleString()}${logger.C()} new subjects and ${logger.C(
-                        'green'
-                    )}${newQuestionCount.toLocaleString()}${logger.C()} new questions from "${logger.C(
-                        'blue'
-                    )}${peerToString(peer)}${logger.C()}"`
-                )
+            resultsCount[peerToString(peer)] = {
+                ...(resultsCount[peerToString(peer)] || { newUsers: 0 }),
+                newQuestionDbs: newQuestionDbCount,
+                newSubjects: newSubjectCount,
+                newQuestions: newQuestionCount,
             }
 
-            // Processing result data is successfull
-            const newPeers = peers.map((x) => {
+            const updatedPeersFile = peers.map((x) => {
                 if (isPeerSameAs(peer, x)) {
                     return {
                         ...x,
@@ -786,11 +812,10 @@ function setup(data: SubmoduleData): Submodule {
                 }
             })
 
-            utils.WriteFile(JSON.stringify(newPeers, null, 2), peersFile)
-
-            totalNewQdbs += newQuestionDbCount
-            totalNewSubjects += newSubjectCount
-            totalNewQuestions += newQuestionCount
+            utils.WriteFile(
+                JSON.stringify(updatedPeersFile, null, 2),
+                peersFile
+            )
         }
 
         // -------------------------------------------------------------------------------------------------------
@@ -801,30 +826,58 @@ function setup(data: SubmoduleData): Submodule {
             countOfQdbs(newQdb)
         const newQuestionDbCount = newQdb.length
 
-        logger.logTable([
-            ['\t', 'Users', 'QDBs', 'Subjs', 'Questions'],
+        const resultsTable = Object.entries(resultsCount).map(
+            ([key, value]) => {
+                return [
+                    key.length > 14 ? key.substring(0, 14) + '...' : key,
+                    value.newUsers || 0,
+                    value.newQuestionDbs || 0,
+                    value.newSubjects || 0,
+                    value.newQuestions || 0,
+                ]
+            }
+        )
+
+        const sumNewCount = (key: string) => {
+            return Object.values(resultsCount).reduce(
+                (acc, val) => acc + (val[key] || 0),
+                0
+            )
+        }
+
+        const newUsers = sumNewCount('newUsers')
+        const totalNewQuestions = sumNewCount('newQuestions')
+        const totalNewSubjects = sumNewCount('newSubjects')
+        const totalNewQdbs = sumNewCount('newQuestionDbs')
+
+        logger.logTable(
             [
-                'Old\t',
-                oldUserCount,
-                oldQuestionDbCount,
-                oldSubjCount,
-                oldQuestionCount,
+                ['', 'Users', 'QDBs', 'Subjs', 'Questions'],
+                [
+                    'Old',
+                    oldUserCount,
+                    oldQuestionDbCount,
+                    oldSubjCount,
+                    oldQuestionCount,
+                ],
+                ...resultsTable,
+                [
+                    'Added total',
+                    newUsers,
+                    totalNewQdbs,
+                    totalNewSubjects,
+                    totalNewQuestions,
+                ],
+                [
+                    'Final',
+                    newUserCount,
+                    newQuestionDbCount,
+                    newSubjCount,
+                    newQuestionCount,
+                ],
             ],
-            [
-                'Added',
-                newUsers,
-                totalNewQdbs,
-                totalNewSubjects,
-                totalNewQuestions,
-            ],
-            [
-                'Final',
-                newUserCount,
-                newQuestionDbCount,
-                newSubjCount,
-                newQuestionCount,
-            ],
-        ])
+            { colWidth: [15] }
+        )
 
         logger.Log(
             `Question DB-s written! Sync successfully finished!`,
@@ -861,7 +914,9 @@ function setup(data: SubmoduleData): Submodule {
         })
 
     app.get('/getnewdatasince', (req: Request, res: Response) => {
-        // TODO: hash question db to see if different?
+        // FIXME: hash question db to see if different?
+        // it could help in determining if it should be checked for new data, but it would only save
+        // a getNewDataSince() call per question db
         logger.LogReq(req)
         const since = +req.query.since
         const remoteHost = req.query.host
@@ -929,12 +984,14 @@ function setup(data: SubmoduleData): Submodule {
         //     return
         // }
 
+        setPendingJobsAlertCount(5000)
         syncData()
             .then((syncResult) => {
                 res.json({
                     msg: 'sync successfull',
                     ...syncResult,
                 })
+                setPendingJobsAlertCount()
            })
            .catch((e) => {
                console.error(e)
                res.json({
                    error: e,
                    msg: e.message,
                })
+                setPendingJobsAlertCount()
            })
    })
diff --git a/src/types/basicTypes.ts b/src/types/basicTypes.ts
index 6371fa4..7f184da 100644
--- a/src/types/basicTypes.ts
+++ b/src/types/basicTypes.ts
@@ -174,5 +174,7 @@ export interface PeerInfo {
     host: string
     port: number
     publicKey: string
+    contact: string
     lastSync?: Date
+    note?: string
 }
diff --git a/src/types/typeSchemas.ts b/src/types/typeSchemas.ts
index 95a4401..09904b5 100644
--- a/src/types/typeSchemas.ts
+++ b/src/types/typeSchemas.ts
@@ -1,5 +1,110 @@
-export const PeerInfoSchema = {
-    name: { type: 'string' },
-    host: { type: 'string' },
-    port: { type: 'string' },
+import { Schema, Validator } from 'jsonschema'
+// https://json-schema.org/learn/getting-started-step-by-step
+const validator = new Validator()
+
+export const validateJSON = (
+    val: unknown,
+    schema: Schema
+): {
+    isValid: boolean
+    errorMsg: string[]
+} => {
+    const res = validator.validate(val, schema)
+    const errorMsg = res.errors.map((e) => {
+        return `${e.stack}`
+    })
+    return {
+        isValid: res.valid,
+        errorMsg: errorMsg,
+    }
+}
+export class InvalidJSONError extends Error {
+    constructor(errorDetails: {
+        msg: string
+        expected: Schema
+        actual: unknown
+    }) {
+        const { msg, expected, actual } = errorDetails
+        super(
+            msg +
+                '\nExpected:\n' +
+                JSON.stringify(expected, null, 2) +
+                '\nActual:\n' +
+                JSON.stringify(actual, null, 2)
+        )
+    }
+}
+
+const PeerInfoSchemaBase = {
+    type: 'object',
+    properties: {
+        name: { type: 'string' },
+        host: { type: 'string' },
+        port: { type: 'number' },
+        publicKey: { type: 'string' },
+        contact: { type: 'string' },
+        lastSync: { type: 'number' },
+        note: { type: 'string' },
+    },
+    required: ['name', 'host', 'port'],
+}
+
+export const SelfInfoSchema: Schema = {
+    ...PeerInfoSchemaBase,
+    required: ['name', 'host', 'port', 'contact'],
+}
+
+export const PeerInfoSchema: Schema = {
+    ...PeerInfoSchemaBase,
+}
+
+export const PeersInfoSchema: Schema = {
+    type: 'array',
+    items: PeerInfoSchema,
+}
+
+export const QuestionSchema: Schema = {
+    type: 'object',
+    properties: {
+        Q: { type: 'string' },
+        A: { type: 'string' },
+        data: {
+            type: 'object',
+            properties: {
+                type: { type: 'string' },
+                date: { type: 'number' },
+                source: { type: 'string' },
+                base64: { type: 'array', items: { type: 'string' } },
+                images: { type: 'array', items: { type: 'string' } },
+                hashedImages: { type: 'array', items: { type: 'string' } },
+                possibleAnswers: {
+                    type: 'array',
+                    items: {
+                        type: 'object',
+                        properties: {
+                            type: { type: 'string' },
+                            val: { type: 'string' },
+                            selectedByUser: { type: 'boolean' },
+                        },
+                    },
+                },
+            },
+            required: ['type'],
+        },
+    },
+    required: ['Q', 'A', 'data'],
+}
+
+export const SubjectSchema: Schema = {
+    type: 'object',
+    properties: {
+        Name: { type: 'string' },
+        Questions: { type: 'array', items: QuestionSchema },
+    },
+    required: ['Name', 'Questions'],
+}
+
+export const QuestionDbFileSchema: Schema = {
+    type: 'array',
+    items: SubjectSchema,
 }
diff --git a/src/utils/logger.ts b/src/utils/logger.ts
index 83ec4f8..b181bfc 100755
--- a/src/utils/logger.ts
+++ b/src/utils/logger.ts
@@ -419,10 +419,14 @@ function C(color?: string): string {
     return '\x1b[0m'
 }
 
-function logTable(table: (string | number)[][]): void {
+function logTable(
+    table: (string | number)[][],
+    options: { colWidth?: number[] } = {}
+): void {
     table.forEach((row, i) => {
         const rowString: string[] = []
         row.forEach((cell, j) => {
+            const { colWidth } = options
             const cellColor = j === 0 || i === 0 ? 'blue' : 'green'
             let cellVal = ''
             if (!isNaN(+cell)) {
@@ -430,6 +434,17 @@
             } else {
                 cellVal = cell.toString()
             }
+
+            if (colWidth && colWidth[j]) {
+                if (cellVal.length < colWidth[j]) {
+                    while (cellVal.length < colWidth[j]) {
+                        cellVal += ' '
+                    }
+                } else if (cellVal.length > colWidth[j]) {
+                    cellVal = cellVal.substring(0, colWidth[j] - 3) + '...'
+                }
+            }
+
             rowString.push(C(cellColor) + cellVal + C())
         })
         Log(rowString.join('\t'))
diff --git a/src/utils/workerPool.ts b/src/utils/workerPool.ts
index 3b15a48..9578486 100644
--- a/src/utils/workerPool.ts
+++ b/src/utils/workerPool.ts
@@ -96,7 +96,8 @@ interface DoneEvent extends EventEmitter {
     emit(event: 'done', res: WorkerResult): boolean
 }
 
-const alertOnPendingCount = 100
+export const defaultAlertOnPendingCount = 100
+let alertOnPendingCount = defaultAlertOnPendingCount
 const workerFile = './src/utils/classes.ts'
 let workers: Array
 let getInitData: () => Array = null
@@ -137,6 +138,12 @@
     })
 }
 
+export function setPendingJobsAlertCount(newVal?: number): void {
+    const count = newVal != null ? newVal : defaultAlertOnPendingCount
+    logger.DebugLog('setPendingJobsAlertCount: ' + count, 'job', 1)
+    alertOnPendingCount = count
+}
+
 export function doALongTask(
     obj: TaskObject,
     targetWorkerIndex?: number
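
Note (outside the patch): a minimal sketch of how the validateJSON helper and the schemas introduced in src/types/typeSchemas.ts can be exercised on their own. The relative import path and the sample peer entries are illustrative assumptions, not part of the diff; the exact error strings come from the jsonschema library's ValidationError.stack and may vary between versions.

import { PeersInfoSchema, validateJSON } from './src/types/typeSchemas'

// One well-formed peer and one broken peer.
const examplePeers = [
    {
        name: 'peer-a',
        host: 'peer-a.example.com',
        port: 8080,
        contact: 'admin@example.com',
    },
    // port is a string instead of a number, so this entry fails validation
    { name: 'peer-b', host: 'peer-b.example.com', port: '8080' },
]

const { isValid, errorMsg } = validateJSON(examplePeers, PeersInfoSchema)
if (!isValid) {
    // For the second entry this logs something like:
    //   instance[1].port is not of a type(s) number
    errorMsg.forEach((msg) => console.error(msg))
}

The same pattern is what p2p.ts now runs against peers.json and selfInfo.json at startup, throwing instead of continuing with malformed files.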