mirror of https://gitlab.com/MrFry/mrfrys-node-server
synced 2025-04-01 20:24:18 +02:00

added nearly complete p2p implementation

This commit is contained in:
parent 11dacdae64
commit 5c22f575dd

25 changed files with 14320 additions and 12563 deletions

@@ -30,7 +30,16 @@ import logger from '../../utils/logger'
 import utils from '../../utils/utils'
 import auth from '../../middlewares/auth.middleware'
 import { SetupData } from '../../server'
-import { ModuleType, Request, Submodule } from '../../types/basicTypes'
+import {
+    DataFile,
+    ModuleSpecificData,
+    ModuleType,
+    QuestionDb,
+    Request,
+    Submodule,
+} from '../../types/basicTypes'
+import { loadJSON } from '../../utils/actions'
+import { initWorkerPool } from '../../utils/workerPool'
 
 // files
 const rootRedirectToFile = 'data/apiRootRedirectTo'

@@ -142,7 +151,23 @@ function GetApp(): ModuleType {
 
     // -------------------------------------------------------------------------------------------
 
-    const submoduleDatas = setupSubModules(app)
+    const dbsFile = publicDir + 'questionDbs.json'
+
+    // FIXME: is dataFiles only a temp variable? does this cause any problems?
+    const dataFiles: Array<DataFile> = utils.ReadJSON(dbsFile)
+    let questionDbs: Array<QuestionDb> = loadJSON(dataFiles, publicDir)
+    initWorkerPool(() => questionDbs)
+
+    const submoduleDatas = setupSubModules(app, {
+        questionDbs: questionDbs,
+        getQuestionDbs: () => {
+            return questionDbs
+        },
+        setQuestionDbs: (newQdbs: QuestionDb[]) => {
+            questionDbs = newQdbs
+        },
+        dbsFile: dbsFile,
+    })
 
     // -------------------------------------------------------------------------------------------
 
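The getter/setter pair passed to setupSubModules above is what lets the new p2p submodule replace the in-memory questionDbs array after a sync, while other submodules keep reading the current array through getQuestionDbs().
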
@@ -183,7 +208,7 @@ function GetApp(): ModuleType {
 
 function setupSubModules(
     parentApp: express.Application,
-    moduleSpecificData?: any
+    moduleSpecificData: ModuleSpecificData
 ): Submodule[] {
     const submoduleDir = './submodules/'
     const absolutePath = __dirname + '/' + submoduleDir

src/modules/api/submodules/p2p.ts (new file, 955 lines)

@@ -0,0 +1,955 @@
/* ----------------------------------------------------------------------------

 Question Server
 GitLab: <https://gitlab.com/MrFry/mrfrys-node-server>

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program. If not, see <https://www.gnu.org/licenses/>.

 ------------------------------------------------------------------------- */

import { Response } from 'express'
import * as child_process from 'child_process'
import http from 'http'

import logger from '../../../utils/logger'
import {
    Request,
    SubmoduleData,
    Submodule,
    PeerInfo,
    Subject,
    QuestionDb,
    User,
} from '../../../types/basicTypes'
import utils from '../../../utils/utils'
import { backupData /*writeData*/ } from '../../../utils/actions'
import { WorkerResult } from '../../../utils/classes'
import dbtools from '../../../utils/dbtools'
import {
    createKeyPair,
    decrypt,
    encrypt,
    isKeypairValid,
} from '../../../utils/encryption'
import { doALongTask, msgAllWorker } from '../../../utils/workerPool'
import {
    countOfQdb,
    countOfQdbs,
    createQuestion,
    getAvailableQdbIndexes,
    removeCacheFromQuestion,
} from '../../../utils/qdbUtils'

// TODO: remove FINALIZE-s and TOTEST-s
// TODO: script to remove data from a certain date / host (questions / users)

interface MergeResult {
    newData: Subject[]
    newSubjects: Subject[]
    localQdbIndex: number
    e: Error
}

interface RemotePeerInfo {
    selfInfo: PeerInfo
    myPeers: PeerInfo[]
    revision?: string
    qdbInfo?: {
        dbName: string
        subjs: {
            name: string
            count: number
        }[]
    }[]
}

interface RequestResult<T> {
    data?: T
    error?: Error
    options?: http.RequestOptions
}

interface SyncDataRes {
    questionDbs?: QuestionDb[]
    remoteInfo?: RemotePeerInfo
    encryptedUsers?: string
    count: {
        qdbs: number
        subjects: number
        questions: number
    }
}

function get<T>(options: http.RequestOptions): Promise<RequestResult<T>> {
    return new Promise((resolve) => {
        const req = http.get(options, function (res) {
            const bodyChunks: Uint8Array[] = []
            res.on('data', (chunk) => {
                bodyChunks.push(chunk)
            }).on('end', () => {
                const body = Buffer.concat(bodyChunks).toString()
                try {
                    resolve({ data: JSON.parse(body) })
                } catch (e) {
                    resolve({ error: e, options: options })
                }
            })
        })
        req.on('error', function (e) {
            resolve({ error: e, options: options })
            // reject(e)
        })
    })
}
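
// A minimal usage sketch of the helper above (illustrative, not part of the
// commit): get<T>() always resolves and signals failure through the `error`
// field instead of rejecting.
//
//     const res = await get<RemotePeerInfo>({
//         host: peer.host,
//         port: peer.port,
//         path: '/p2pinfo',
//     })
//     if (res.error) {
//         logger.Log(`Request failed: ${res.error.message}`)
//     }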

function peerToString(peer: PeerInfo) {
    return `${peer.host}:${peer.port}`
}

function isPeerSameAs(peer1: PeerInfo, peer2: PeerInfo) {
    return peer1.host === peer2.host && peer1.port === peer2.port
}

export function getNewDataSince(subjects: Subject[], date: number): Subject[] {
    return subjects
        .map((subject) => {
            return {
                ...subject,
                Questions: subject.Questions.filter((question) => {
                    return (question.data.date || 0) > date
                }).map((question) => removeCacheFromQuestion(question)),
            }
        })
        .filter((subject) => subject.Questions.length !== 0)
}
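
// Note: a question with no data.date is treated as 0 by the filter above and
// is therefore never matched by the strict `> date` comparison; such questions
// only travel when a full, unfiltered question DB is sent (see the
// /getnewdatasince handler below, when `since` is missing).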

export function mergeSubjects(
    subjectsToMergeTo: Subject[],
    subjectsToMerge: Subject[],
    newSubjects: Subject[]
): Subject[] {
    return [
        ...subjectsToMergeTo.map((subj) => {
            const newSubjs = subjectsToMerge.filter(
                (subjRes) => subjRes.Name === subj.Name
            )

            if (newSubjs.length > 0) {
                const newQuestions = newSubjs.flatMap((subj) => {
                    return subj.Questions
                })

                return {
                    ...subj,
                    Questions: [...subj.Questions, ...newQuestions],
                }
            } else {
                return subj
            }
        }),
        ...newSubjects,
    ]
}

export function mergeQdbs(
    qdbToMergeTo: QuestionDb[],
    mergeResults: MergeResult[]
): { mergedQuestionDbs: QuestionDb[]; changedQdbIndexes: number[] } {
    const changedQdbIndexes: number[] = []
    const mergedQuestionDbs = qdbToMergeTo.map((qdb) => {
        const qdbMergeResult = mergeResults.find(
            (mergeRes) => mergeRes.localQdbIndex === qdb.index
        )
        if (qdbMergeResult) {
            const mergedQdb = {
                ...qdb,
                data: mergeSubjects(
                    qdb.data,
                    qdbMergeResult.newData,
                    qdbMergeResult.newSubjects
                ),
            }
            changedQdbIndexes.push(qdb.index)
            return mergedQdb
        } else {
            // unchanged
            return qdb
        }
    })
    return {
        mergedQuestionDbs: mergedQuestionDbs,
        changedQdbIndexes: changedQdbIndexes,
    }
}

async function sendNewDataToWorkers(
    mergeResults: MergeResult[],
    newQuestionDbs: QuestionDb[]
) {
    // FIXME: this might be slow, maybe make a new type of message for workers?
    const updatePromises: Promise<WorkerResult[]>[] = []
    let newQuestionCount = 0
    let newSubjectCount = 0
    let newQuestionDbCount = 0

    mergeResults.forEach((mergeRes) => {
        if (mergeRes.e) {
            logger.Log(`There was an error processing the merge!`, 'redbg')
            console.error(mergeRes.e)
            return
        }

        mergeRes.newData.forEach((subjectWithNewData) => {
            newQuestionCount += subjectWithNewData.Questions.length
            updatePromises.push(
                msgAllWorker({
                    type: 'newQuestions',
                    data: {
                        subjName: subjectWithNewData.Name,
                        qdbIndex: mergeRes.localQdbIndex,
                        newQuestions: subjectWithNewData.Questions,
                    },
                })
            )
        })

        newSubjectCount += mergeRes.newSubjects.length
        mergeRes.newSubjects.forEach((newSubject) => {
            newQuestionCount += newSubject.Questions.length
            updatePromises.push(
                msgAllWorker({
                    type: 'newQuestions',
                    data: {
                        subjName: newSubject.Name,
                        qdbIndex: mergeRes.localQdbIndex,
                        newQuestions: newSubject.Questions,
                    },
                })
            )
        })
    })
    newQuestionDbCount += newQuestionDbs.length
    newQuestionDbs.forEach((newQdb) => {
        const { subjCount: sc, questionCount: qc } = countOfQdb(newQdb)
        newSubjectCount += sc
        newQuestionCount += qc
        msgAllWorker({
            data: newQdb,
            type: 'newdb',
        })
    })

    await Promise.all(updatePromises)

    return {
        newQuestionDbCount: newQuestionDbCount,
        newSubjectCount: newSubjectCount,
        newQuestionCount: newQuestionCount,
    }
}
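
// Note: the 'newdb' messages above are sent without collecting their promises,
// so Promise.all only awaits the 'newQuestions' updates.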

function writeNewData(
    newQuestionDbs: QuestionDb[],
    changedQuestionDbs: QuestionDb[]
) {
    const qdbsToWrite = [...newQuestionDbs, ...changedQuestionDbs]
    qdbsToWrite.forEach((qdb) => {
        try {
            // FINALIZE: write to file
            // writeData(qdb.data, qdb.path)
        } catch (e) {
            logger.Log(`Error writing ${qdb.name} qdb to file!`, 'redbg')
            console.error(e)
        }
    })
}

function updateLastSync(selfInfo: PeerInfo, newDate: number) {
    utils.WriteFile(
        JSON.stringify({ ...selfInfo, lastSync: newDate }, null, 2),
        selfInfoFile
    )
}

function setupQuestionsForMerge(qdb: QuestionDb, peer: PeerInfo) {
    return {
        ...qdb,
        data: qdb.data.map((subj) => {
            return {
                ...subj,
                Questions: subj.Questions.map((q) => {
                    const initializedQuestion = q.cache ? q : createQuestion(q)
                    initializedQuestion.data.source = peerToString(peer)
                    return initializedQuestion
                }),
            }
        }),
    }
}

// files
const peersPath = 'data/p2p/'
const peersFile = peersPath + '/peers.json'
// writes it)
const selfInfoFile = peersPath + '/selfInfo.json'
const thirdPartyPeersFile = peersPath + '/thirdPartyPeers.json'
const keyFile = peersPath + '/key' // key.pub key.priv
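
// Illustrative shape of data/p2p/peers.json (field names inferred from how
// PeerInfo is used in this file; not an authoritative schema):
//
//     [
//         {
//             "name": "some-peer",
//             "host": "peer.example.com",
//             "port": 8080,
//             "publicKey": "-----BEGIN PUBLIC KEY-----\n...",
//             "lastSync": 1672531200000
//         }
//     ]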

function setup(data: SubmoduleData): Submodule {
    const {
        app,
        userDB,
        publicdirs,
        moduleSpecificData: { questionDbs, setQuestionDbs, getQuestionDbs },
        // publicdirs,
    } = data

    const publicDir = publicdirs[0]

    // ---------------------------------------------------------------------------------------
    // SETUP
    // ---------------------------------------------------------------------------------------

    // const publicDir = publicdirs[0]

    if (!utils.FileExists(peersFile)) {
        logger.Log(
            `Warning: peers file was missing, so it was created`,
            'yellowbg'
        )
        utils.CreatePath(peersPath)
        utils.WriteFile('[]', peersFile)
    }

    if (!utils.FileExists(selfInfoFile)) {
        logger.Log(
            'Self info file for p2p does not exist! P2P functionality will not be loaded',
            'redbg'
        )
        logger.Log(
            `File should be at: ${selfInfoFile} with the interface 'PeerInfo'`
        )
        throw new Error('p2p error')
    }

    let publicKey: string
    let privateKey: string

    if (
        !utils.FileExists(keyFile + '.priv') ||
        !utils.FileExists(keyFile + '.pub')
    ) {
        createKeyPair().then(({ publicKey: pubk, privateKey: privk }) => {
            // at first start there won't be a keypair available until this finishes
            utils.WriteFile(pubk, keyFile + '.pub')
            utils.WriteFile(privk, keyFile + '.priv')

            publicKey = pubk
            privateKey = privk
        })
        logger.Log(
            'There were no public / private keys for p2p functionality, created new ones',
            'yellowbg'
        )
    } else {
        publicKey = utils.ReadFile(keyFile + '.pub')
        privateKey = utils.ReadFile(keyFile + '.priv')
        // checking only here, because if it got generated in the other branch then it must be good
        if (!isKeypairValid(publicKey, privateKey)) {
            logger.Log('Loaded keypair is not valid!', 'redbg')
        }
    }

    // TODO: validate peers
    let peers: PeerInfo[] = utils.ReadJSON(peersFile)
    let selfInfo: PeerInfo = utils.ReadJSON(selfInfoFile)
    // self info file is not required to have the publicKey, as it is always added on init
    selfInfo.publicKey = publicKey

    const filesToWatch = [
        {
            fname: peersFile,
            logMsg: 'Peers file updated',
            action: () => {
                peers = utils.ReadJSON(peersFile)
            },
        },
        {
            fname: selfInfoFile,
            logMsg: 'P2P self info file changed',
            action: () => {
                selfInfo = utils.ReadJSON(selfInfoFile)
                selfInfo.publicKey = publicKey
            },
        },
    ]

    filesToWatch.forEach((ftw) => {
        if (utils.FileExists(ftw.fname)) {
            utils.WatchFile(ftw.fname, () => {
                logger.Log(ftw.logMsg)
                ftw.action()
            })
            ftw.action()
        } else {
            logger.Log(`File ${ftw.fname} does not exist to watch!`, 'redbg')
        }
    })

    if (peers.length === 0) {
        logger.Log(
            `Warning: peers file is empty. You probably want to fill it`,
            'yellowbg'
        )
    } else {
        logger.Log('Loaded peers: ' + peers.length)
        peers.forEach((peer, i) => {
            logger.Log(`\t${i}\t"${peer.name}": ${peerToString(peer)}`)
        })
    }

    // ---------------------------------------------------------------------------------------
    // FUNCTIONS
    // ---------------------------------------------------------------------------------------

    function getSelfInfo(includeQdbInfo?: boolean) {
        const result: RemotePeerInfo = {
            selfInfo: selfInfo,
            myPeers: peers,
        }

        try {
            // FIXME: don't log if this fails
            result.revision = child_process
                .execSync('git rev-parse HEAD', { cwd: __dirname })
                .toString()
                .trim()
        } catch (e) {
            result.revision = 'Failed to get revision'
        }

        if (includeQdbInfo) {
            result.qdbInfo = getQuestionDbs().map((qdb) => {
                return {
                    dbName: qdb.name,
                    subjs: qdb.data.map((subj) => {
                        return {
                            name: subj.Name,
                            count: subj.Questions.length,
                        }
                    }),
                }
            })
        }

        return result
    }

    function getNewUsersSince(since?: number) {
        const users = dbtools.runStatement(
            userDB,
            `SELECT *
            FROM users
            WHERE created >= ${since};`
        )
        return users
    }
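
    // Note: `since` is interpolated directly into the SQL string. The only
    // caller below passes the result of `+req.query.since`, so the value is a
    // number (or NaN), not raw user input.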

    function updateQdbForLocalUse(qdb: QuestionDb[]) {
        const availableIndexes = getAvailableQdbIndexes(
            getQuestionDbs(),
            qdb.length
        )
        return qdb.map((qdb, i) => {
            return {
                ...qdb,
                index: availableIndexes[i],
                path: `${publicDir}questionDbs/${qdb.name}.json`,
            }
        })
    }

    async function getMergeResults(remoteQuestionDbs: QuestionDb[]) {
        const mergeJobs: Promise<any>[] = []
        const rawNewQuestionDbs: QuestionDb[] = []
        remoteQuestionDbs.forEach((remoteQdb) => {
            // TODO: warn on qdb differences like shouldSave
            const localQdb = getQuestionDbs().find(
                (lqdb) => lqdb.name === remoteQdb.name
            )

            if (!localQdb) {
                rawNewQuestionDbs.push(remoteQdb)
            } else {
                mergeJobs.push(
                    doALongTask({
                        type: 'merge',
                        data: {
                            localQdbIndex: localQdb.index,
                            remoteQdb: remoteQdb,
                        },
                    })
                )
            }
        })

        const mergeResults: MergeResult[] = await Promise.all(mergeJobs)

        return {
            mergeResults: mergeResults,
            rawNewQuestionDbs: rawNewQuestionDbs,
        }
    }
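
    // Merge flow in short: remote DBs with no local counterpart are adopted
    // wholesale (rawNewQuestionDbs); DBs that do exist locally are merged on a
    // worker via a 'merge' task, yielding one MergeResult per affected DB.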

    async function syncData() {
        // TOTEST: try with 0 date to merge full dbs
        if (peers.length === 0) {
            logger.Log(
                `There are no peers specified in ${peersFile}, aborting sync`,
                'yellowbg'
            )
            return {
                msg: 'No peers specified, aborting',
            }
        }
        // FIXME: this might be blocking the main thread, but not sure how much
        logger.Log(
            `\tStarting data sync, getting new data from ${logger.C('green')}${
                peers.length
            }${logger.C()} peers`
        )

        const lastSync = new Date('2023-01-01').getTime() // FINALIZE date: this is only for testing // selfInfo.lastSync
        logger.Log(
            `\tLast sync date: ${logger.C('blue')}${new Date(
                lastSync
            ).toLocaleString()}${logger.C()}`
        )
        const syncStart = new Date().getTime()
        const requests = peers.map((peer) => {
            const lastSyncWithPeer = new Date('2023-01-01').getTime() // FINALIZE same as above // peer.lastSync || 0

            logger.Log(
                `\tLast sync with ${logger.C('blue')}${peerToString(
                    peer
                )}${logger.C()}: ${logger.C('blue')}${new Date(
                    lastSyncWithPeer
                ).toLocaleString()}${logger.C()}`
            )
            return new Promise<RequestResult<SyncDataRes & { peer: PeerInfo }>>(
                (resolve) => {
                    get<SyncDataRes>({
                        host: peer.host,
                        port: peer.port,
                        path: `/getnewdatasince?host=${selfInfo.host}${
                            lastSync ? `&since=${lastSyncWithPeer}` : ''
                        }`,
                    }).then((res) => {
                        resolve({ ...res, data: { ...res.data, peer: peer } })
                    })
                }
            )
        })

        const allResults = await Promise.all(requests)

        // -------------------------------------------------------------------------------------------------------
        // filtering, transforming, and counting data
        // -------------------------------------------------------------------------------------------------------
        allResults.forEach((res) => {
            if (res.error) {
                logger.Log(
                    `\tError syncing with ${peerToString(res.data.peer)}: ${
                        res.error.message
                    }`,
                    'red'
                )
            }
        })
        const resultDataWithoutErrors = allResults
            .filter((res) => !res.error)
            .map((res) => res.data)

        if (resultDataWithoutErrors.length === 0) {
            logger.Log(
                `No peers returned data without error, aborting sync`,
                'redbg'
            )
            return {
                msg: 'No peers returned data without error, aborting sync',
            }
        }

        const resultDataWithoutEmptyDbs = resultDataWithoutErrors.filter(
            (res) => {
                const qdbCount = res.questionDbs.length
                const { subjCount, questionCount } = countOfQdbs(
                    res.questionDbs
                )

                logger.Log(
                    `\t"${logger.C('blue')}${peerToString(
                        res.peer
                    )}${logger.C()}" sent "${logger.C(
                        'green'
                    )}${qdbCount}${logger.C()}" question DB-s with "${logger.C(
                        'green'
                    )}${subjCount.toLocaleString()}${logger.C()}" subjects, and "${logger.C(
                        'green'
                    )}${questionCount.toLocaleString()}${logger.C()}" questions`
                )

                return questionCount > 0
            }
        )

        // TOTEST: even on new subject and new qdb add! TEST
        // add new questions to db (QuestionData.source = true)
        const resultData = resultDataWithoutEmptyDbs.map((res) => {
            return {
                ...res,
                questionDbs: res.questionDbs.map((qdb) => {
                    return setupQuestionsForMerge(qdb, res.peer)
                }),
            }
        })

        // -------------------------------------------------------------------------------------------------------
        // third party peers handling
        // -------------------------------------------------------------------------------------------------------
        const peersHosts = [...peers.map((peer) => peer.host), selfInfo.host]
        const thirdPartyPeers = resultData
            .map((res) => res.remoteInfo)
            .flatMap((x) => {
                return x.myPeers.filter(
                    (receivedPeer) => !peersHosts.includes(receivedPeer.host)
                )
            })
        if (thirdPartyPeers.length > 0) {
            logger.Log(
                `\tPeers reported ${logger.C('green')}${
                    thirdPartyPeers.length
                }${logger.C()} third party peer(s) not connected to this server.`
            )
            utils.WriteFile(
                JSON.stringify(thirdPartyPeers, null, 2),
                thirdPartyPeersFile
            )
            logger.Log(
                `\tSee ${logger.C(
                    'blue'
                )}${thirdPartyPeersFile}${logger.C()} for details`
            )
        }
        // -------------------------------------------------------------------------------------------------------
        // new users handling TOTEST: test
        // -------------------------------------------------------------------------------------------------------
        let newUsers = 0
        const oldUserCount = dbtools.SelectAll(userDB, 'users').length
        try {
            resultData.forEach((res) => {
                if (res.encryptedUsers) {
                    const decryptedUsers: User[] = JSON.parse(
                        decrypt(privateKey, res.encryptedUsers)
                    )
                    let newUserCount = 0
                    decryptedUsers.forEach((remoteUser) => {
                        // eslint-disable-next-line @typescript-eslint/no-unused-vars
                        const { id, ...remoteUserWithoutId } = remoteUser
                        const localUser = dbtools.Select(userDB, 'users', {
                            pw: remoteUser.pw,
                        })
                        if (!localUser) {
                            // FIXME: users will not have consistent id across servers. This may be
                            // harmless, will see
                            dbtools.Insert(userDB, 'users', {
                                ...(remoteUserWithoutId as Omit<User, 'id'>),
                                sourceHost: peerToString(res.peer),
                            })
                            newUserCount += 1
                        }
                    })
                    if (newUserCount > 0) {
                        newUsers += newUserCount
                        logger.Log(
                            `\tAdded ${logger.C(
                                'green'
                            )}${newUserCount}${logger.C()} users from "${logger.C(
                                'blue'
                            )}${peerToString(res.peer)}${logger.C()}"`
                        )
                    }
                }
            })
        } catch (e) {
            logger.Log(
                '\tError while trying to sync users: ' + e.message,
                'redbg'
            )
            console.error(e)
        }
        const newUserCount = dbtools.SelectAll(userDB, 'users').length

        const hasNewData = resultData.length > 0
        if (!hasNewData) {
            logger.Log(
                `No peers returned any new questions. Sync successfully finished!`,
                'green'
            )
            updateLastSync(selfInfo, syncStart)
            return {
                msg: 'No peers returned any new questions',
            }
        }

        // -------------------------------------------------------------------------------------------------------
        // backup
        // -------------------------------------------------------------------------------------------------------

        const { subjCount: oldSubjCount, questionCount: oldQuestionCount } =
            countOfQdbs(getQuestionDbs())
        const oldQuestionDbCount = getQuestionDbs().length
        // TOTEST: test if backup works
        logger.Log('\tBacking up old data ...')
        backupData(getQuestionDbs())

        // -------------------------------------------------------------------------------------------------------
        // adding questions to db
        // -------------------------------------------------------------------------------------------------------

        let totalNewQuestions = 0
        let totalNewSubjects = 0
        let totalNewQdbs = 0
        for (let i = 0; i < resultData.length; i++) {
            const { questionDbs: remoteQuestionDbs, peer } = resultData[i]
            // FIXME: if remoteQuestionDbs contain multiple dbs with the same name, then the merging
            // process could get wonky. Ideally it should not contain, but we will see

            const { rawNewQuestionDbs, mergeResults } = await getMergeResults(
                remoteQuestionDbs
            )

            const newQuestionDbs = updateQdbForLocalUse(rawNewQuestionDbs)

            const { mergedQuestionDbs, changedQdbIndexes } = mergeQdbs(
                getQuestionDbs(),
                mergeResults
            )
            // TOTEST: test multiple new question dbs from multiple sources
            // setting new index & path
            writeNewData(
                newQuestionDbs,
                getQuestionDbs().filter((qdb) => {
                    return changedQdbIndexes.includes(qdb.index)
                })
            )

            setQuestionDbs([...mergedQuestionDbs, ...newQuestionDbs])

            const { newQuestionDbCount, newSubjectCount, newQuestionCount } =
                await sendNewDataToWorkers(mergeResults, newQuestionDbs)

            if (newQuestionCount > 0) {
                logger.Log(
                    `\tAdded ${logger.C(
                        'green'
                    )}${newQuestionDbCount.toLocaleString()}${logger.C()} new question DB-s, ${logger.C(
                        'green'
                    )}${newSubjectCount.toLocaleString()}${logger.C()} new subjects and ${logger.C(
                        'green'
                    )}${newQuestionCount.toLocaleString()}${logger.C()} new questions from "${logger.C(
                        'blue'
                    )}${peerToString(peer)}${logger.C()}"`
                )
            }

            // Processing result data was successful
            const newPeers = peers.map((x) => {
                if (isPeerSameAs(peer, x)) {
                    return {
                        ...x,
                        lastSync: syncStart,
                    }
                } else {
                    return x
                }
            })

            utils.WriteFile(JSON.stringify(newPeers, null, 2), peersFile)

            totalNewQdbs += newQuestionDbCount
            totalNewSubjects += newSubjectCount
            totalNewQuestions += newQuestionCount
        }

        // -------------------------------------------------------------------------------------------------------
        updateLastSync(selfInfo, syncStart)

        const newQdb = getQuestionDbs()
        const { subjCount: newSubjCount, questionCount: newQuestionCount } =
            countOfQdbs(newQdb)
        const newQuestionDbCount = newQdb.length

        logger.logTable([
            ['\t', 'Users', 'QDBs', 'Subjs', 'Questions'],
            [
                'Old\t',
                oldUserCount,
                oldQuestionDbCount,
                oldSubjCount,
                oldQuestionCount,
            ],
            [
                'Added',
                newUsers,
                totalNewQdbs,
                totalNewSubjects,
                totalNewQuestions,
            ],
            [
                'Final',
                newUserCount,
                newQuestionDbCount,
                newSubjCount,
                newQuestionCount,
            ],
        ])

        logger.Log(
            `Question DB-s written! Sync successfully finished!`,
            'green'
        )

        return {
            old: {
                oldUserCount: oldUserCount,
                oldQuestionDbCount: oldQuestionDbCount,
                oldSubjCount: oldSubjCount,
                oldQuestionCount: oldQuestionCount,
            },
            added: {
                totalNewQdbs: totalNewQdbs,
                totalNewSubjects: totalNewSubjects,
                totalNewQuestions: totalNewQuestions,
            },
            final: {
                newUserCount: newUserCount,
                newQuestionDbCount: newQuestionDbCount,
                newSubjCount: newSubjCount,
                newQuestionCount: newQuestionCount,
            },
        }
    }
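
    // syncData() in short: request /getnewdatasince from every peer, merge the
    // returned question DBs into the local ones (via worker 'merge' tasks),
    // adopt previously unseen DBs and users, then persist the new lastSync
    // timestamps for the peers and for this server.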

    // ---------------------------------------------------------------------------------------
    // APP SETUP
    // ---------------------------------------------------------------------------------------
    app.get('/p2pinfo', (req: Request, res: Response<RemotePeerInfo>) => {
        logger.LogReq(req)
        res.json(getSelfInfo(true))
    })

    app.get('/getnewdatasince', (req: Request, res: Response<SyncDataRes>) => {
        // TODO: hash question db to see if different?
        logger.LogReq(req)
        const since = +req.query.since
        const remoteHost = req.query.host

        const questionDbsWithNewQuestions = Number.isNaN(since)
            ? questionDbs
            : questionDbs
                  .map((qdb) => {
                      return {
                          ...qdb,
                          data: getNewDataSince(qdb.data, since),
                      }
                  })
                  .filter((qdb) => {
                      const { questionCount: questionCount } = countOfQdb(qdb)
                      return questionCount > 0
                  })

        const { subjCount: subjects, questionCount: questions } = countOfQdbs(
            questionDbsWithNewQuestions
        )

        const result: SyncDataRes = {
            questionDbs: questionDbsWithNewQuestions,
            count: {
                qdbs: questionDbsWithNewQuestions.length,
                subjects: subjects,
                questions: questions,
            },
            remoteInfo: getSelfInfo(),
        }

        if (remoteHost) {
            const remoteHostInfo = peers.find((peer) => {
                return peer.host === remoteHost
            })
            const remotePublicKey = remoteHostInfo?.publicKey
            if (remotePublicKey) {
                // FIXME: sign data?
                const newUsers = getNewUsersSince(since)
                result.encryptedUsers = encrypt(
                    remotePublicKey,
                    JSON.stringify(newUsers)
                )
            } else if (remoteHostInfo) {
                logger.Log(
                    `Warning: ${remoteHostInfo.host}:${remoteHostInfo.port} has no public key saved!`,
                    'yellowbg'
                )
            }
        }

        res.json(result)
    })
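
    // Illustrative request (values are examples): a peer calls
    //     GET /getnewdatasince?host=peer.example.com&since=1672531200000
    // and receives { questionDbs, count, remoteInfo }, plus encryptedUsers if
    // this server has the caller's public key on file.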

    app.get('/syncp2pdata', (req: Request, res: Response) => {
        logger.LogReq(req)
        // FINALIZE: uncomment
        // const user = req.session.user
        // if (user.id !== 1) {
        //     res.json({
        //         status: 'error',
        //         msg: 'only user 1 can call this EP',
        //     })
        //     return
        // }

        syncData()
            .then((syncResult) => {
                res.json({
                    msg: 'sync successful',
                    ...syncResult,
                })
            })
            .catch((e) => {
                console.error(e)
                res.json({
                    error: e,
                    msg: e.message,
                })
            })
    })

    logger.Log('P2P functionality set up. Peers: ' + peers.length, 'blue')

    return {}
}

export default {
    setup: setup,
}
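
// Typical flow (illustrative): an admin or a scheduled job calls
// GET /syncp2pdata on this server, which pulls /getnewdatasince from every
// configured peer and merges the results into the local question DBs.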

@@ -44,24 +44,21 @@ import {
     Result,
     backupData,
     shouldSearchDataFile,
     loadJSON,
     writeData,
     editDb,
     RecievedData,
 } from '../../../utils/actions'
 import {
-    dataToString,
-    getSubjNameWithoutYear,
     WorkerResult,
-    SearchResultQuestion,
-    // compareQuestionObj,
 } from '../../../utils/classes'
-import {
-    doALongTask,
-    msgAllWorker,
-    initWorkerPool,
-} from '../../../utils/workerPool'
+import { doALongTask, msgAllWorker } from '../../../utils/workerPool'
 import dbtools from '../../../utils/dbtools'
+import {
+    dataToString,
+    getSubjNameWithoutYear,
+    SearchResultQuestion,
+} from '../../../utils/qdbUtils'
 
 interface SavedQuestionData {
     fname: string

@@ -469,11 +466,15 @@ function getNewQdb(
 }
 
 function setup(data: SubmoduleData): Submodule {
-    const { app, userDB, /* url */ publicdirs /* moduleSpecificData */ } = data
+    const {
+        app,
+        userDB,
+        /* url */ publicdirs,
+        moduleSpecificData: { questionDbs: questionDbs, dbsFile: dbsFile },
+    } = data
 
     const publicDir = publicdirs[0]
     const motdFile = publicDir + 'motd'
-    const dbsFile = publicDir + 'questionDbs.json'
     const savedQuestionsDir = publicDir + 'savedQuestions'
 
     let version = LoadVersion()

@@ -481,10 +482,6 @@ function setup(data: SubmoduleData): Submodule {
     let motd = LoadMOTD(motdFile)
     let testUsers: number[] = LoadTestUsers()
 
-    const dataFiles: Array<DataFile> = utils.ReadJSON(dbsFile)
-    const questionDbs: Array<QuestionDb> = loadJSON(dataFiles, publicDir)
-    initWorkerPool(() => questionDbs)
-
     const filesToWatch = [
         {
             fname: motdFile,

@@ -320,7 +320,7 @@ function setup(data: SubmoduleData): Submodule {
         }
     )
 
-    function getDayDiff(dateString: string | Date) {
+    function getDayDiff(dateString: string | Date | number) {
         const msdiff = new Date().getTime() - new Date(dateString).getTime()
         return Math.floor(msdiff / (1000 * 3600 * 24))
     }