p2p syncing refactor

This commit is contained in:
mrfry 2023-05-01 16:29:59 +02:00
parent 1a3c806e66
commit 5ececa2de5
6 changed files with 309 additions and 219 deletions

View file

@ -12,4 +12,9 @@ if (!domain) {
export default {
savedQuestionsFileName: 'savedQuestions.json',
domain: domain,
// --------------------------------------------------------------------------------
// user files
// --------------------------------------------------------------------------------
userFilesDataFileName: '.data.json',
}

View file

@ -61,11 +61,15 @@ import {
} from '../../../worker/workerPool'
import { WorkerResult } from '../../../worker/worker'
import {
isPeerSameAs,
loginToPeer,
peerToString,
updatePeersFile,
} from '../../../utils/p2putils'
import { Database } from 'better-sqlite3'
import constants from '../../../constants'
import path from 'node:path'
import { UserDirDataFile } from './userFiles'
interface MergeResult {
newData: Subject[]
@ -104,30 +108,31 @@ interface SyncResult {
msg?: string
}
interface SyncDataResBase {
result?: string
interface SyncDataResult {
remoteInfo?: RemotePeerInfo
}
interface UserSyncDataRes extends SyncDataResBase {
encryptedUsers?: string
}
interface QuestionSyncDataRes extends SyncDataResBase {
questionDbs?: QuestionDb[]
count?: {
qdbs: number
subjects: number
questions: number
users?: {
encryptedUsers: string
}
questions?: {
questionDbs: QuestionDb[]
count: {
qdbs: number
subjects: number
questions: number
}
}
userFiles?: {
newFiles: {
[key: string]: {
[key: string]: UserDirDataFile
}
}
}
}
interface NewDataResult {
peer: PeerInfo
result?: {
questions?: GetResult<QuestionSyncDataRes>
users?: GetResult<UserSyncDataRes>
}
result?: GetResult<SyncDataResult>
error?: Error
}
@ -374,6 +379,7 @@ async function authAndGetNewData({
shouldSync: {
questions: boolean
users: boolean
userFiles: boolean
}
}): Promise<NewDataResult> {
try {
@ -415,32 +421,31 @@ async function authAndGetNewData({
)
}
let result: NewDataResult['result'] = {}
let result: NewDataResult['result']
const setResult = async () => {
if (shouldSync.questions || syncAll) {
result.questions = await getData<QuestionSyncDataRes>(
`/api/getnewdatasince?host=${encodeURIComponent(
peerToString(selfInfo)
)}${
peer.lastSync && !allTime
? `&since=${peer.lastSync}`
: ''
}`
)
let url = `/api/getnewdata?host=${encodeURIComponent(
peerToString(selfInfo)
)}`
if (allTime) url += '&allTime=true'
if (!syncAll) {
if (shouldSync.questions) {
url += '&questions=true'
url += `&questionsSince=${peer.lastQuestionsSync}`
}
if (shouldSync.users) {
url += '&users=true'
url += `&usersSince=${peer.lastUsersSync}`
}
if (shouldSync.userFiles) {
url += '&userFiles=true'
url += `&userFilesSince=${peer.lastUserFilesSync}`
}
}
if (shouldSync.users || syncAll) {
result.users = await getData<QuestionSyncDataRes>(
`/api/getnewuserssince?host=${encodeURIComponent(
peerToString(selfInfo)
)}${
peer.lastUsersSync && !allTime
? `&since=${peer.lastUsersSync}`
: ''
}`
)
}
result = await getData<SyncDataResult>(url)
}
await setResult()
@ -486,6 +491,34 @@ function addUsersToDb(
return addedUserCount
}
function getNewUserFilesSince(since: number) {
const newData: SyncDataResult['userFiles']['newFiles'] = {}
const dirs = utils.ReadDir(paths.userFilesDir)
dirs.forEach((dir) => {
const userDirPath = path.join(paths.userFilesDir, dir)
const dataFilePath = path.join(
userDirPath,
constants.userFilesDataFileName
)
if (!utils.FileExists(dataFilePath)) {
return
}
newData[dir] = {}
const dataFile =
utils.ReadJSON<Map<string, UserDirDataFile>>(dataFilePath)
Object.entries(dataFile).forEach(([fileName, data]) => {
const mtime = utils.statFile(path.join(userDirPath, fileName)).mtime
if (mtime.getTime() >= since) {
newData[dir][fileName] = data
}
})
})
return newData
}
function setup(data: SubmoduleData): Submodule {
const {
app,
@ -696,6 +729,7 @@ function setup(data: SubmoduleData): Submodule {
shouldSync: {
questions: boolean
users: boolean
userFiles: boolean
}
allTime: boolean
}) {
@ -772,44 +806,19 @@ function setup(data: SubmoduleData): Submodule {
// filtering, transforming, and counting responses
// -------------------------------------------------------------------------------------------------------
allResults.forEach((res) => {
const errors = res?.error
? [{ key: 'all', error: res.error }]
: Object.entries(res.result)
.map(([key, x]) =>
x.error ? { error: x.error, key: key } : null
)
.filter((x) => !!x)
if (errors.length > 0) {
if (res?.error) {
logger.Log(
`\tError syncing with ${peerToString(res.peer)}`,
`\tError syncing with ${peerToString(res.peer)}: ${
res.error.message
}`,
'red'
)
errors.forEach((e) => {
logger.Log(`\t${e.key}: ${e.error.message}`)
})
}
})
const resultDataWithoutErrors: {
questions?: QuestionSyncDataRes & { peer: PeerInfo }
users?: UserSyncDataRes & { peer: PeerInfo }
}[] = allResults.reduce((acc, resData) => {
const resDataWithoutErrors = Object.entries(resData.result).reduce(
(acc, [key, x]) => {
if (!x.error) {
acc[key] = { ...x.data, peer: resData.peer }
}
return acc
},
{}
)
if (Object.keys(resDataWithoutErrors).length > 0) {
return [...acc, resDataWithoutErrors]
} else {
return acc
}
}, [])
const resultDataWithoutErrors = allResults
.filter((resData) => !resData.error && resData.result.data)
.map((resData) => resData.result.data)
if (resultDataWithoutErrors.length === 0) {
logger.Log(
@ -824,17 +833,14 @@ function setup(data: SubmoduleData): Submodule {
// -------------------------------------------------------------------------------------------------------
// third party peers handling
// -------------------------------------------------------------------------------------------------------
const peersHosts = [...peers.map((peer) => peer.host), selfInfo.host]
const peersHosts = [...peers, selfInfo]
const thirdPartyPeers = resultDataWithoutErrors
.map((res) => {
return Object.values(res).map((x) => x.remoteInfo.myPeers)
})
.filter((x) => !!x)
.flatMap((x) => x)
.flatMap((x) => {
return x.filter(
(recievedPeer) => !peersHosts.includes(recievedPeer.host)
)
.map((res) => res.remoteInfo)
.flatMap((res) => res.myPeers)
.filter((res) => {
return !peersHosts.some((localPeer) => {
return isPeerSameAs(localPeer, res)
})
})
if (thirdPartyPeers.length > 0) {
@ -852,23 +858,30 @@ function setup(data: SubmoduleData): Submodule {
// data syncing
// -------------------------------------------------------------------------------------------------------
const getData = (key: keyof NewDataResult['result']) => {
return resultDataWithoutErrors
.filter((x) => x[key])
.map((x) => x[key])
const getData = <T extends keyof SyncDataResult>(key: T) => {
return resultDataWithoutErrors.map((x) => ({
...x[key],
peer: x.remoteInfo.selfInfo,
}))
}
const syncResults: SyncResult[] = []
const questionData = getData('questions')
if (questionData && questionData.length > 0) {
const res = await syncQuestions(questionData, syncStart)
syncResults.push(res)
}
const userData = getData('users')
if (userData && userData.length > 0) {
const res = await syncUsers(userData, syncStart)
syncResults.push(res)
}
const questionData = getData('questions')
if (questionData && questionData.length > 0) {
const res = await syncQuestions(questionData, syncStart)
const userFilesData = getData('userFiles')
if (userFilesData && userFilesData.length > 0) {
const res = await syncUserFiles(userFilesData, syncStart)
syncResults.push(res)
}
@ -885,7 +898,7 @@ function setup(data: SubmoduleData): Submodule {
}
async function syncUsers(
userData: (UserSyncDataRes & { peer: PeerInfo })[],
userData: (SyncDataResult['users'] & { peer: PeerInfo })[],
syncStart: number
): Promise<SyncResult> {
logger.Log('Syncing users...')
@ -946,19 +959,19 @@ function setup(data: SubmoduleData): Submodule {
return {
old: {
oldUserCount: oldUserCount,
users: oldUserCount,
},
added: {
totalNewUers: newUserCount - oldUserCount,
users: newUserCount - oldUserCount,
},
final: {
newUserCount: newUserCount,
users: newUserCount,
},
}
}
async function syncQuestions(
questionData: (QuestionSyncDataRes & { peer: PeerInfo })[],
questionData: (SyncDataResult['questions'] & { peer: PeerInfo })[],
syncStart: number
): Promise<SyncResult> {
logger.Log('Syncing questions...')
@ -972,7 +985,7 @@ function setup(data: SubmoduleData): Submodule {
}
} = {}
const resultDataWithoutEmptyDbs: (QuestionSyncDataRes & {
const resultDataWithoutEmptyDbs: (SyncDataResult['questions'] & {
peer: PeerInfo
})[] = []
questionData.forEach((res) => {
@ -1135,19 +1148,56 @@ function setup(data: SubmoduleData): Submodule {
return {
old: {
oldQuestionDbCount: oldQuestionDbCount,
oldSubjCount: oldSubjCount,
oldQuestionCount: oldQuestionCount,
questionDbs: oldQuestionDbCount,
subjects: oldSubjCount,
questions: oldQuestionCount,
},
added: {
totalNewQdbs: totalNewQdbs,
totalNewSubjects: totalNewSubjects,
totalNewQuestions: totalNewQuestions,
questionDbs: totalNewQdbs,
subjects: totalNewSubjects,
questions: totalNewQuestions,
},
final: {
newQuestionDbCount: newQuestionDbCount,
newSubjCount: newSubjCount,
newQuestionCount: newQuestionCount,
questionDbs: newQuestionDbCount,
subjects: newSubjCount,
questions: newQuestionCount,
},
}
}
async function syncUserFiles(
newData: (SyncDataResult['userFiles'] & { peer: PeerInfo })[],
syncStart: number
): Promise<SyncResult> {
logger.Log('Syncing user files...')
// ... magic magic code code code ...
console.log(newData, syncStart)
const filesToGet = []
newData.forEach((res) => {
Object.entries(res.newFiles).forEach(([dirName, userFilesDir]) => {
Object.entries(userFilesDir).forEach(([fileName, data]) => {
console.log(fileName, data)
filesToGet.push({
fileName: fileName,
path: path.join(publicDir, dirName, fileName),
data: data,
})
})
})
})
logger.Log(`Successfully synced user files!`, 'green')
return {
old: {
userFiles: 0,
},
added: {
userFiles: 0,
},
final: {
userFiles: 0,
},
}
}
@ -1183,32 +1233,97 @@ function setup(data: SubmoduleData): Submodule {
res.json(getSelfInfo(true))
})
// TODO: get all user files
app.get('/getnewfilessince', (req: Request, res: Response<any>) => {
const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since
app.get('/getnewdata', (req: Request, res: Response<any>) => {
logger.LogReq(req)
const questionsSince = Number.isNaN(+req.query.questionsSince)
? 0
: +req.query.questionsSince
const usersSince = Number.isNaN(+req.query.usersSince)
? 0
: +req.query.usersSince
const userFilesSince = Number.isNaN(+req.query.userFilesSince)
? 0
: +req.query.userFilesSince
res.json({ since: since, message: 'unimplemented' })
})
const questions = !!req.query.questions
const users = !!req.query.users
const userFiles = !!req.query.userFiles
const remoteHost = req.query.host
app.get(
'/getnewuserssince',
(
req: Request,
res: Response<
UserSyncDataRes & { message?: string; success?: boolean }
>
) => {
logger.LogReq(req)
const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since
const sendAll = !questions && !users && !userFiles
const remoteHost = req.query.host
let hostToLog = remoteHost || 'Unknown host'
let sentUsers = 0
let hostToLog = remoteHost || 'Unknown host'
let remotePeerInfo: PeerInfo = null
const result: UserSyncDataRes = {
remoteInfo: getSelfInfo(),
if (remoteHost) {
remotePeerInfo = peers.find((peer) => {
return peerToString(peer) === remoteHost
})
if (!remotePeerInfo) {
handleNewThirdPartyPeer(remoteHost)
} else {
hostToLog = peerToString(remotePeerInfo)
}
}
const result: SyncDataResult = {
remoteInfo: getSelfInfo(),
}
if (questions || sendAll) {
const questionDbsWithNewQuestions = Number.isNaN(questionsSince)
? getQuestionDbs()
: getQuestionDbs()
.map((qdb) => {
return {
...qdb,
data: getNewDataSince(qdb.data, questionsSince),
}
})
.filter((qdb) => {
const { questionCount: questionCount } =
countOfQdb(qdb)
return questionCount > 0
})
const { subjCount: subjects, questionCount: questions } =
countOfQdbs(questionDbsWithNewQuestions)
result.questions = {
questionDbs: questionDbsWithNewQuestions,
count: {
qdbs: questionDbsWithNewQuestions.length,
subjects: subjects,
questions: questions,
},
}
const questionsSinceDate = questionsSince
? new Date(questionsSince).toLocaleString()
: 'all time'
logger.Log(
`\tSending new data to ${logger.C(
'blue'
)}${hostToLog}${logger.C()} since ${logger.C(
'blue'
)}${questionsSinceDate}${logger.C()}`
)
logger.logTable(
[
['QDBs', 'Subjs', 'Questions'],
[
result.questions.questionDbs.length,
result.questions.count.subjects,
result.questions.count.questions,
],
],
{ rowPrefix: '\t' }
)
}
if (users || sendAll) {
let sentUsers = 0
if (!remoteHost) {
res.json({
...result,
@ -1217,16 +1332,6 @@ function setup(data: SubmoduleData): Submodule {
})
return
}
const remotePeerInfo = peers.find((peer) => {
return peerToString(peer) === remoteHost
})
if (!remotePeerInfo) {
handleNewThirdPartyPeer(remoteHost)
} else {
hostToLog = peerToString(remotePeerInfo)
}
if (!remotePeerInfo) {
res.json({
success: false,
@ -1235,19 +1340,20 @@ function setup(data: SubmoduleData): Submodule {
})
return
}
const remotePublicKey = remotePeerInfo.publicKey
if (remotePublicKey) {
// FIXME: sign data?
const newUsers = getNewUsersSince(since)
const newUsers = getNewUsersSince(usersSince)
sentUsers = newUsers.length
result.encryptedUsers = encrypt(
remotePublicKey,
JSON.stringify(newUsers)
)
result.users = {
encryptedUsers: encrypt(
remotePublicKey,
JSON.stringify(newUsers)
),
}
const usersSinceDate = since
? new Date(since).toLocaleString()
const usersSinceDate = usersSince
? new Date(usersSince).toLocaleString()
: 'all time'
logger.Log(
@ -1265,93 +1371,39 @@ function setup(data: SubmoduleData): Submodule {
'yellowbg'
)
}
res.json(result)
}
)
app.get(
'/getnewdatasince',
(req: Request, res: Response<QuestionSyncDataRes>) => {
// FIXME: hash question db to see if different?
// it could help in determining if it should be checked for new data, but it would only save
// a getNewDataSince() call per question db
logger.LogReq(req)
const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since
const remoteHost = req.query.host
if (userFiles || sendAll) {
const newFiles = getNewUserFilesSince(userFilesSince)
result.userFiles = { newFiles: newFiles }
const result: QuestionSyncDataRes = {
remoteInfo: getSelfInfo(),
}
const questionDbsWithNewQuestions = Number.isNaN(since)
? getQuestionDbs()
: getQuestionDbs()
.map((qdb) => {
return {
...qdb,
data: getNewDataSince(qdb.data, since),
}
})
.filter((qdb) => {
const { questionCount: questionCount } =
countOfQdb(qdb)
return questionCount > 0
})
const { subjCount: subjects, questionCount: questions } =
countOfQdbs(questionDbsWithNewQuestions)
result.questionDbs = questionDbsWithNewQuestions
result.count = {
qdbs: questionDbsWithNewQuestions.length,
subjects: subjects,
questions: questions,
}
let hostToLog = remoteHost || 'Unknown host'
const dateToLog = since
? new Date(since).toLocaleString()
const userFilesSinceDate = questionsSince
? new Date(questionsSince).toLocaleString()
: 'all time'
if (remoteHost) {
const remotePeerInfo = peers.find((peer) => {
return peerToString(peer) === remoteHost
})
if (!remotePeerInfo) {
handleNewThirdPartyPeer(remoteHost)
} else {
hostToLog = peerToString(remotePeerInfo)
}
}
logger.Log(
`\tSending new data to ${logger.C(
`\tSending new user files to ${logger.C(
'blue'
)}${hostToLog}${logger.C()} since ${logger.C(
'blue'
)}${dateToLog}${logger.C()}`
)}${userFilesSinceDate}${logger.C()}`
)
logger.logTable(
[
['QDBs', 'Subjs', 'Questions'],
[
result.questionDbs.length,
result.count.subjects,
result.count.questions,
],
],
{ rowPrefix: '\t' }
logger.Log(
`\tSent files count: ${
Object.keys(result.userFiles.newFiles).length
}`
)
res.json(result)
}
)
res.json(result)
})
app.get('/syncp2pdata', (req: Request, res: Response) => {
logger.LogReq(req)
const questions = !!req.query.questions
const users = !!req.query.users
const userFiles = !!req.query.userFiles
const allTime = !!req.query.allTime
const user = req.session.user
@ -1377,6 +1429,7 @@ function setup(data: SubmoduleData): Submodule {
shouldSync: {
questions: questions,
users: users,
userFiles: userFiles,
},
allTime: allTime,
})

View file

@ -24,13 +24,19 @@ import logger from '../../../utils/logger'
import utils from '../../../utils/utils'
import { Request, SubmoduleData, User } from '../../../types/basicTypes'
import { paths, publicDir } from '../../../utils/files'
import constants from '../../../constants'
const dataFileName = '.data.json'
export interface UserDirDataFile {
uid: number
views: number
upvotes: number[]
downvotes: number[]
}
function listDir(subdir: string, userFilesDir: string) {
const safeSubdir = subdir.replace(/\.+/g, '').replace(/\/+/g, '')
const dir = userFilesDir + '/' + safeSubdir
const usersFile = dir + '/' + dataFileName
const usersFile = dir + '/' + constants.userFilesDataFileName
if (!utils.FileExists(dir)) {
return {
@ -99,7 +105,11 @@ function setup(data: SubmoduleData): void {
const dir = x[2]
const fname = x.pop()
const dataFilePath =
paths.userFilesDir + '/' + dir + '/' + dataFileName
paths.userFilesDir +
'/' +
dir +
'/' +
constants.userFilesDataFileName
const data = utils.ReadJSON(dataFilePath)
@ -179,7 +189,11 @@ function setup(data: SubmoduleData): void {
}
utils.deleteFile(filePath)
const usersFile =
paths.userFilesDir + '/' + safeDir + '/' + dataFileName
paths.userFilesDir +
'/' +
safeDir +
'/' +
constants.userFilesDataFileName
const users = utils.ReadJSON(usersFile)
delete users[safeFname]
utils.WriteFile(JSON.stringify(users), usersFile)
@ -247,7 +261,11 @@ function setup(data: SubmoduleData): void {
)
const usersFile =
paths.userFilesDir + '/' + safeDir + '/' + dataFileName
paths.userFilesDir +
'/' +
safeDir +
'/' +
constants.userFilesDataFileName
const users = utils.ReadJSON(usersFile)
users[body.fileName] = { uid: user.id }
utils.WriteFile(JSON.stringify(users), usersFile)
@ -270,7 +288,12 @@ function setup(data: SubmoduleData): void {
const x = safePath.split('/')
const dir = x[1]
const fname = x.pop()
const dataFilePath = paths.userFilesDir + '/' + dir + '/' + dataFileName
const dataFilePath =
paths.userFilesDir +
'/' +
dir +
'/' +
constants.userFilesDataFileName
const data = utils.ReadJSON(dataFilePath)

View file

@ -175,7 +175,9 @@ export interface PeerInfo {
pw?: string
sessionCookie?: string
lastSync?: number
lastQuestionsSync?: number
lastUsersSync?: number
lastUserFilesSync?: number
note?: string
http?: boolean
}

View file

@ -52,6 +52,10 @@ const PeerInfoSchemaBase = {
export const SelfInfoSchema: Schema = {
...PeerInfoSchemaBase,
properties: {
...PeerInfoSchemaBase.properties,
lastSync: { type: 'number' },
},
required: ['name', 'host', 'port', 'contact'],
}
@ -59,7 +63,9 @@ export const PeerInfoSchema: Schema = {
...PeerInfoSchemaBase,
properties: {
...PeerInfoSchemaBase.properties,
lastQuestionsSync: { type: 'number' },
lastUsersSync: { type: 'number' },
lastUserFilesSync: { type: 'number' },
publicKey: { type: 'string' },
pw: { type: 'string' },
sessionCookie: { type: 'string' },

View file

@ -11,6 +11,7 @@ import {
import logger from './logger'
import utils from './utils'
import { HttpsFiles } from '../types/basicTypes'
import path from 'node:path'
// FIXME: remove all file exists checks from everywhere for files that are created / checked here
@ -324,7 +325,7 @@ export const files = {
// user files
// --------------------------------------------------------------------------------
userFilesDir: {
path: publicDir + 'userFiles',
path: path.join(publicDir, 'userFiles'),
isDir: true,
},
} as const satisfies Record<string, FileDescriptor>