/* ----------------------------------------------------------------------------

 Question Server
 GitLab: <https://gitlab.com/MrFry/mrfrys-node-server>

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program. If not, see <https://www.gnu.org/licenses/>.

 ------------------------------------------------------------------------- */

import { Response } from 'express'

import logger from '../../../utils/logger'
import {
    Request,
    SubmoduleData,
    Submodule,
    PeerInfo,
    Subject,
    QuestionDb,
    User,
    DataFile,
} from '../../../types/basicTypes'
import utils from '../../../utils/utils'
import { backupData, writeData } from '../../../utils/actions'
import dbtools from '../../../utils/dbtools'
import {
    createKeyPair,
    decrypt,
    encrypt,
    isKeypairValid,
} from '../../../utils/encryption'
import {
    countOfQdb,
    countOfQdbs,
    createQuestion,
    getAvailableQdbIndexes,
    removeCacheFromQuestion,
} from '../../../utils/qdbUtils'
import { files, paths, readAndValidateFile } from '../../../utils/files'
import { GetResult, get } from '../../../utils/networkUtils'
import {
    msgAllWorker,
    queueWork,
    setPendingJobsAlertCount,
} from '../../../worker/workerPool'
import { WorkerResult } from '../../../worker/worker'
import {
    loginToPeer,
    peerToString,
    updatePeersFile,
} from '../../../utils/p2putils'

interface MergeResult {
    newData: Subject[]
    newSubjects: Subject[]
    localQdbIndex: number
    e: Error
}

interface RemotePeerInfo {
    selfInfo: PeerInfo
    myPeers: PeerInfo[]
    serverRevision?: string
    scriptRevision?: string
    qminingPageRevision?: string
    dataEditorRevision?: string
    serverLastCommitDate?: number
    scriptLastCommitDate?: number
    qminingPageLastCommitDate?: number
    dataEditorLastCommitDate?: number
    serverBuildTime?: number
    qminingPageBuildTime?: number
    dataEditorBuildTime?: number
    scriptVersion?: string
    userCount?: number
    qdbInfo?: {
        questionDbCount: number
        subjectCount: number
        questionCount: number
    }
}

interface SyncResult {
    old?: { [key: string]: number }
    added?: { [key: string]: number }
    final?: { [key: string]: number }
    msg?: string
}

interface SyncDataResBase {
    result?: string
    remoteInfo?: RemotePeerInfo
}

interface UserSyncDataRes extends SyncDataResBase {
    encryptedUsers?: string
}

interface QuestionSyncDataRes extends SyncDataResBase {
    questionDbs?: QuestionDb[]
    count?: {
        qdbs: number
        subjects: number
        questions: number
    }
}

interface NewDataResult {
    peer: PeerInfo
    result?: {
        questions?: GetResult<QuestionSyncDataRes>
        users?: GetResult<UserSyncDataRes>
    }
    error?: Error
}

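// Merges newly reported third party peers into the third party peers file,
// deduplicating entries by their host:port string.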
function updateThirdPartyPeers(
    newVal: Omit<PeerInfo, 'publicKey' | 'name' | 'contact'>[]
) {
    const prevVal = utils.FileExists(paths.thirdPartyPeersFile)
        ? utils.ReadJSON<PeerInfo[]>(paths.thirdPartyPeersFile)
        : []

    const dataToWrite = newVal.reduce((acc, peer) => {
        const isIncluded = acc.find((x) => {
            return peerToString(x) === peerToString(peer)
        })
        if (!isIncluded) {
            return [...acc, peer]
        }
        return acc
    }, prevVal)

    utils.WriteFile(
        JSON.stringify(dataToWrite, null, 2),
        paths.thirdPartyPeersFile
    )
}

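// Filters each subject's questions to those created at or after `date`,
// stripping question caches and dropping subjects left empty.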
export function getNewDataSince(subjects: Subject[], date: number): Subject[] {
    return subjects
        .map((subject) => {
            return {
                ...subject,
                Questions: subject.Questions.filter((question) => {
                    return (question.data.date || 0) >= date
                }).map((question) => removeCacheFromQuestion(question)),
            }
        })
        .filter((subject) => subject.Questions.length !== 0)
}

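// Appends the questions of matching remote subjects to the local subjects,
// then appends the entirely new subjects at the end.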
export function mergeSubjects(
    subjectsToMergeTo: Subject[],
    subjectsToMerge: Subject[],
    newSubjects: Subject[]
): Subject[] {
    return [
        ...subjectsToMergeTo.map((subj) => {
            const newSubjs = subjectsToMerge.filter(
                (subjRes) => subjRes.Name === subj.Name
            )

            if (newSubjs.length > 0) {
                const newQuestions = newSubjs.flatMap((subj) => {
                    return subj.Questions
                })

                return {
                    ...subj,
                    Questions: [...subj.Questions, ...newQuestions],
                }
            } else {
                return subj
            }
        }),
        ...newSubjects,
    ]
}

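// Applies merge results to the matching local question dbs (matched by
// index), returning the merged dbs and the indexes of those that changed.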
export function mergeQdbs(
    qdbToMergeTo: QuestionDb[],
    mergeResults: MergeResult[]
): { mergedQuestionDbs: QuestionDb[]; changedQdbIndexes: number[] } {
    const changedQdbIndexes: number[] = []
    const mergedQuestionDbs = qdbToMergeTo.map((qdb) => {
        const qdbMergeResult = mergeResults.find(
            (mergeRes) => mergeRes.localQdbIndex === qdb.index
        )
        if (
            qdbMergeResult &&
            (qdbMergeResult.newData.length > 0 ||
                qdbMergeResult.newSubjects.length > 0)
        ) {
            const mergedQdb = {
                ...qdb,
                data: mergeSubjects(
                    qdb.data,
                    qdbMergeResult.newData,
                    qdbMergeResult.newSubjects
                ),
            }
            changedQdbIndexes.push(qdb.index)
            return mergedQdb
        } else {
            // unchanged
            return qdb
        }
    })
    return {
        mergedQuestionDbs: mergedQuestionDbs,
        changedQdbIndexes: changedQdbIndexes,
    }
}

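// Notifies all worker threads about merged questions and whole new question
// dbs, and returns counts of everything that was sent.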
async function sendNewDataToWorkers(
    mergeResults: MergeResult[],
    newQuestionDbs: QuestionDb[]
) {
    // FIXME: this might be slow, maybe make a new type of message for workers?
    const updatePromises: Promise<WorkerResult[]>[] = []
    let newQuestionCount = 0
    let newSubjectCount = 0
    let newQuestionDbCount = 0

    mergeResults.forEach((mergeRes) => {
        if (mergeRes.e) {
            logger.Log(`There was an error processing the merge!`, 'redbg')
            console.error(mergeRes.e)
            return
        }

        mergeRes.newData.forEach((subjectWithNewData) => {
            newQuestionCount += subjectWithNewData.Questions.length
            updatePromises.push(
                msgAllWorker({
                    type: 'newQuestions',
                    data: {
                        subjName: subjectWithNewData.Name,
                        qdbIndex: mergeRes.localQdbIndex,
                        newQuestions: subjectWithNewData.Questions,
                    },
                })
            )
        })

        newSubjectCount += mergeRes.newSubjects.length
        mergeRes.newSubjects.forEach((newSubject) => {
            newQuestionCount += newSubject.Questions.length
            updatePromises.push(
                msgAllWorker({
                    type: 'newQuestions',
                    data: {
                        subjName: newSubject.Name,
                        qdbIndex: mergeRes.localQdbIndex,
                        newQuestions: newSubject.Questions,
                    },
                })
            )
        })
    })
    newQuestionDbCount += newQuestionDbs.length
    newQuestionDbs.forEach((newQdb) => {
        const { subjCount: sc, questionCount: qc } = countOfQdb(newQdb)
        newSubjectCount += sc
        newQuestionCount += qc
        // also wait for the 'newdb' messages, so every worker is up to date
        // before the sync reports success
        updatePromises.push(
            msgAllWorker({
                data: newQdb,
                type: 'newdb',
            })
        )
    })

    await Promise.all(updatePromises)

    return {
        newQuestionDbCount: newQuestionDbCount,
        newSubjectCount: newSubjectCount,
        newQuestionCount: newQuestionCount,
    }
}

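// Persists changed and new question dbs: updates the dbs metadata file (with
// paths stored relative to the public dir), then writes each db's data.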
function writeNewData(
    newQuestionDbs: QuestionDb[],
    changedQuestionDbs: QuestionDb[],
    dbsFilePath: string,
    publicDir: string
) {
    const qdbsToWrite = [...changedQuestionDbs, ...newQuestionDbs]
    const existingQdbs = utils.ReadJSON<DataFile[]>(dbsFilePath)

    const qdbDataToWrite = qdbsToWrite
        .reduce((acc, qdb) => {
            // eslint-disable-next-line @typescript-eslint/no-unused-vars
            const { data, index, ...qdbData } = qdb
            const existingQdbData = acc.find((data) => {
                return data.name === qdbData.name
            })
            if (!existingQdbData) {
                return [...acc, qdbData]
            } else {
                return acc
            }
        }, existingQdbs)
        .map((qdb) => {
            if (qdb.path.includes(publicDir)) {
                return {
                    ...qdb,
                    path: qdb.path.replace(publicDir, ''),
                }
            } else {
                return qdb
            }
        })

    utils.WriteFile(JSON.stringify(qdbDataToWrite, null, 2), dbsFilePath)
    qdbsToWrite.forEach((qdb) => {
        try {
            writeData(qdb.data, qdb.path)
        } catch (e) {
            logger.Log(`Error writing ${qdb.name} qdb to file!`, 'redbg')
            console.error(e)
        }
    })
}

function updateLastSync(selfInfo: PeerInfo, newDate: number) {
    utils.WriteFile(
        JSON.stringify({ ...selfInfo, lastSync: newDate }, null, 2),
        paths.selfInfoFile
    )
}

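// Makes sure every incoming question has its cache initialized and is tagged
// with the peer it came from.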
function setupQuestionsForMerge(qdb: QuestionDb, peer: PeerInfo) {
    return {
        ...qdb,
        data: qdb.data.map((subj) => {
            return {
                ...subj,
                Questions: subj.Questions.map((q) => {
                    const initializedQuestion = q.cache ? q : createQuestion(q)
                    initializedQuestion.data.source = peerToString(peer)
                    return initializedQuestion
                }),
            }
        }),
    }
}

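// Logs in to a peer (reusing a saved session cookie when possible) and fetches
// new questions and/or users since the last sync; if the peer reports an
// invalid session ('nouser'), logs in again and retries once.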
async function authAndGetNewData({
    peer,
    selfInfo,
    allTime,
    shouldSync,
}: {
    peer: PeerInfo
    selfInfo: PeerInfo
    allTime?: boolean
    shouldSync: {
        questions: boolean
        users: boolean
    }
}): Promise<NewDataResult> {
    try {
        const syncAll =
            !shouldSync ||
            Object.values(shouldSync).filter((x) => x).length === 0
        let sessionCookie = peer.sessionCookie

        const login = async () => {
            const loginResult = await loginToPeer(peer)
            if (typeof loginResult === 'string') {
                sessionCookie = loginResult
                updatePeersFile(peer, { sessionCookie: loginResult })
            } else {
                throw {
                    error: loginResult,
                    data: {
                        peer: peer,
                    },
                }
            }
        }

        if (!sessionCookie) {
            await login()
        }

        const getData = async <T>(path: string) => {
            return get<T>(
                {
                    headers: {
                        cookie: `sessionID=${sessionCookie}`,
                    },
                    host: peer.host,
                    port: peer.port,
                    path: path,
                },
                peer.http
            )
        }

        let result: NewDataResult['result'] = {}

        const setResult = async () => {
            if (shouldSync.questions || syncAll) {
                result.questions = await getData<QuestionSyncDataRes>(
                    `/api/getnewdatasince?host=${encodeURIComponent(
                        peerToString(selfInfo)
                    )}${
                        peer.lastSync && !allTime
                            ? `&since=${peer.lastSync}`
                            : ''
                    }`
                )
            }

            if (shouldSync.users || syncAll) {
                result.users = await getData<UserSyncDataRes>(
                    `/api/getnewuserssince?host=${encodeURIComponent(
                        peerToString(selfInfo)
                    )}${
                        peer.lastUsersSync && !allTime
                            ? `&since=${peer.lastUsersSync}`
                            : ''
                    }`
                )
            }
        }

        await setResult()

        const hasNoUser = Object.values(result).some((res) => {
            return res.data?.result === 'nouser'
        })
        if (hasNoUser) {
            await login()
            result = {}
            await setResult()
        }

        return { result: result, peer: peer }
    } catch (e) {
        console.error(e)
        return { error: e, peer: peer }
    }
}

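// Sets up the p2p submodule: loads or creates the encryption keypair, loads
// and watches the peers / self info files, and registers the sync endpoints.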
function setup(data: SubmoduleData): Submodule {
    const {
        app,
        userDB,
        publicdirs,
        moduleSpecificData: { setQuestionDbs, getQuestionDbs, dbsFile },
    } = data

    const publicDir = publicdirs[0]
    let syncInProgress = false

    // ---------------------------------------------------------------------------------------
    // SETUP
    // ---------------------------------------------------------------------------------------

    let publicKey: string
    let privateKey: string

    if (
        !utils.FileExists(paths.keyFile + '.priv') ||
        !utils.FileExists(paths.keyFile + '.pub')
    ) {
        createKeyPair().then(({ publicKey: pubk, privateKey: privk }) => {
            // at first start there won't be a keypair available until this finishes
            utils.WriteFile(pubk, paths.keyFile + '.pub')
            utils.WriteFile(privk, paths.keyFile + '.priv')

            publicKey = pubk
            privateKey = privk
        })
        logger.Log(
            'There were no public / private keys for p2p functionality, created new ones',
            'yellowbg'
        )
    } else {
        publicKey = utils.ReadFile(paths.keyFile + '.pub')
        privateKey = utils.ReadFile(paths.keyFile + '.priv')
        // checking only here, because if it got generated in the other branch then it must be good
        if (!isKeypairValid(publicKey, privateKey)) {
            logger.Log('Loaded keypair is not valid!', 'redbg')
        }
    }

    let peers: PeerInfo[] = utils.ReadJSON(paths.peersFile)
    let selfInfo: PeerInfo = utils.ReadJSON(paths.selfInfoFile)
    selfInfo.publicKey = publicKey

    const filesToWatch = [
        {
            fname: paths.peersFile,
            logMsg: 'Peers file updated',
            action: () => {
                const newVal = readAndValidateFile<PeerInfo[]>(files.peersFile)
                if (newVal) {
                    peers = newVal
                }
            },
        },
        {
            fname: paths.selfInfoFile,
            logMsg: 'P2P self info file changed',
            action: () => {
                const newVal = readAndValidateFile<PeerInfo>(files.selfInfoFile)
                if (newVal) {
                    selfInfo = newVal
                }
            },
        },
    ]

    filesToWatch.forEach((ftw) => {
        if (utils.FileExists(ftw.fname)) {
            utils.WatchFile(ftw.fname, () => {
                logger.Log(ftw.logMsg)
                ftw.action()
            })
            ftw.action()
        } else {
            logger.Log(
                `File ${ftw.fname} does not exist, unable to watch it!`,
                'redbg'
            )
        }
    })

    if (peers.length === 0) {
        logger.Log(
            `Warning: peers file is empty. You probably want to fill it`,
            'yellowbg'
        )
    }

    // ---------------------------------------------------------------------------------------
    // FUNCTIONS
    // ---------------------------------------------------------------------------------------

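    // Describes this server to other peers; verbose mode adds revision, build
    // time and question db statistics.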
    function getSelfInfo(includeVerboseInfo?: boolean) {
        const result: RemotePeerInfo = {
            selfInfo: { ...selfInfo, publicKey: publicKey },
            myPeers: peers.map((peer) => {
                // eslint-disable-next-line @typescript-eslint/no-unused-vars
                const { pw, sessionCookie, ...restOfPeer } = peer
                return restOfPeer
            }),
        }

        if (includeVerboseInfo) {
            const serverRevision = utils.getGitInfo(__dirname)
            result.serverRevision = serverRevision.revision
            result.serverLastCommitDate = serverRevision.lastCommitDate

            const scriptRevision = utils.getGitInfo(
                paths.moodleTestUserscriptDir
            )
            result.scriptRevision = scriptRevision.revision
            result.scriptLastCommitDate = scriptRevision.lastCommitDate

            const qminingPageRevision = utils.getGitInfo(paths.qminingPageDir)
            result.qminingPageRevision = qminingPageRevision.revision
            result.qminingPageLastCommitDate =
                qminingPageRevision.lastCommitDate

            const dataEditorRevision = utils.getGitInfo(paths.dataEditorPageDir)
            result.dataEditorRevision = dataEditorRevision.revision
            result.dataEditorLastCommitDate = dataEditorRevision.lastCommitDate

            result.qminingPageBuildTime = utils
                .statFile(paths.qminingIndexPath)
                ?.mtime.getTime()
            result.serverBuildTime = utils
                .statFile(paths.serverPath)
                ?.mtime.getTime()
            result.dataEditorBuildTime = utils
                .statFile(paths.dataEditorIndexPath)
                ?.mtime.getTime()
            result.scriptVersion = utils.getScriptVersion()
            result.userCount = dbtools.TableInfo(userDB, 'users').dataCount

            const questionDbCount = getQuestionDbs().length
            const { subjCount, questionCount } = countOfQdbs(getQuestionDbs())
            result.qdbInfo = {
                questionDbCount: questionDbCount,
                subjectCount: subjCount,
                questionCount: questionCount,
            }
        }

        return result
    }

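    // Returns the users created at or after the given timestamp, excluding
    // user 1 and admin users.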
    function getNewUsersSince(since: number) {
        const users: User[] = dbtools.runStatement(
            userDB,
            `SELECT *
            FROM users
            WHERE created >= ${since}
            AND id != 1
            AND isAdmin is null
            ;`
        )
        return users
    }

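    // Assigns free local indexes and local file paths to question dbs received
    // from a peer so they can be stored alongside the existing ones.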
    function updateQdbForLocalUse(qdb: QuestionDb[]) {
        const availableIndexes = getAvailableQdbIndexes(
            getQuestionDbs(),
            qdb.length
        )
        return qdb.map((qdb, i) => {
            return {
                ...qdb,
                index: availableIndexes[i],
                path: `${publicDir}questionDbs/${qdb.name}.json`,
            }
        })
    }

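    // Splits remote question dbs into ones unknown locally (returned raw) and
    // ones that also exist locally, which are merged on worker threads.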
    async function getMergeResults(remoteQuestionDbs: QuestionDb[]) {
        const mergeJobs: Promise<any>[] = []
        const rawNewQuestionDbs: QuestionDb[] = []
        remoteQuestionDbs.forEach((remoteQdb) => {
            const localQdb = getQuestionDbs().find(
                (lqdb) => lqdb.name === remoteQdb.name
            )

            if (!localQdb) {
                rawNewQuestionDbs.push(remoteQdb)
            } else {
                mergeJobs.push(
                    queueWork({
                        type: 'merge',
                        data: {
                            localQdbIndex: localQdb.index,
                            remoteQdb: remoteQdb,
                        },
                    })
                )
            }
        })

        const mergeResults: MergeResult[] = await Promise.all(mergeJobs)

        return {
            mergeResults: mergeResults,
            rawNewQuestionDbs: rawNewQuestionDbs,
        }
    }

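    // Main sync entry point: fetches new data from every peer, logs errors,
    // records third party peers, then delegates to syncUsers / syncQuestions.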
    async function syncData({
        shouldSync,
        allTime,
    }: {
        shouldSync: {
            questions: boolean
            users: boolean
        }
        allTime: boolean
    }) {
        if (peers.length === 0) {
            logger.Log(
                `There are no peers specified in ${paths.peersFile}, aborting sync`,
                'yellowbg'
            )
            return {
                msg: 'No peers specified, aborting',
            }
        }
        // FIXME: this might be blocking the main thread, but not sure how much
        logger.Log(
            `\tStarting data sync, getting new data from ${logger.C('green')}${
                peers.length
            }${logger.C()} peers`
        )

        const syncAll =
            !shouldSync ||
            Object.values(shouldSync).filter((x) => x).length === 0

        logger.Log(
            `\tSyncing: ${
                syncAll
                    ? 'everything'
                    : Object.entries(shouldSync)
                          .filter(([, shouldSyncIt]) => shouldSyncIt)
                          .map(([key]) => key)
                          .join(', ')
            }`,
            'green'
        )

        if (allTime) {
            logger.Log(`\tSyncing since all time!`, 'yellowbg')
        }

        const lastSync = selfInfo.lastSync
        logger.Log(
            `\tLast sync date: ${logger.C('blue')}${
                lastSync ? new Date(lastSync).toLocaleString() : 'never'
            }${logger.C()}`
        )
        const syncStart = new Date().getTime()
        const lastSyncInfos = peers.map((peer) => {
            return [
                peerToString(peer),
                peer.lastSync
                    ? new Date(peer.lastSync).toLocaleString()
                    : 'never',
            ]
        })
        logger.Log(`\tLast sync with peers:`)
        logger.logTable([['', 'Date'], ...lastSyncInfos], {
            colWidth: [15],
            rowPrefix: '\t',
        })

        const requests = peers.map((peer) => {
            return authAndGetNewData({
                peer: peer,
                selfInfo: selfInfo,
                allTime: allTime,
                shouldSync: shouldSync,
            })
        })

        const allResults = await Promise.all(requests)

        // -------------------------------------------------------------------------------------------------------
        // filtering, transforming, and counting responses
        // -------------------------------------------------------------------------------------------------------
        allResults.forEach((res) => {
            const errors = res?.error
                ? [{ key: 'all', error: res.error }]
                : Object.entries(res.result)
                      .map(([key, x]) =>
                          x.error ? { error: x.error, key: key } : null
                      )
                      .filter((x) => !!x)

            if (errors.length > 0) {
                logger.Log(
                    `\tError syncing with ${peerToString(res.peer)}`,
                    'red'
                )
                errors.forEach((e) => {
                    logger.Log(`\t${e.key}: ${e.error.message}`)
                })
            }
        })
        const resultDataWithoutErrors: {
            questions?: QuestionSyncDataRes & { peer: PeerInfo }
            users?: UserSyncDataRes & { peer: PeerInfo }
        }[] = allResults.reduce((acc, resData) => {
            const resDataWithoutErrors = Object.entries(resData.result).reduce(
                (acc, [key, x]) => {
                    if (!x.error) {
                        acc[key] = { ...x.data, peer: resData.peer }
                    }
                    return acc
                },
                {}
            )

            if (Object.keys(resDataWithoutErrors).length > 0) {
                return [...acc, resDataWithoutErrors]
            } else {
                return acc
            }
        }, [])

        if (resultDataWithoutErrors.length === 0) {
            logger.Log(
                `No peers returned data without error, aborting sync`,
                'redbg'
            )
            return {
                msg: 'No peers returned data without error, aborting sync',
            }
        }

        // -------------------------------------------------------------------------------------------------------
        // third party peers handling
        // -------------------------------------------------------------------------------------------------------
        const peersHosts = [...peers.map((peer) => peer.host), selfInfo.host]
        const thirdPartyPeers = resultDataWithoutErrors
            .map((res) => {
                return Object.values(res).map((x) => x.remoteInfo.myPeers)
            })
            .filter((x) => !!x)
            .flatMap((x) => x)
            .flatMap((x) => {
                return x.filter(
                    (receivedPeer) => !peersHosts.includes(receivedPeer.host)
                )
            })

        if (thirdPartyPeers.length > 0) {
            updateThirdPartyPeers(thirdPartyPeers)
            logger.Log(
                `\tPeers reported ${logger.C('green')}${
                    thirdPartyPeers.length
                }${logger.C()} third party peer(s) not connected to this server. See ${logger.C(
                    'blue'
                )}${paths.thirdPartyPeersFile}${logger.C()} for details`
            )
        }

        // -------------------------------------------------------------------------------------------------------
        // data syncing
        // -------------------------------------------------------------------------------------------------------

        const getData = (key: keyof NewDataResult['result']) => {
            return resultDataWithoutErrors
                .filter((x) => x[key])
                .map((x) => x[key])
        }

        const syncResults: SyncResult[] = []

        const userData = getData('users')
        if (userData.length > 0) {
            const res = await syncUsers(userData, syncStart)
            syncResults.push(res)
        }

        const questionData = getData('questions')
        if (questionData.length > 0) {
            const res = await syncQuestions(questionData, syncStart)
            syncResults.push(res)
        }

        return syncResults.reduce(
            (acc, x) => {
                return {
                    old: { ...acc.old, ...x.old },
                    added: { ...acc.added, ...x.added },
                    final: { ...acc.final, ...x.final },
                }
            },
            { old: {}, added: {}, final: {} }
        )
    }

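    // Decrypts the user lists received from peers and inserts the users that
    // do not exist locally yet (matched by pw), updating each peer's
    // lastUsersSync on success.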
    async function syncUsers(
        userData: (UserSyncDataRes & { peer: PeerInfo })[],
        syncStart: number
    ): Promise<SyncResult> {
        logger.Log('Syncing users...')
        const resultsCount: {
            [key: string]: {
                newUsers?: number
            }
        } = {}
        const oldUserCount = dbtools.SelectAll(userDB, 'users').length

        try {
            userData.forEach((res) => {
                if (res.encryptedUsers) {
                    let addedUserCount = 0
                    const decryptedUsers: User[] = JSON.parse(
                        decrypt(privateKey, res.encryptedUsers)
                    )
                    decryptedUsers.forEach((remoteUser) => {
                        // eslint-disable-next-line @typescript-eslint/no-unused-vars
                        const { id, ...remoteUserWithoutId } = remoteUser
                        const localUser = dbtools.Select(userDB, 'users', {
                            pw: remoteUser.pw,
                        })
                        if (localUser.length === 0) {
                            addedUserCount += 1
                            // FIXME: users will not have a consistent id across servers.
                            // This may be harmless, will see
                            dbtools.Insert(userDB, 'users', {
                                ...(remoteUserWithoutId as Omit<User, 'id'>),
                                sourceHost: peerToString(res.peer),
                            })
                        }
                    })
                    resultsCount[peerToString(res.peer)] = {
                        newUsers: addedUserCount,
                    }
                    updatePeersFile(res.peer, {
                        lastUsersSync: syncStart,
                    })
                }
            })
        } catch (e) {
            logger.Log(
                '\tError while trying to sync users: ' + e.message,
                'redbg'
            )
            console.error(e)
        }
        const newUserCount = dbtools.SelectAll(userDB, 'users').length

        if (Object.keys(resultsCount).length === 0) {
            logger.Log('No new users received')
        } else {
            logger.logTable(
                [
                    ['', 'Users'],
                    ['Old', oldUserCount],
                    ...Object.entries(resultsCount).map(([key, result]) => {
                        return [key, result.newUsers]
                    }),
                    ['Added total', newUserCount - oldUserCount],
                    ['Final', newUserCount],
                ],
                { colWidth: [15], rowPrefix: '\t' }
            )
        }
        logger.Log(`Successfully synced users!`, 'green')

        return {
            old: {
                oldUserCount: oldUserCount,
            },
            added: {
                totalNewUsers: newUserCount - oldUserCount,
            },
            final: {
                newUserCount: newUserCount,
            },
        }
    }

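    // Merges the questions received from peers into the local question dbs:
    // backs up old data, merges / adds dbs, persists them and updates workers.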
    async function syncQuestions(
        questionData: (QuestionSyncDataRes & { peer: PeerInfo })[],
        syncStart: number
    ): Promise<SyncResult> {
        logger.Log('Syncing questions...')
        const receivedDataCounts: (number | string)[][] = []
        // all results statistics
        const resultsCount: {
            [key: string]: {
                newQuestionDbs?: number
                newSubjects?: number
                newQuestions?: number
            }
        } = {}

        const resultDataWithoutEmptyDbs: (QuestionSyncDataRes & {
            peer: PeerInfo
        })[] = []
        questionData.forEach((res) => {
            const qdbCount = res.questionDbs.length
            const { subjCount, questionCount } = countOfQdbs(res.questionDbs)

            receivedDataCounts.push([
                peerToString(res.peer),
                qdbCount,
                subjCount,
                questionCount,
            ])

            if (questionCount > 0) {
                resultDataWithoutEmptyDbs.push(res)
            } else {
                updatePeersFile(res.peer, {
                    lastSync: syncStart,
                })
            }
        })

        logger.Log(`\tReceived data from peers:`)
        logger.logTable(
            [['', 'QDBs', 'Subjs', 'Questions'], ...receivedDataCounts],
            {
                colWidth: [15],
                rowPrefix: '\t',
            }
        )

        const resultData = resultDataWithoutEmptyDbs.map((res) => {
            return {
                ...res,
                questionDbs: res.questionDbs.map((qdb) => {
                    return setupQuestionsForMerge(qdb, res.peer)
                }),
            }
        })

        const hasNewData = resultData.length > 0
        if (!hasNewData) {
            logger.Log(
                `No peers returned any new questions. Question sync successfully finished!`,
                'green'
            )
            updateLastSync(selfInfo, syncStart)
            return {
                msg: 'No peers returned any new questions',
            }
        }

        // -------------------------------------------------------------------------------------------------------
        // backup
        // -------------------------------------------------------------------------------------------------------
        const { subjCount: oldSubjCount, questionCount: oldQuestionCount } =
            countOfQdbs(getQuestionDbs())
        const oldQuestionDbCount = getQuestionDbs().length
        backupData(getQuestionDbs())
        logger.Log('\tOld data backed up!')

        // -------------------------------------------------------------------------------------------------------
        // adding questions to db
        // -------------------------------------------------------------------------------------------------------
        for (let i = 0; i < resultData.length; i++) {
            const { questionDbs: remoteQuestionDbs, peer } = resultData[i]
            logger.Log(
                `\tProcessing result from "${logger.C('blue')}${peerToString(
                    peer
                )}${logger.C()}" (${logger.C('green')}${
                    i + 1
                }${logger.C()}/${logger.C('green')}${
                    resultData.length
                }${logger.C()})`
            )
            // FIXME: if remoteQuestionDbs contain multiple dbs with the same name, then the
            // merging process could get wonky. Ideally it should not contain any, but we will see

            const { rawNewQuestionDbs, mergeResults } = await getMergeResults(
                remoteQuestionDbs
            )

            // setting new index & path
            const newQuestionDbs = updateQdbForLocalUse(rawNewQuestionDbs)

            const { mergedQuestionDbs, changedQdbIndexes } = mergeQdbs(
                getQuestionDbs(),
                mergeResults
            )
            writeNewData(
                newQuestionDbs,
                getQuestionDbs().filter((qdb) => {
                    return changedQdbIndexes.includes(qdb.index)
                }),
                dbsFile,
                publicDir
            )

            setQuestionDbs([...mergedQuestionDbs, ...newQuestionDbs])

            const { newQuestionDbCount, newSubjectCount, newQuestionCount } =
                await sendNewDataToWorkers(mergeResults, newQuestionDbs)

            resultsCount[peerToString(peer)] = {
                ...(resultsCount[peerToString(peer)] || { newUsers: 0 }),
                newQuestionDbs: newQuestionDbCount,
                newSubjects: newSubjectCount,
                newQuestions: newQuestionCount,
            }
            // Processing result data was successful
            updatePeersFile(peer, {
                lastSync: syncStart,
            })
        }

        // -------------------------------------------------------------------------------------------------------
        updateLastSync(selfInfo, syncStart)

        const newQdb = getQuestionDbs()
        const { subjCount: newSubjCount, questionCount: newQuestionCount } =
            countOfQdbs(newQdb)
        const newQuestionDbCount = newQdb.length

        const resultsTable = Object.entries(resultsCount).map(
            ([key, value]) => {
                return [
                    key.length > 14 ? key.substring(0, 14) + '...' : key,
                    value.newQuestionDbs,
                    value.newSubjects,
                    value.newQuestions,
                ]
            }
        )

        const sumNewCount = (key: string) => {
            return Object.values(resultsCount).reduce(
                (acc, val) => acc + val[key],
                0
            )
        }

        const totalNewQuestions = sumNewCount('newQuestions')
        const totalNewSubjects = sumNewCount('newSubjects')
        const totalNewQdbs = sumNewCount('newQuestionDbs')

        logger.logTable(
            [
                ['', 'QDBs', 'Subjs', 'Questions'],
                ['Old', oldQuestionDbCount, oldSubjCount, oldQuestionCount],
                ...resultsTable,
                [
                    'Added total',
                    totalNewQdbs,
                    totalNewSubjects,
                    totalNewQuestions,
                ],
                ['Final', newQuestionDbCount, newSubjCount, newQuestionCount],
            ],
            { colWidth: [15], rowPrefix: '\t' }
        )

        logger.Log(`Successfully synced questions!`, 'green')

        return {
            old: {
                oldQuestionDbCount: oldQuestionDbCount,
                oldSubjCount: oldSubjCount,
                oldQuestionCount: oldQuestionCount,
            },
            added: {
                totalNewQdbs: totalNewQdbs,
                totalNewSubjects: totalNewSubjects,
                totalNewQuestions: totalNewQuestions,
            },
            final: {
                newQuestionDbCount: newQuestionDbCount,
                newSubjCount: newSubjCount,
                newQuestionCount: newQuestionCount,
            },
        }
    }

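    // Called when a request names a host that is not among the known peers;
    // logs it and records the host as a third party peer.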
    function handleNewThirdPartyPeer(remoteHost: string) {
        logger.Log(
            'Couldn\'t find remote peer info based on remoteHost: "' +
                remoteHost +
                '". This could mean that the host uses this server as peer, but this server does not ' +
                'use it as a peer.',
            'yellowbg'
        )
        if (remoteHost.includes(':')) {
            const [host, port] = remoteHost.split(':')
            updateThirdPartyPeers([
                {
                    host: host,
                    port: +port,
                },
            ])
            logger.Log('Host info written to host info file')
        }
    }

    // ---------------------------------------------------------------------------------------
    // APP SETUP
    // ---------------------------------------------------------------------------------------
    app.get('/selfInfo', (_req: Request, res: Response<PeerInfo>) => {
        res.json({ ...selfInfo, publicKey: publicKey })
    })

    app.get('/p2pinfo', (_req: Request, res: Response<RemotePeerInfo>) => {
        res.json(getSelfInfo(true))
    })

    // TODO: get all user files
    app.get('/getnewfilessince', (req: Request, res: Response<any>) => {
        const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since

        const remoteHost = req.query.host
        const hostToLog = remoteHost || 'Unknown host'

        const result: any = {
            remoteInfo: getSelfInfo(),
        }

        if (remoteHost) {
            const remotePeerInfo = peers.find((peer) => {
                return peerToString(peer) === remoteHost
            })
            if (!remotePeerInfo) {
                handleNewThirdPartyPeer(remoteHost)
            }
        }
        const usersSinceDate = since
            ? new Date(since).toLocaleString()
            : 'all time'

        logger.Log(
            `\tSending new files to ${logger.C(
                'blue'
            )}${hostToLog}${logger.C()} since ${logger.C(
                'blue'
            )}${usersSinceDate}${logger.C()}`
        )

        res.json(result)
    })

    app.get(
        '/getnewuserssince',
        (req: Request, res: Response<UserSyncDataRes>) => {
            logger.LogReq(req)
            const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since

            const remoteHost = req.query.host
            let hostToLog = remoteHost || 'Unknown host'
            let sentUsers = 0

            const result: UserSyncDataRes = {
                remoteInfo: getSelfInfo(),
            }

            if (remoteHost) {
                const remotePeerInfo = peers.find((peer) => {
                    return peerToString(peer) === remoteHost
                })
                if (!remotePeerInfo) {
                    handleNewThirdPartyPeer(remoteHost)
                } else {
                    hostToLog = peerToString(remotePeerInfo)
                }

                if (remotePeerInfo) {
                    const remotePublicKey = remotePeerInfo?.publicKey
                    if (remotePublicKey) {
                        // FIXME: sign data?
                        const newUsers = getNewUsersSince(since)
                        sentUsers = newUsers.length
                        result.encryptedUsers = encrypt(
                            remotePublicKey,
                            JSON.stringify(newUsers)
                        )

                        const usersSinceDate = since
                            ? new Date(since).toLocaleString()
                            : 'all time'

                        logger.Log(
                            `\tSending new users to ${logger.C(
                                'blue'
                            )}${hostToLog}${logger.C()} since ${logger.C(
                                'blue'
                            )}${usersSinceDate}${logger.C()}. Sent users: ${logger.C(
                                'blue'
                            )}${sentUsers}${logger.C()}`
                        )
                    } else {
                        logger.Log(
                            `Warning: "${hostToLog}" has no public key saved!`,
                            'yellowbg'
                        )
                    }
                }
            }

            res.json(result)
        }
    )

    app.get(
        '/getnewdatasince',
        (req: Request, res: Response<QuestionSyncDataRes>) => {
            // FIXME: hash question db to see if different?
            // it could help in determining if it should be checked for new data, but it would only
            // save a getNewDataSince() call per question db
            logger.LogReq(req)
            const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since
            const remoteHost = req.query.host

            const result: QuestionSyncDataRes = {
                remoteInfo: getSelfInfo(),
            }

            // `since` is sanitized above: if it was missing or invalid it is 0,
            // which returns everything
            const questionDbsWithNewQuestions = getQuestionDbs()
                .map((qdb) => {
                    return {
                        ...qdb,
                        data: getNewDataSince(qdb.data, since),
                    }
                })
                .filter((qdb) => {
                    const { questionCount: questionCount } = countOfQdb(qdb)
                    return questionCount > 0
                })

            const { subjCount: subjects, questionCount: questions } =
                countOfQdbs(questionDbsWithNewQuestions)

            result.questionDbs = questionDbsWithNewQuestions
            result.count = {
                qdbs: questionDbsWithNewQuestions.length,
                subjects: subjects,
                questions: questions,
            }

            let hostToLog = remoteHost || 'Unknown host'
            const dateToLog = since
                ? new Date(since).toLocaleString()
                : 'all time'

            if (remoteHost) {
                const remotePeerInfo = peers.find((peer) => {
                    return peerToString(peer) === remoteHost
                })
                if (!remotePeerInfo) {
                    handleNewThirdPartyPeer(remoteHost)
                } else {
                    hostToLog = peerToString(remotePeerInfo)
                }
            }

            logger.Log(
                `\tSending new data to ${logger.C(
                    'blue'
                )}${hostToLog}${logger.C()} since ${logger.C(
                    'blue'
                )}${dateToLog}${logger.C()}`
            )
            logger.logTable(
                [
                    ['QDBs', 'Subjs', 'Questions'],
                    [
                        result.questionDbs.length,
                        result.count.subjects,
                        result.count.questions,
                    ],
                ],
                { rowPrefix: '\t' }
            )

            res.json(result)
        }
    )

    app.get('/syncp2pdata', (req: Request, res: Response) => {
        logger.LogReq(req)
        const questions = !!req.query.questions
        const users = !!req.query.users
        const allTime = !!req.query.allTime
        const user = req.session.user

        if (!user || user.id !== 1) {
            res.json({
                status: 'error',
                message: 'only user 1 can call this EP',
            })
            return
        }

        // FIXME: /syncResult EP if this EP times out, but we still need the result
        if (syncInProgress) {
            res.json({
                error: 'A sync is already in progress!',
            })
            return
        }

        syncInProgress = true
        setPendingJobsAlertCount(5000)
        syncData({
            shouldSync: {
                questions: questions,
                users: users,
            },
            allTime: allTime,
        })
            .then((syncResult) => {
                res.json({
                    msg: 'sync successful',
                    ...syncResult,
                })
                setPendingJobsAlertCount()
                syncInProgress = false
            })
            .catch((e) => {
                console.error(e)
                res.json({
                    error: e,
                    msg: e.message,
                })
                setPendingJobsAlertCount()
                syncInProgress = false
            })
    })

    logger.Log(
        'P2P functionality set up. Peers (' +
            peers.length +
            '): ' +
            peers.map((peer) => peerToString(peer)).join(', '),
        'blue'
    )

    return {}
}

export default {
    setup: setup,
}