/* ----------------------------------------------------------------------------

 Question Server
 GitLab:

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program. If not, see <https://www.gnu.org/licenses/>.

 ------------------------------------------------------------------------- */

import { Response } from 'express'
import * as child_process from 'child_process'
import http from 'http'

import logger from '../../../utils/logger'
import {
    Request,
    SubmoduleData,
    Submodule,
    PeerInfo,
    Subject,
    QuestionDb,
    User,
} from '../../../types/basicTypes'
import utils from '../../../utils/utils'
import { backupData /*writeData*/ } from '../../../utils/actions'
import { WorkerResult } from '../../../utils/classes'
import dbtools from '../../../utils/dbtools'
import {
    createKeyPair,
    decrypt,
    encrypt,
    isKeypairValid,
} from '../../../utils/encryption'
import {
    doALongTask,
    msgAllWorker,
    setPendingJobsAlertCount,
} from '../../../utils/workerPool'
import {
    countOfQdb,
    countOfQdbs,
    createQuestion,
    getAvailableQdbIndexes,
    removeCacheFromQuestion,
} from '../../../utils/qdbUtils'
import {
    PeersInfoSchema,
    SelfInfoSchema,
    validateJSON,
} from '../../../types/typeSchemas'

// TODO: remove FINALIZE-s and TOTEST-s

interface MergeResult {
    newData: Subject[]
    newSubjects: Subject[]
    localQdbIndex: number
    e: Error
}

interface RemotePeerInfo {
    selfInfo: PeerInfo
    myPeers: PeerInfo[]
    revision?: string
    qdbInfo?: {
        dbName: string
        subjs: {
            name: string
            count: number
        }[]
    }[]
}

interface RequestResult<T> {
    data?: T
    error?: Error
    options?: http.RequestOptions
}

interface SyncDataRes {
    questionDbs?: QuestionDb[]
    remoteInfo?: RemotePeerInfo
    encryptedUsers?: string
    count: {
        qdbs: number
        subjects: number
        questions: number
    }
}

/** A peer's /getnewdatasince response, tagged with the peer it came from */
type PeerSyncResult = SyncDataRes & { peer: PeerInfo }

/** Per-peer statistics collected during a sync, shown in the summary table */
interface PeerSyncCount {
    newUsers?: number
    newQuestionDbs?: number
    newSubjects?: number
    newQuestions?: number
}

// files
const peersPath = 'data/p2p/'
const peersFile = peersPath + 'peers.json'
const selfInfoFile = peersPath + 'selfInfo.json'
const thirdPartyPeersFile = peersPath + 'thirdPartyPeers.json'
const keyFile = peersPath + 'key' // key.pub key.priv

/** Extracts a printable message from a caught value of unknown type */
function getErrorMessage(e: unknown): string {
    return e instanceof Error ? e.message : String(e)
}

/**
 * Performs an HTTP GET and parses the response body as JSON.
 *
 * Never rejects: network errors, response stream errors and JSON parse
 * errors are all resolved as `{ error, options }` so callers can handle
 * every peer uniformly with `Promise.all`.
 */
function get<T>(options: http.RequestOptions): Promise<RequestResult<T>> {
    return new Promise((resolve) => {
        const req = http.get(options, (res) => {
            const bodyChunks: Uint8Array[] = []
            res.on('data', (chunk) => {
                bodyChunks.push(chunk)
            })
                .on('end', () => {
                    const body = Buffer.concat(bodyChunks).toString()
                    try {
                        resolve({ data: JSON.parse(body) })
                    } catch (e) {
                        resolve({
                            error: e instanceof Error ? e : new Error(String(e)),
                            options: options,
                        })
                    }
                })
                // without this a connection dropped mid-response could leave the
                // promise (and thus the whole sync) pending forever
                .on('error', (e) => {
                    resolve({ error: e, options: options })
                })
        })
        req.on('error', (e) => {
            resolve({ error: e, options: options })
        })
    })
}

/** Formats a peer as `host:port`, used as its identifier in logs and stats */
function peerToString(peer: PeerInfo): string {
    return `${peer.host}:${peer.port}`
}

/** Two peers are the same when both host and port match */
function isPeerSameAs(peer1: PeerInfo, peer2: PeerInfo): boolean {
    return peer1.host === peer2.host && peer1.port === peer2.port
}

/**
 * Returns copies of `subjects` keeping only questions whose `data.date` is
 * strictly newer than `date` (a missing date counts as 0), with the question
 * cache removed. Subjects left without questions are dropped.
 */
export function getNewDataSince(subjects: Subject[], date: number): Subject[] {
    return subjects
        .map((subject) => {
            return {
                ...subject,
                Questions: subject.Questions.filter((question) => {
                    return (question.data.date || 0) > date
                }).map((question) => removeCacheFromQuestion(question)),
            }
        })
        .filter((subject) => subject.Questions.length !== 0)
}

/**
 * Appends the questions of same-named subjects from `subjectsToMerge` to the
 * matching subjects of `subjectsToMergeTo`, then appends `newSubjects`.
 * Does not mutate its inputs.
 */
export function mergeSubjects(
    subjectsToMergeTo: Subject[],
    subjectsToMerge: Subject[],
    newSubjects: Subject[]
): Subject[] {
    return [
        ...subjectsToMergeTo.map((subj) => {
            const newQuestions = subjectsToMerge
                .filter((subjRes) => subjRes.Name === subj.Name)
                .flatMap((matchingSubj) => matchingSubj.Questions)

            if (newQuestions.length === 0) {
                return subj
            }
            return {
                ...subj,
                Questions: [...subj.Questions, ...newQuestions],
            }
        }),
        ...newSubjects,
    ]
}

/**
 * Applies merge results to the local question dbs.
 *
 * Results that carry an error (`e`) are skipped, matching how
 * `sendNewDataToWorkers` ignores them.
 *
 * @returns the merged dbs, and the indexes of the dbs that changed
 */
export function mergeQdbs(
    qdbToMergeTo: QuestionDb[],
    mergeResults: MergeResult[]
): { mergedQuestionDbs: QuestionDb[]; changedQdbIndexes: number[] } {
    const changedQdbIndexes: number[] = []
    const mergedQuestionDbs = qdbToMergeTo.map((qdb) => {
        const qdbMergeResult = mergeResults.find(
            (mergeRes) => !mergeRes.e && mergeRes.localQdbIndex === qdb.index
        )
        if (!qdbMergeResult) {
            // unchanged
            return qdb
        }

        changedQdbIndexes.push(qdb.index)
        return {
            ...qdb,
            data: mergeSubjects(
                qdb.data,
                qdbMergeResult.newData,
                qdbMergeResult.newSubjects
            ),
        }
    })

    return {
        mergedQuestionDbs: mergedQuestionDbs,
        changedQdbIndexes: changedQdbIndexes,
    }
}

/**
 * Broadcasts the newly merged questions and the brand-new question dbs to
 * every worker, and waits until all messages are handled.
 *
 * @returns counts of the new dbs, subjects and questions that were sent
 */
async function sendNewDataToWorkers(
    mergeResults: MergeResult[],
    newQuestionDbs: QuestionDb[]
): Promise<{
    newQuestionDbCount: number
    newSubjectCount: number
    newQuestionCount: number
}> {
    // FIXME: this might be slow, maybe make a new type of message for workers?
    const updatePromises: Promise<unknown>[] = []
    let newQuestionCount = 0
    let newSubjectCount = 0

    const sendNewQuestions = (subject: Subject, qdbIndex: number) => {
        newQuestionCount += subject.Questions.length
        updatePromises.push(
            msgAllWorker({
                type: 'newQuestions',
                data: {
                    subjName: subject.Name,
                    qdbIndex: qdbIndex,
                    newQuestions: subject.Questions,
                },
            })
        )
    }

    for (const mergeRes of mergeResults) {
        if (mergeRes.e) {
            logger.Log(`There was an error processing the merge!`, 'redbg')
            console.error(mergeRes.e)
            continue
        }

        mergeRes.newData.forEach((subjectWithNewData) =>
            sendNewQuestions(subjectWithNewData, mergeRes.localQdbIndex)
        )

        newSubjectCount += mergeRes.newSubjects.length
        mergeRes.newSubjects.forEach((newSubject) =>
            sendNewQuestions(newSubject, mergeRes.localQdbIndex)
        )
    }

    for (const newQdb of newQuestionDbs) {
        const { subjCount, questionCount } = countOfQdb(newQdb)
        newSubjectCount += subjCount
        newQuestionCount += questionCount

        // collected too, so the new dbs are also awaited below
        updatePromises.push(
            msgAllWorker({
                data: newQdb,
                type: 'newdb',
            })
        )
    }

    await Promise.all(updatePromises)

    return {
        newQuestionDbCount: newQuestionDbs.length,
        newSubjectCount: newSubjectCount,
        newQuestionCount: newQuestionCount,
    }
}

/**
 * Persists new and changed question dbs. Writing is currently disabled
 * (see FINALIZE); errors are logged per db, never thrown.
 */
function writeNewData(
    newQuestionDbs: QuestionDb[],
    changedQuestionDbs: QuestionDb[]
) {
    const qdbsToWrite = [...newQuestionDbs, ...changedQuestionDbs]

    qdbsToWrite.forEach((qdb) => {
        try {
            // FINALIZE: write to file
            // writeData(qdb.data, qdb.path)
        } catch (e) {
            logger.Log(`Error writing ${qdb.name} qdb to file!`, 'redbg')
            console.error(e)
        }
    })
}

/** Writes `selfInfo` back to its file with `lastSync` set to `newDate` */
function updateLastSync(selfInfo: PeerInfo, newDate: number) {
    utils.WriteFile(
        JSON.stringify({ ...selfInfo, lastSync: newDate }, null, 2),
        selfInfoFile
    )
}

/**
 * Makes every question of a remote db mergeable: questions without a cache
 * are rebuilt with `createQuestion`, and each question's source is set to
 * the peer it came from.
 */
function setupQuestionsForMerge(qdb: QuestionDb, peer: PeerInfo) {
    return {
        ...qdb,
        data: qdb.data.map((subj) => {
            return {
                ...subj,
                Questions: subj.Questions.map((q) => {
                    const initializedQuestion = q.cache ? q : createQuestion(q)
                    initializedQuestion.data.source = peerToString(peer)
                    return initializedQuestion
                }),
            }
        }),
    }
}

/**
 * True when a dbtools query returned no row. Accepts both a single-row result
 * (falsy when missing) and an array of rows (empty when missing).
 * NOTE(review): confirm which shape `dbtools.Select` returns
 */
function isEmptyQueryResult(result: unknown): boolean {
    return Array.isArray(result) ? result.length === 0 : !result
}

/**
 * Sets up the p2p submodule: loads peers / self info / keypair, watches the
 * config files and registers the `/p2pinfo`, `/getnewdatasince` and
 * `/syncp2pdata` endpoints.
 *
 * @throws Error when the self info file is missing, or when the peers or
 *   self info file has invalid contents
 */
function setup(data: SubmoduleData): Submodule {
    const {
        app,
        userDB,
        publicdirs,
        moduleSpecificData: { setQuestionDbs, getQuestionDbs },
    } = data

    const publicDir = publicdirs[0]

    // ---------------------------------------------------------------------------------------
    // SETUP
    // ---------------------------------------------------------------------------------------

    if (!utils.FileExists(peersFile)) {
        logger.Log(
            `Warning: peers file was missing, so it was created`,
            'yellowbg'
        )
        utils.CreatePath(peersPath)
        utils.WriteFile('[]', peersFile)
    }

    if (!utils.FileExists(selfInfoFile)) {
        logger.Log(
            'Self info file for p2p does not exist! P2P functionality will not be loaded',
            'redbg'
        )
        logger.Log(
            `File should be at: ${selfInfoFile} with the interface 'PeerInfo'`
        )
        throw new Error('p2p error')
    }

    // read before the keypair handling, so a freshly generated public key can
    // be attached to selfInfo once generation finishes
    let peers: PeerInfo[] = utils.ReadJSON(peersFile)
    let selfInfo: PeerInfo = utils.ReadJSON(selfInfoFile)

    let publicKey: string
    let privateKey: string

    if (
        !utils.FileExists(keyFile + '.priv') ||
        !utils.FileExists(keyFile + '.pub')
    ) {
        createKeyPair()
            .then(({ publicKey: pubk, privateKey: privk }) => {
                // at first start there won't be a keypair available until this finishes
                utils.WriteFile(pubk, keyFile + '.pub')
                utils.WriteFile(privk, keyFile + '.priv')

                publicKey = pubk
                privateKey = privk
                selfInfo.publicKey = pubk
            })
            .catch((e) => {
                logger.Log('Failed to create p2p keypair!', 'redbg')
                console.error(e)
            })
        logger.Log(
            'There were no public / private keys for p2p functionality, created new ones',
            'yellowbg'
        )
    } else {
        publicKey = utils.ReadFile(keyFile + '.pub')
        privateKey = utils.ReadFile(keyFile + '.priv')
        // checking only here, because if it got generated in the other branch then it must be good
        if (!isKeypairValid(publicKey, privateKey)) {
            logger.Log('Loaded keypair is not valid!', 'redbg')
        }
    }

    const { isValid: isPeersValid, errorMsg: peersErrorMsg } = validateJSON(
        peers,
        PeersInfoSchema
    )
    if (!isPeersValid) {
        logger.Log(`Peers file (${peersFile}) has invalid contents!`, 'redbg')
        peersErrorMsg.forEach((x) => logger.Log(x, 'red'))

        throw new Error('Invalid peers file')
    }
    const { isValid: isSelfInfoValid, errorMsg: selfInfoErrorMsg } =
        validateJSON(selfInfo, SelfInfoSchema)
    if (!isSelfInfoValid) {
        logger.Log(
            `Self info file (${selfInfoFile}) has invalid contents!`,
            'redbg'
        )
        selfInfoErrorMsg.forEach((x) => logger.Log(x, 'red'))

        throw new Error('Invalid self info file')
    }
    // self info file is not required to have the publicKey, as it is always added on init
    selfInfo.publicKey = publicKey

    const filesToWatch = [
        {
            fname: peersFile,
            logMsg: 'Peers file updated',
            action: () => {
                peers = utils.ReadJSON(peersFile)
            },
        },
        {
            fname: selfInfoFile,
            logMsg: 'P2P self info file changed',
            action: () => {
                selfInfo = utils.ReadJSON(selfInfoFile)
                selfInfo.publicKey = publicKey
            },
        },
    ]

    filesToWatch.forEach((ftw) => {
        if (utils.FileExists(ftw.fname)) {
            utils.WatchFile(ftw.fname, () => {
                logger.Log(ftw.logMsg)
                ftw.action()
            })
            ftw.action()
        } else {
            logger.Log(
                `File ${ftw.fname} does not exists to watch!`,
                'redbg'
            )
        }
    })

    if (peers.length === 0) {
        logger.Log(
            `Warning: peers file is empty. You probably want to fill it`,
            'yellowbg'
        )
    } else {
        logger.Log('Loaded peers: ' + peers.length)
        peers.forEach((peer, i) => {
            logger.Log(`\t${i}\t"${peer.name}": ${peerToString(peer)}`)
        })
    }

    // ---------------------------------------------------------------------------------------
    // FUNCTIONS
    // ---------------------------------------------------------------------------------------

    /**
     * Describes this server: its self info, its peers, the current git
     * revision and, optionally, per-db subject question counts.
     */
    function getSelfInfo(includeQdbInfo?: boolean): RemotePeerInfo {
        const result: RemotePeerInfo = {
            selfInfo: selfInfo,
            myPeers: peers,
        }

        try {
            // FIXME: dont log if fails
            result.revision = child_process
                .execSync('git rev-parse HEAD', {
                    cwd: __dirname,
                    stdio: [0, 'pipe', null],
                })
                .toString()
                .trim()
        } catch (e) {
            result.revision = 'Failed to get revision'
        }

        if (includeQdbInfo) {
            result.qdbInfo = getQuestionDbs().map((qdb) => {
                return {
                    dbName: qdb.name,
                    subjs: qdb.data.map((subj) => {
                        return {
                            name: subj.Name,
                            count: subj.Questions.length,
                        }
                    }),
                }
            })
        }

        return result
    }

    /**
     * Returns users created at or after `since`. A missing or non-finite
     * `since` returns every user, mirroring /getnewdatasince, which returns
     * every question in that case.
     */
    function getNewUsersSince(since?: number) {
        // `since` is interpolated into the SQL text, so only a finite number may
        // reach it (NaN would otherwise be parsed as a column name)
        const statement = Number.isFinite(since)
            ? `SELECT * FROM users WHERE created >= ${since};`
            : 'SELECT * FROM users;'
        return dbtools.runStatement(userDB, statement)
    }

    /** Assigns free local indexes and local file paths to remote-only dbs */
    function updateQdbForLocalUse(qdbs: QuestionDb[]) {
        const availableIndexes = getAvailableQdbIndexes(
            getQuestionDbs(),
            qdbs.length
        )
        return qdbs.map((qdb, i) => {
            return {
                ...qdb,
                index: availableIndexes[i],
                path: `${publicDir}questionDbs/${qdb.name}.json`,
            }
        })
    }

    /**
     * Splits remote dbs into ones with a same-named local db (merged in a
     * worker) and ones without (returned as raw new dbs).
     */
    async function getMergeResults(remoteQuestionDbs: QuestionDb[]) {
        const mergeJobs: Promise<unknown>[] = []
        const rawNewQuestionDbs: QuestionDb[] = []
        const localQuestionDbs = getQuestionDbs()

        remoteQuestionDbs.forEach((remoteQdb) => {
            const localQdb = localQuestionDbs.find(
                (lqdb) => lqdb.name === remoteQdb.name
            )

            if (!localQdb) {
                rawNewQuestionDbs.push(remoteQdb)
                return
            }
            mergeJobs.push(
                doALongTask({
                    type: 'merge',
                    data: {
                        localQdbIndex: localQdb.index,
                        remoteQdb: remoteQdb,
                    },
                })
            )
        })

        // the 'merge' worker task resolves with a MergeResult
        const mergeResults = (await Promise.all(mergeJobs)) as MergeResult[]

        return {
            mergeResults: mergeResults,
            rawNewQuestionDbs: rawNewQuestionDbs,
        }
    }

    /** Requests new data from every peer in parallel; never rejects */
    function requestNewDataFromPeers(
        lastSync: number
    ): Promise<RequestResult<PeerSyncResult>[]> {
        return Promise.all(
            peers.map((peer) => {
                const lastSyncWithPeer = new Date('2022-03-12').getTime() // FINALIZE same as above
                // peer.lastSync || 0

                logger.Log(
                    `\tLast sync with ${logger.C('blue')}${peerToString(
                        peer
                    )}${logger.C()}: ${logger.C('blue')}${new Date(
                        lastSyncWithPeer
                    ).toLocaleString()}${logger.C()}`
                )

                return get<SyncDataRes>({
                    host: peer.host,
                    port: peer.port,
                    path: `/getnewdatasince?host=${encodeURIComponent(
                        selfInfo.host
                    )}${lastSync ? `&since=${lastSyncWithPeer}` : ''}`,
                }).then((res): RequestResult<PeerSyncResult> => {
                    // peer is attached even on error, so failures can be reported per peer
                    return { ...res, data: { ...res.data, peer: peer } }
                })
            })
        )
    }

    /** Logs and saves peers that our peers know about but we do not */
    function reportThirdPartyPeers(results: PeerSyncResult[]) {
        const knownHosts = [...peers.map((peer) => peer.host), selfInfo.host]
        const thirdPartyPeers = results.flatMap((res) =>
            (res.remoteInfo?.myPeers ?? []).filter(
                (receivedPeer) => !knownHosts.includes(receivedPeer.host)
            )
        )

        if (thirdPartyPeers.length === 0) {
            return
        }

        logger.Log(
            `\tPeers reported ${logger.C('green')}${
                thirdPartyPeers.length
            }${logger.C()} third party peer(s) not connected to this server.`
        )
        utils.WriteFile(
            JSON.stringify(thirdPartyPeers, null, 2),
            thirdPartyPeersFile
        )
        logger.Log(
            `\tSee ${logger.C(
                'blue'
            )}${thirdPartyPeersFile}${logger.C()} for details`
        )
    }

    /**
     * Decrypts the users each peer sent and inserts the ones whose password
     * is not known locally. Records the per-peer count in `resultsCount`.
     * A failure with one peer is logged and does not stop the others.
     */
    function syncNewUsers(
        results: PeerSyncResult[],
        resultsCount: Record<string, PeerSyncCount>
    ) {
        // TOTEST: test
        results.forEach((res) => {
            if (!res.encryptedUsers) {
                return
            }
            const peerName = peerToString(res.peer)

            try {
                const decryptedUsers: User[] = JSON.parse(
                    decrypt(privateKey, res.encryptedUsers)
                )
                let newUserCount = 0
                decryptedUsers.forEach((remoteUser) => {
                    // eslint-disable-next-line @typescript-eslint/no-unused-vars
                    const { id, ...remoteUserWithoutId } = remoteUser
                    const localUser = dbtools.Select(userDB, 'users', {
                        pw: remoteUser.pw,
                    })
                    if (isEmptyQueryResult(localUser)) {
                        // FIXME: users will not have consistend id across servers. This may be
                        // harmless, will see
                        dbtools.Insert(userDB, 'users', {
                            ...remoteUserWithoutId,
                            sourceHost: peerName,
                        })
                        newUserCount += 1
                    }
                })
                resultsCount[peerName] = {
                    newUsers: newUserCount,
                }
            } catch (e) {
                logger.Log(
                    `\tError while trying to sync users from ${peerName}: ` +
                        getErrorMessage(e),
                    'redbg'
                )
                console.error(e)
            }
        })
    }

    /**
     * Pulls new users and questions from every peer, merges them into the
     * local dbs, notifies the workers and records sync dates.
     *
     * @returns a message when the sync stopped early, otherwise the old,
     *   added and final counts
     */
    async function syncData() {
        // TOTEST: try with 0 date to merge full dbs
        if (peers.length === 0) {
            logger.Log(
                `There are no peers specified in ${peersFile}, aborting sync`,
                'yellowbg'
            )
            return {
                msg: 'No peers specified, aborting',
            }
        }
        // FIXME: this might be blocking the main thread, but not sure how much

        logger.Log(
            `\tStarting data sync, getting new data from ${logger.C('green')}${
                peers.length
            }${logger.C()} peers`
        )

        const lastSync = new Date('2022-03-12').getTime() // FINALIZE date: this is only for testing
        // selfInfo.lastSync
        logger.Log(
            `\tLast sync date: ${logger.C('blue')}${new Date(
                lastSync
            ).toLocaleString()}${logger.C()}`
        )
        const syncStart = new Date().getTime()

        const allResults = await requestNewDataFromPeers(lastSync)

        // -------------------------------------------------------------------------------------------------------
        // filtering, transforming, and counting data
        // -------------------------------------------------------------------------------------------------------
        allResults.forEach((res) => {
            if (res.error) {
                logger.Log(
                    `\tError syncing with ${peerToString(res.data.peer)}: ${
                        res.error.message
                    }`,
                    'red'
                )
            }
        })
        const resultDataWithoutErrors = allResults
            .filter((res) => !res.error)
            .map((res) => res.data)

        if (resultDataWithoutErrors.length === 0) {
            logger.Log(
                `No peers returned data without error, aborting sync`,
                'redbg'
            )
            return {
                msg: 'No peers returned data without error, aborting sync',
            }
        }

        const resultDataWithoutEmptyDbs = resultDataWithoutErrors.filter(
            (res) => {
                // a peer may answer without a questionDbs field; treat it as empty
                const remoteQdbs = res.questionDbs ?? []
                const qdbCount = remoteQdbs.length
                const { subjCount, questionCount } = countOfQdbs(remoteQdbs)

                logger.Log(
                    `\t"${logger.C('blue')}${peerToString(
                        res.peer
                    )}${logger.C()}" sent "${logger.C(
                        'green'
                    )}${qdbCount}${logger.C()}" question DB-s with "${logger.C(
                        'green'
                    )}${subjCount.toLocaleString()}${logger.C()}" subjects, and "${logger.C(
                        'green'
                    )}${questionCount.toLocaleString()}${logger.C()}" questions`
                )

                return questionCount > 0
            }
        )

        const resultData = resultDataWithoutEmptyDbs.map((res) => {
            return {
                ...res,
                questionDbs: (res.questionDbs ?? []).map((qdb) => {
                    return setupQuestionsForMerge(qdb, res.peer)
                }),
            }
        })

        // -------------------------------------------------------------------------------------------------------
        // third party peers handling
        // -------------------------------------------------------------------------------------------------------
        // every successful answer carries peer info, even without new questions
        reportThirdPartyPeers(resultDataWithoutErrors)

        // all results statistics
        const resultsCount: Record<string, PeerSyncCount> = {}

        // -------------------------------------------------------------------------------------------------------
        // new users handling
        // -------------------------------------------------------------------------------------------------------
        const oldUserCount = dbtools.SelectAll(userDB, 'users').length
        // peers can send new users even when they have no new questions
        syncNewUsers(resultDataWithoutErrors, resultsCount)
        const finalUserCount = dbtools.SelectAll(userDB, 'users').length

        if (resultData.length === 0) {
            logger.Log(
                `No peers returned any new questions. Sync successfully finished!`,
                'green'
            )
            updateLastSync(selfInfo, syncStart)
            return {
                msg: 'No peers returned any new questions',
            }
        }

        // -------------------------------------------------------------------------------------------------------
        // backup
        // -------------------------------------------------------------------------------------------------------
        const { subjCount: oldSubjCount, questionCount: oldQuestionCount } =
            countOfQdbs(getQuestionDbs())
        const oldQuestionDbCount = getQuestionDbs().length
        // TOTEST: test if backup wrks
        backupData(getQuestionDbs())
        logger.Log('\tOld data backed up!')

        // -------------------------------------------------------------------------------------------------------
        // adding questions to db
        // -------------------------------------------------------------------------------------------------------
        // accumulated across iterations so every processed peer keeps its new lastSync
        let peersWithSyncDates = peers
        for (let i = 0; i < resultData.length; i++) {
            const { questionDbs: remoteQuestionDbs, peer } = resultData[i]
            const peerName = peerToString(peer)
            logger.Log(
                `\tProcessing result from "${logger.C(
                    'blue'
                )}${peerName}${logger.C()}" (${logger.C('green')}${
                    i + 1
                }${logger.C()}/${logger.C('green')}${
                    resultData.length
                }${logger.C()})`
            )
            // FIXME: if remoteQuestionDbs contain multiple dbs with the same name, then the merging
            // process could get wonky. Ideally it should not contain, but we will see

            const { rawNewQuestionDbs, mergeResults } = await getMergeResults(
                remoteQuestionDbs
            )

            // setting new index & path
            const newQuestionDbs = updateQdbForLocalUse(rawNewQuestionDbs)

            const { mergedQuestionDbs, changedQdbIndexes } = mergeQdbs(
                getQuestionDbs(),
                mergeResults
            )
            // TOTEST: test muliple new question dbs from multiple sources
            // the merged versions are the ones that should be persisted
            writeNewData(
                newQuestionDbs,
                mergedQuestionDbs.filter((qdb) => {
                    return changedQdbIndexes.includes(qdb.index)
                })
            )

            setQuestionDbs([...mergedQuestionDbs, ...newQuestionDbs])

            const { newQuestionDbCount, newSubjectCount, newQuestionCount } =
                await sendNewDataToWorkers(mergeResults, newQuestionDbs)

            resultsCount[peerName] = {
                ...(resultsCount[peerName] || { newUsers: 0 }),
                newQuestionDbs: newQuestionDbCount,
                newSubjects: newSubjectCount,
                newQuestions: newQuestionCount,
            }

            // Processing result data is successfull
            peersWithSyncDates = peersWithSyncDates.map((x) =>
                isPeerSameAs(peer, x) ? { ...x, lastSync: syncStart } : x
            )
            utils.WriteFile(
                JSON.stringify(peersWithSyncDates, null, 2),
                peersFile
            )
        }

        // -------------------------------------------------------------------------------------------------------
        updateLastSync(selfInfo, syncStart)

        const newQdb = getQuestionDbs()
        const { subjCount: newSubjCount, questionCount: newQuestionCount } =
            countOfQdbs(newQdb)
        const newQuestionDbCount = newQdb.length

        const resultsTable = Object.entries(resultsCount).map(
            ([key, value]) => {
                return [
                    key.length > 14 ? key.substring(0, 14) + '...' : key,
                    value.newUsers ?? 0,
                    value.newQuestionDbs ?? 0,
                    value.newSubjects ?? 0,
                    value.newQuestions ?? 0,
                ]
            }
        )

        // peers may lack some counts (e.g. users-only peers), so missing ones count as 0
        const sumNewCount = (key: keyof PeerSyncCount) => {
            return Object.values(resultsCount).reduce(
                (acc, val) => acc + (val[key] ?? 0),
                0
            )
        }

        const newUsers = sumNewCount('newUsers')
        const totalNewQuestions = sumNewCount('newQuestions')
        const totalNewSubjects = sumNewCount('newSubjects')
        const totalNewQdbs = sumNewCount('newQuestionDbs')

        logger.logTable(
            [
                ['', 'Users', 'QDBs', 'Subjs', 'Questions'],
                [
                    'Old',
                    oldUserCount,
                    oldQuestionDbCount,
                    oldSubjCount,
                    oldQuestionCount,
                ],
                ...resultsTable,
                [
                    'Added total',
                    newUsers,
                    totalNewQdbs,
                    totalNewSubjects,
                    totalNewQuestions,
                ],
                [
                    'Final',
                    finalUserCount,
                    newQuestionDbCount,
                    newSubjCount,
                    newQuestionCount,
                ],
            ],
            { colWidth: [15] }
        )

        logger.Log(
            `Question DB-s written! Sync successfully finished!`,
            'green'
        )

        return {
            old: {
                oldUserCount: oldUserCount,
                oldQuestionDbCount: oldQuestionDbCount,
                oldSubjCount: oldSubjCount,
                oldQuestionCount: oldQuestionCount,
            },
            added: {
                totalNewQdbs: totalNewQdbs,
                totalNewSubjects: totalNewSubjects,
                totalNewQuestions: totalNewQuestions,
            },
            final: {
                newUserCount: finalUserCount,
                newQuestionDbCount: newQuestionDbCount,
                newSubjCount: newSubjCount,
                newQuestionCount: newQuestionCount,
            },
        }
    }

    // ---------------------------------------------------------------------------------------
    // APP SETUP
    // ---------------------------------------------------------------------------------------
    app.get('/p2pinfo', (req: Request, res: Response) => {
        logger.LogReq(req)
        res.json(getSelfInfo(true))
    })

    app.get('/getnewdatasince', (req: Request, res: Response) => {
        // FIXME: hash question db to see if different?
        // it could help in determining if it should be checked for new data, but it would only save
        // a getNewDataSince() call per question db
        logger.LogReq(req)
        const since = +req.query.since
        const remoteHost = req.query.host

        // read the live dbs: a snapshot taken at setup goes stale after setQuestionDbs()
        const currentQuestionDbs = getQuestionDbs()
        const questionDbsWithNewQuestions = Number.isNaN(since)
            ? currentQuestionDbs
            : currentQuestionDbs
                  .map((qdb) => {
                      return {
                          ...qdb,
                          data: getNewDataSince(qdb.data, since),
                      }
                  })
                  .filter((qdb) => {
                      const { questionCount } = countOfQdb(qdb)
                      return questionCount > 0
                  })

        const { subjCount: subjects, questionCount: questions } = countOfQdbs(
            questionDbsWithNewQuestions
        )

        const result: SyncDataRes = {
            questionDbs: questionDbsWithNewQuestions,
            count: {
                qdbs: questionDbsWithNewQuestions.length,
                subjects: subjects,
                questions: questions,
            },
            remoteInfo: getSelfInfo(),
        }

        if (remoteHost) {
            const remoteHostInfo = peers.find((peer) => {
                return peer.host === remoteHost
            })
            const remotePublicKey = remoteHostInfo?.publicKey

            if (remotePublicKey) {
                // FIXME: sign data?
                const newUsers = getNewUsersSince(since)
                result.encryptedUsers = encrypt(
                    remotePublicKey,
                    JSON.stringify(newUsers)
                )
            } else if (remoteHostInfo) {
                logger.Log(
                    `Warning: ${remoteHostInfo.host}:${remoteHostInfo.port} has no publick key saved!`,
                    'yellowbg'
                )
            }
        }

        res.json(result)
    })

    app.get('/syncp2pdata', (req: Request, res: Response) => {
        logger.LogReq(req)
        // FINALIZE: uncomment
        // const user = req.session.user
        // if (user.id !== 1) {
        //     res.json({
        //         status: 'error',
        //         msg: 'only user 1 can call this EP',
        //     })
        //     return
        // }

        setPendingJobsAlertCount(5000)
        syncData()
            .then((syncResult) => {
                res.json({
                    msg: 'sync successfull',
                    ...syncResult,
                })
            })
            .catch((e) => {
                console.error(e)
                res.json({
                    error: e,
                    msg: e.message,
                })
            })
            .finally(() => {
                // restore the default alert threshold whatever the outcome
                setPendingJobsAlertCount()
            })
    })

    logger.Log('P2P functionality set up. Peers: ' + peers.length, 'blue')

    return {}
}

export default {
    setup: setup,
}