p2p fixes

This commit is contained in:
mrfry 2023-03-26 19:11:07 +02:00
parent 2edc87d5dd
commit 16d6f04936
17 changed files with 707 additions and 582 deletions

View file

@ -19,7 +19,6 @@
------------------------------------------------------------------------- */
import { Response } from 'express'
import * as child_process from 'child_process'
import http from 'http'
import logger from '../../../utils/logger'
@ -31,9 +30,10 @@ import {
Subject,
QuestionDb,
User,
DataFile,
} from '../../../types/basicTypes'
import utils from '../../../utils/utils'
import { backupData /*writeData*/ } from '../../../utils/actions'
import { backupData, writeData } from '../../../utils/actions'
import { WorkerResult } from '../../../utils/classes'
import dbtools from '../../../utils/dbtools'
import {
@ -59,6 +59,7 @@ import {
SelfInfoSchema,
validateJSON,
} from '../../../types/typeSchemas'
import constants from '../../../constants.json'
// TODO: remove FINALIZE-s and TOTEST-s
@ -72,14 +73,19 @@ interface MergeResult {
interface RemotePeerInfo {
selfInfo: PeerInfo
myPeers: PeerInfo[]
revision?: string
serverRevision?: string
scriptRevision?: string
qminingPageRevision?: string
dataEditorRevision?: string
serverBuildTime?: number
qminingPageBuildTime?: number
dataEditorBuildTime?: number
scriptVersion?: string
qdbInfo?: {
dbName: string
subjs: {
name: string
count: number
}[]
}[]
questionDbCount: number
subjectCount: number
questionCount: number
}
}
interface RequestResult<T> {
@ -110,6 +116,7 @@ function get<T>(options: http.RequestOptions): Promise<RequestResult<T>> {
try {
resolve({ data: JSON.parse(body) })
} catch (e) {
console.log(body)
resolve({ error: e, options: options })
}
})
@ -135,7 +142,7 @@ export function getNewDataSince(subjects: Subject[], date: number): Subject[] {
return {
...subject,
Questions: subject.Questions.filter((question) => {
return (question.data.date || 0) > date
return (question.data.date || 0) >= date
}).map((question) => removeCacheFromQuestion(question)),
}
})
@ -269,13 +276,21 @@ async function sendNewDataToWorkers(
function writeNewData(
newQuestionDbs: QuestionDb[],
changedQuestionDbs: QuestionDb[]
changedQuestionDbs: QuestionDb[],
dbsFilePath: string,
publicDir: string
) {
const qdbsToWrite = [...newQuestionDbs, ...changedQuestionDbs]
const qdbsToWrite = [...changedQuestionDbs, ...newQuestionDbs]
const qdbsFile: DataFile[] = qdbsToWrite.map((qdb) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { data, index, ...restOfQdb } = qdb
return restOfQdb
})
utils.WriteFile(JSON.stringify(qdbsFile, null, 2), dbsFilePath)
qdbsToWrite.forEach((qdb) => {
try {
// FINALIZE: write to file
// writeData(qdb.data, qdb.path)
writeData(qdb.data, publicDir + qdb.path)
} catch (e) {
logger.Log(`Error writing ${qdb.name} qdb to file!`, 'redbg')
console.error(e)
@ -319,18 +334,16 @@ function setup(data: SubmoduleData): Submodule {
app,
userDB,
publicdirs,
moduleSpecificData: { questionDbs, setQuestionDbs, getQuestionDbs },
// publicdirs,
moduleSpecificData: { setQuestionDbs, getQuestionDbs, dbsFile },
} = data
const publicDir = publicdirs[0]
let syncInProgress = false
// ---------------------------------------------------------------------------------------
// SETUP
// ---------------------------------------------------------------------------------------
// const publicDir = publicdirs[0]
if (!utils.FileExists(peersFile)) {
logger.Log(
`Warning: peers file was missing, so it was created`,
@ -440,11 +453,6 @@ function setup(data: SubmoduleData): Submodule {
`Warning: peers file is empty. You probably want to fill it`,
'yellowbg'
)
} else {
logger.Log('Loaded peers: ' + peers.length)
peers.forEach((peer, i) => {
logger.Log(`\t${i}\t"${peer.name}": ${peerToString(peer)}`)
})
}
// ---------------------------------------------------------------------------------------
@ -456,32 +464,35 @@ function setup(data: SubmoduleData): Submodule {
selfInfo: selfInfo,
myPeers: peers,
}
try {
// FIXME: dont log if fails
result.revision = child_process
.execSync('git rev-parse HEAD', {
cwd: __dirname,
stdio: [0, 'pipe', null],
})
.toString()
.trim()
} catch (e) {
result.revision = 'Failed to get revision'
}
result.serverRevision = utils.getGitRevision(__dirname)
result.scriptRevision = utils.getGitRevision(
constants.moodleTestUserscriptDir
)
result.qminingPageRevision = utils.getGitRevision(
constants.qminingPageDir
)
result.dataEditorRevision = utils.getGitRevision(
constants.dataEditorPageDir
)
result.qminingPageBuildTime = utils
.statFile(constants.qminingIndexPath)
?.mtime.getTime()
result.serverBuildTime = utils
.statFile(constants.serverPath)
?.mtime.getTime()
result.dataEditorBuildTime = utils
.statFile(constants.dataEditorIndexPath)
?.mtime.getTime()
result.scriptVersion = utils.getScriptVersion()
if (includeQdbInfo) {
result.qdbInfo = getQuestionDbs().map((qdb) => {
return {
dbName: qdb.name,
subjs: qdb.data.map((subj) => {
return {
name: subj.Name,
count: subj.Questions.length,
}
}),
}
})
const questionDbCount = getQuestionDbs().length
const { subjCount, questionCount } = countOfQdbs(getQuestionDbs())
result.qdbInfo = {
questionDbCount: questionDbCount,
subjectCount: subjCount,
questionCount: questionCount,
}
}
return result
@ -506,7 +517,7 @@ function setup(data: SubmoduleData): Submodule {
return {
...qdb,
index: availableIndexes[i],
path: `${publicDir}questionDbs/${qdb.name}.json'`,
path: `questionDbs/${qdb.name}.json'`,
}
})
}
@ -543,7 +554,6 @@ function setup(data: SubmoduleData): Submodule {
}
async function syncData() {
// TOTEST: try with 0 date to merge full dbs
if (peers.length === 0) {
logger.Log(
`There are no peers specified in ${peersFile}, aborting sync`,
@ -560,31 +570,36 @@ function setup(data: SubmoduleData): Submodule {
}${logger.C()} peers`
)
const lastSync = new Date('2022-03-12').getTime() // FINALIZE date: this is only for testing // selfInfo.lastSync
const lastSync = new Date('2012-03-12').getTime() // FINALIZE date: this is only for testing // selfInfo.lastSync
logger.Log(
`\tLast sync date: ${logger.C('blue')}${new Date(
lastSync
).toLocaleString()}${logger.C()}`
)
const syncStart = new Date().getTime()
const requests = peers.map((peer) => {
const lastSyncWithPeer = new Date('2022-03-12').getTime() // FINALIZE same as above // peer.lastSync || 0
const lastSyncInfos = peers.map((peer) => {
return [
peerToString(peer),
new Date(peer.lastSync).toLocaleString(),
]
})
logger.Log(`\tLast sync with peers:`)
logger.logTable([['', 'Date'], ...lastSyncInfos], {
colWidth: [15],
rowPrefix: '\t',
})
const requests = peers.map((peer) => {
const lastSyncWithPeer = new Date('2012-03-12').getTime() // FINALIZE same as above // peer.lastSync || 0
logger.Log(
`\tLast sync with ${logger.C('blue')}${peerToString(
peer
)}${logger.C()}: ${logger.C('blue')}${new Date(
lastSyncWithPeer
).toLocaleString()}${logger.C()}`
)
return new Promise<RequestResult<SyncDataRes & { peer: PeerInfo }>>(
(resolve) => {
get<SyncDataRes>({
host: peer.host,
port: peer.port,
path: `/getnewdatasince?host=${selfInfo.host}${
lastSync ? `&since=${lastSyncWithPeer}` : ''
}`,
path: `/getnewdatasince?host=${encodeURIComponent(
peerToString(selfInfo)
)}${lastSync ? `&since=${lastSyncWithPeer}` : ''}`,
}).then((res) => {
resolve({ ...res, data: { ...res.data, peer: peer } })
})
@ -621,6 +636,7 @@ function setup(data: SubmoduleData): Submodule {
}
}
const recievedDataCounts: (number | string)[][] = []
const resultDataWithoutEmptyDbs = resultDataWithoutErrors.filter(
(res) => {
const qdbCount = res.questionDbs.length
@ -628,22 +644,26 @@ function setup(data: SubmoduleData): Submodule {
res.questionDbs
)
logger.Log(
`\t"${logger.C('blue')}${peerToString(
res.peer
)}${logger.C()}" sent "${logger.C(
'green'
)}${qdbCount}${logger.C()}" question DB-s with "${logger.C(
'green'
)}${subjCount.toLocaleString()}${logger.C()}" subjects, and "${logger.C(
'green'
)}${questionCount.toLocaleString()}${logger.C()}" questions`
)
recievedDataCounts.push([
peerToString(res.peer),
qdbCount,
subjCount,
questionCount,
])
return questionCount > 0
}
)
logger.Log(`\tRecieved data from peers:`)
logger.logTable(
[['', 'QDBs', 'Subjs', 'Questions'], ...recievedDataCounts],
{
colWidth: [15],
rowPrefix: '\t',
}
)
const resultData = resultDataWithoutEmptyDbs.map((res) => {
return {
...res,
@ -665,17 +685,14 @@ function setup(data: SubmoduleData): Submodule {
)
})
if (thirdPartyPeers.length > 0) {
logger.Log(
`\tPeers reported ${logger.C('green')}${
thirdPartyPeers.length
}${logger.C()} third party peer(s) not connected to this server.`
)
utils.WriteFile(
JSON.stringify(thirdPartyPeers, null, 2),
thirdPartyPeersFile
)
logger.Log(
`\tSee ${logger.C(
`\tPeers reported ${logger.C('green')}${
thirdPartyPeers.length
}${logger.C()} third party peer(s) not connected to this server. See ${logger.C(
'blue'
)}${thirdPartyPeersFile}${logger.C()} for details`
)
@ -690,6 +707,7 @@ function setup(data: SubmoduleData): Submodule {
newQuestions?: number
}
} = {}
// -------------------------------------------------------------------------------------------------------
// new users handlin TOTEST: test
// -------------------------------------------------------------------------------------------------------
@ -700,25 +718,23 @@ function setup(data: SubmoduleData): Submodule {
const decryptedUsers: User[] = JSON.parse(
decrypt(privateKey, res.encryptedUsers)
)
let newUserCount = 0
decryptedUsers.forEach((remoteUser) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { id, ...remoteUserWithoutId } = remoteUser
const localUser = dbtools.Select(userDB, 'users', {
pw: remoteUser.pw,
})
if (!localUser) {
if (localUser.length === 0) {
// FIXME: users will not have consistend id across servers. This may be
// harmless, will see
dbtools.Insert(userDB, 'users', {
...(remoteUserWithoutId as Omit<User, 'id'>),
sourceHost: peerToString(res.peer),
})
newUserCount += 1
}
})
resultsCount[peerToString(res.peer)] = {
newUsers: newUserCount,
newUsers: decryptedUsers.length,
}
}
})
@ -746,7 +762,6 @@ function setup(data: SubmoduleData): Submodule {
// -------------------------------------------------------------------------------------------------------
// backup
// -------------------------------------------------------------------------------------------------------
const { subjCount: oldSubjCount, questionCount: oldQuestionCount } =
countOfQdbs(getQuestionDbs())
const oldQuestionDbCount = getQuestionDbs().length
@ -757,7 +772,6 @@ function setup(data: SubmoduleData): Submodule {
// -------------------------------------------------------------------------------------------------------
// adding questions to db
// -------------------------------------------------------------------------------------------------------
for (let i = 0; i < resultData.length; i++) {
const { questionDbs: remoteQuestionDbs, peer } = resultData[i]
logger.Log(
@ -786,7 +800,9 @@ function setup(data: SubmoduleData): Submodule {
newQuestionDbs,
getQuestionDbs().filter((qdb) => {
return changedQdbIndexes.includes(qdb.index)
})
}),
dbsFile,
publicDir
)
setQuestionDbs([...mergedQuestionDbs, ...newQuestionDbs])
@ -876,7 +892,7 @@ function setup(data: SubmoduleData): Submodule {
newQuestionCount,
],
],
{ colWidth: [15] }
{ colWidth: [15], rowPrefix: '\t' }
)
logger.Log(
@ -922,8 +938,8 @@ function setup(data: SubmoduleData): Submodule {
const remoteHost = req.query.host
const questionDbsWithNewQuestions = Number.isNaN(since)
? questionDbs
: questionDbs
? getQuestionDbs()
: getQuestionDbs()
.map((qdb) => {
return {
...qdb,
@ -949,26 +965,54 @@ function setup(data: SubmoduleData): Submodule {
remoteInfo: getSelfInfo(),
}
let hostToLog = req.hostname
if (remoteHost) {
const remoteHostInfo = peers.find((peer) => {
return peer.host === remoteHost
const remotePeerInfo = peers.find((peer) => {
return peerToString(peer) === remoteHost
})
const remotePublicKey = remoteHostInfo?.publicKey
if (remotePublicKey) {
// FIXME: sign data?
const newUsers = getNewUsersSince(since)
result.encryptedUsers = encrypt(
remotePublicKey,
JSON.stringify(newUsers)
)
} else if (remoteHostInfo) {
if (remotePeerInfo) {
hostToLog = peerToString(remotePeerInfo)
const remotePublicKey = remotePeerInfo?.publicKey
if (remotePublicKey) {
// FIXME: sign data?
const newUsers = getNewUsersSince(since)
result.encryptedUsers = encrypt(
remotePublicKey,
JSON.stringify(newUsers)
)
logger.Log(
`Sending new users to "${remoteHost}" (encrypted)`,
'green'
)
} else if (remotePeerInfo) {
logger.Log(
`Warning: "${hostToLog}" has no public key saved!`,
'yellowbg'
)
}
} else {
logger.Log(
`Warning: ${remoteHostInfo.host}:${remoteHostInfo.port} has no publick key saved!`,
'Couldn\'t find remote peer info based on remoteHost: "' +
remoteHost +
'". This could mean that the host uses this server as peer, but this server does not ' +
'use it as a peer.',
'yellowbg'
)
}
}
const dateToLog = Number.isNaN(since)
? 'all time'
: new Date(since).toLocaleString()
logger.Log(
`Sending new data to ${logger.C(
'blue'
)}${hostToLog}${logger.C()} since ${logger.C(
'blue'
)}${dateToLog}${logger.C()} `
)
res.json(result)
})
@ -984,6 +1028,15 @@ function setup(data: SubmoduleData): Submodule {
// return
// }
// FIXME: /syncResult EP if this EP times out, but we still need the result
if (syncInProgress) {
res.json({
error: 'A sync is already in progress!',
})
return
}
syncInProgress = true
setPendingJobsAlertCount(5000)
syncData()
.then((syncResult) => {
@ -992,6 +1045,7 @@ function setup(data: SubmoduleData): Submodule {
...syncResult,
})
setPendingJobsAlertCount()
syncInProgress = false
})
.catch((e) => {
console.error(e)
@ -1000,10 +1054,17 @@ function setup(data: SubmoduleData): Submodule {
msg: e.message,
})
setPendingJobsAlertCount()
syncInProgress = false
})
})
logger.Log('P2P functionality set up. Peers: ' + peers.length, 'blue')
logger.Log(
'P2P functionality set up. Peers (' +
peers.length +
'): ' +
peers.map((peer) => peerToString(peer)).join(', '),
'blue'
)
return {}
}