mirror of
https://gitlab.com/MrFry/mrfrys-node-server
synced 2025-04-01 20:24:18 +02:00
p2p code improvements
This commit is contained in:
parent
7dada00f72
commit
88423719e5
3 changed files with 467 additions and 286 deletions
|
@ -91,11 +91,24 @@ interface RemotePeerInfo {
|
|||
}
|
||||
}
|
||||
|
||||
interface SyncDataRes {
|
||||
interface SyncResult {
|
||||
old?: { [key: string]: number }
|
||||
added?: { [key: string]: number }
|
||||
final?: { [key: string]: number }
|
||||
msg?: string
|
||||
}
|
||||
|
||||
interface SyncDataResBase {
|
||||
result?: string
|
||||
questionDbs?: QuestionDb[]
|
||||
remoteInfo?: RemotePeerInfo
|
||||
}
|
||||
|
||||
interface UserSyncDataRes extends SyncDataResBase {
|
||||
encryptedUsers?: string
|
||||
}
|
||||
|
||||
interface QuestionSyncDataRes extends SyncDataResBase {
|
||||
questionDbs?: QuestionDb[]
|
||||
count?: {
|
||||
qdbs: number
|
||||
subjects: number
|
||||
|
@ -103,6 +116,15 @@ interface SyncDataRes {
|
|||
}
|
||||
}
|
||||
|
||||
interface NewDataResult {
|
||||
peer: PeerInfo
|
||||
result?: {
|
||||
questions?: GetResult<QuestionSyncDataRes>
|
||||
users?: GetResult<UserSyncDataRes>
|
||||
}
|
||||
error?: Error
|
||||
}
|
||||
|
||||
function updateThirdPartyPeers(
|
||||
newVal: Omit<PeerInfo, 'publicKey' | 'name' | 'contact'>[]
|
||||
) {
|
||||
|
@ -339,70 +361,99 @@ async function authAndGetNewData({
|
|||
peer,
|
||||
selfInfo,
|
||||
allTime,
|
||||
shouldSync,
|
||||
}: {
|
||||
peer: PeerInfo
|
||||
selfInfo: PeerInfo
|
||||
allTime?: boolean
|
||||
}): Promise<GetResult<SyncDataRes & { peer: PeerInfo }>> {
|
||||
let sessionCookie = peer.sessionCookie
|
||||
const lastSyncWithPeer = allTime ? 0 : peer.lastSync || 0
|
||||
const lastUsersSyncWithPeer = allTime ? 0 : peer.lastUsersSync || 0
|
||||
shouldSync: {
|
||||
questions: boolean
|
||||
users: boolean
|
||||
}
|
||||
}): Promise<NewDataResult> {
|
||||
try {
|
||||
const syncAll =
|
||||
!shouldSync ||
|
||||
Object.values(shouldSync).filter((x) => x).length === 0
|
||||
let sessionCookie = peer.sessionCookie
|
||||
|
||||
if (!sessionCookie) {
|
||||
const loginResult = await loginToPeer(peer)
|
||||
if (typeof loginResult === 'string') {
|
||||
sessionCookie = loginResult
|
||||
updatePeersFile({ ...peer, sessionCookie: loginResult })
|
||||
} else {
|
||||
return {
|
||||
error: loginResult,
|
||||
data: {
|
||||
peer: peer,
|
||||
},
|
||||
const login = async () => {
|
||||
const loginResult = await loginToPeer(peer)
|
||||
if (typeof loginResult === 'string') {
|
||||
sessionCookie = loginResult
|
||||
updatePeersFile(peer, { sessionCookie: loginResult })
|
||||
} else {
|
||||
throw {
|
||||
error: loginResult,
|
||||
data: {
|
||||
peer: peer,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const getSyncData = () => {
|
||||
return get<SyncDataRes>(
|
||||
{
|
||||
headers: {
|
||||
cookie: `sessionID=${sessionCookie}`,
|
||||
if (!sessionCookie) {
|
||||
await login()
|
||||
}
|
||||
|
||||
const getData = async <T>(path: string) => {
|
||||
return get<T>(
|
||||
{
|
||||
headers: {
|
||||
cookie: `sessionID=${sessionCookie}`,
|
||||
},
|
||||
host: peer.host,
|
||||
port: peer.port,
|
||||
path: path,
|
||||
},
|
||||
host: peer.host,
|
||||
port: peer.port,
|
||||
path: `/api/getnewdatasince?host=${encodeURIComponent(
|
||||
peerToString(selfInfo)
|
||||
)}${lastSyncWithPeer ? `&since=${lastSyncWithPeer}` : ''}${
|
||||
lastUsersSyncWithPeer
|
||||
? `&usersSince=${lastUsersSyncWithPeer}`
|
||||
: ''
|
||||
}`,
|
||||
},
|
||||
peer.http
|
||||
)
|
||||
}
|
||||
peer.http
|
||||
)
|
||||
}
|
||||
|
||||
let getRes = await getSyncData()
|
||||
let result: NewDataResult['result'] = {}
|
||||
|
||||
if (getRes.data?.result === 'nouser') {
|
||||
// FIXME: make this more pretty? (duplicate code, see above)
|
||||
const loginResult = await loginToPeer(peer)
|
||||
if (typeof loginResult === 'string') {
|
||||
sessionCookie = loginResult
|
||||
updatePeersFile({ ...peer, sessionCookie: loginResult })
|
||||
} else {
|
||||
return {
|
||||
error: loginResult,
|
||||
data: {
|
||||
peer: peer,
|
||||
},
|
||||
const setResult = async () => {
|
||||
if (shouldSync.questions || syncAll) {
|
||||
result.questions = await getData<QuestionSyncDataRes>(
|
||||
`/api/getnewdatasince?host=${encodeURIComponent(
|
||||
peerToString(selfInfo)
|
||||
)}${
|
||||
peer.lastSync && !allTime
|
||||
? `&since=${peer.lastSync}`
|
||||
: ''
|
||||
}`
|
||||
)
|
||||
}
|
||||
|
||||
if (shouldSync.users || syncAll) {
|
||||
result.users = await getData<QuestionSyncDataRes>(
|
||||
`/api/getnewuserssince?host=${encodeURIComponent(
|
||||
peerToString(selfInfo)
|
||||
)}${
|
||||
peer.lastUsersSync && !allTime
|
||||
? `&since=${peer.lastUsersSync}`
|
||||
: ''
|
||||
}`
|
||||
)
|
||||
}
|
||||
}
|
||||
getRes = await getSyncData()
|
||||
}
|
||||
|
||||
return { ...getRes, data: { ...getRes.data, peer: peer } }
|
||||
await setResult()
|
||||
|
||||
const hasNoUser = Object.values(result).find((res) => {
|
||||
return res.data?.result === 'nouser'
|
||||
})
|
||||
if (hasNoUser) {
|
||||
await login()
|
||||
result = {}
|
||||
await setResult()
|
||||
}
|
||||
|
||||
return { result: result, peer: peer }
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
return { error: e, peer: peer }
|
||||
}
|
||||
}
|
||||
|
||||
function setup(data: SubmoduleData): Submodule {
|
||||
|
@ -611,10 +662,13 @@ function setup(data: SubmoduleData): Submodule {
|
|||
}
|
||||
|
||||
async function syncData({
|
||||
usersOnly,
|
||||
shouldSync,
|
||||
allTime,
|
||||
}: {
|
||||
usersOnly: boolean
|
||||
shouldSync: {
|
||||
questions: boolean
|
||||
users: boolean
|
||||
}
|
||||
allTime: boolean
|
||||
}) {
|
||||
if (peers.length === 0) {
|
||||
|
@ -633,9 +687,17 @@ function setup(data: SubmoduleData): Submodule {
|
|||
}${logger.C()} peers`
|
||||
)
|
||||
|
||||
if (usersOnly) {
|
||||
logger.Log(`\tSyncing users only!`, 'yellowbg')
|
||||
}
|
||||
const syncAll =
|
||||
!shouldSync ||
|
||||
Object.values(shouldSync).filter((x) => x).length === 0
|
||||
|
||||
logger.Log(
|
||||
`\tSyncing: ${
|
||||
syncAll ? 'everything' : Object.keys(syncAll).join(', ')
|
||||
}`,
|
||||
'green'
|
||||
)
|
||||
|
||||
if (allTime) {
|
||||
logger.Log(`\tSyncing since all time!`, 'yellowbg')
|
||||
}
|
||||
|
@ -666,27 +728,54 @@ function setup(data: SubmoduleData): Submodule {
|
|||
peer: peer,
|
||||
selfInfo: selfInfo,
|
||||
allTime: allTime,
|
||||
shouldSync: shouldSync,
|
||||
})
|
||||
})
|
||||
|
||||
const allResults = await Promise.all(requests)
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
// filtering, transforming, and counting data
|
||||
// filtering, transforming, and counting responses
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
allResults.forEach((res) => {
|
||||
if (res.error) {
|
||||
const errors = res?.error
|
||||
? [{ key: 'all', error: res.error }]
|
||||
: Object.entries(res.result)
|
||||
.map(([key, x]) =>
|
||||
x.error ? { error: x.error, key: key } : null
|
||||
)
|
||||
.filter((x) => !!x)
|
||||
|
||||
if (errors.length > 0) {
|
||||
logger.Log(
|
||||
`\tError syncing with ${peerToString(res.data.peer)}: ${
|
||||
res.error.message
|
||||
}`,
|
||||
`\tError syncing with ${peerToString(res.peer)}`,
|
||||
'red'
|
||||
)
|
||||
errors.forEach((e) => {
|
||||
logger.Log(`\t${e.key}: ${e.error.message}`)
|
||||
})
|
||||
}
|
||||
})
|
||||
const resultDataWithoutErrors = allResults
|
||||
.filter((res) => !res.error)
|
||||
.map((res) => res.data)
|
||||
const resultDataWithoutErrors: {
|
||||
questions?: QuestionSyncDataRes & { peer: PeerInfo }
|
||||
users?: UserSyncDataRes & { peer: PeerInfo }
|
||||
}[] = allResults.reduce((acc, resData) => {
|
||||
const resDataWithoutErrors = Object.entries(resData.result).reduce(
|
||||
(acc, [key, x]) => {
|
||||
if (!x.error) {
|
||||
acc[key] = { ...x.data, peer: resData.peer }
|
||||
}
|
||||
return acc
|
||||
},
|
||||
{}
|
||||
)
|
||||
|
||||
if (Object.keys(resDataWithoutErrors).length > 0) {
|
||||
return [...acc, resDataWithoutErrors]
|
||||
} else {
|
||||
return acc
|
||||
}
|
||||
}, [])
|
||||
|
||||
if (resultDataWithoutErrors.length === 0) {
|
||||
logger.Log(
|
||||
|
@ -698,59 +787,22 @@ function setup(data: SubmoduleData): Submodule {
|
|||
}
|
||||
}
|
||||
|
||||
const recievedDataCounts: (number | string)[][] = []
|
||||
const resultDataWithoutEmptyDbs: (SyncDataRes & { peer: PeerInfo })[] =
|
||||
[]
|
||||
resultDataWithoutErrors.forEach((res) => {
|
||||
const qdbCount = res.questionDbs.length
|
||||
const { subjCount, questionCount } = countOfQdbs(res.questionDbs)
|
||||
|
||||
recievedDataCounts.push([
|
||||
peerToString(res.peer),
|
||||
qdbCount,
|
||||
subjCount,
|
||||
questionCount,
|
||||
])
|
||||
|
||||
if (questionCount > 0) {
|
||||
resultDataWithoutEmptyDbs.push(res)
|
||||
} else if (!usersOnly) {
|
||||
updatePeersFile({
|
||||
...res.peer,
|
||||
lastSync: syncStart,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
logger.Log(`\tRecieved data from peers:`)
|
||||
logger.logTable(
|
||||
[['', 'QDBs', 'Subjs', 'Questions'], ...recievedDataCounts],
|
||||
{
|
||||
colWidth: [15],
|
||||
rowPrefix: '\t',
|
||||
}
|
||||
)
|
||||
|
||||
const resultData = resultDataWithoutEmptyDbs.map((res) => {
|
||||
return {
|
||||
...res,
|
||||
questionDbs: res.questionDbs.map((qdb) => {
|
||||
return setupQuestionsForMerge(qdb, res.peer)
|
||||
}),
|
||||
}
|
||||
})
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
// third party peers handling
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
const peersHosts = [...peers.map((peer) => peer.host), selfInfo.host]
|
||||
const thirdPartyPeers = resultData
|
||||
.map((res) => res.remoteInfo)
|
||||
const thirdPartyPeers = resultDataWithoutErrors
|
||||
.map((res) => {
|
||||
return Object.values(res).map((x) => x.remoteInfo.myPeers)
|
||||
})
|
||||
.filter((x) => !!x)
|
||||
.flatMap((x) => x)
|
||||
.flatMap((x) => {
|
||||
return x.myPeers.filter(
|
||||
return x.filter(
|
||||
(recievedPeer) => !peersHosts.includes(recievedPeer.host)
|
||||
)
|
||||
})
|
||||
|
||||
if (thirdPartyPeers.length > 0) {
|
||||
updateThirdPartyPeers(thirdPartyPeers)
|
||||
logger.Log(
|
||||
|
@ -762,22 +814,56 @@ function setup(data: SubmoduleData): Submodule {
|
|||
)
|
||||
}
|
||||
|
||||
// all results statistics
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
// data syncing
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
|
||||
const getData = (key: keyof NewDataResult['result']) => {
|
||||
return resultDataWithoutErrors
|
||||
.filter((x) => x[key])
|
||||
.map((x) => x[key])
|
||||
}
|
||||
|
||||
const syncResults: SyncResult[] = []
|
||||
|
||||
const userData = getData('users')
|
||||
if (userData) {
|
||||
const res = await syncUsers(userData, syncStart)
|
||||
syncResults.push(res)
|
||||
}
|
||||
|
||||
const questionData = getData('questions')
|
||||
if (userData) {
|
||||
const res = await syncQuestions(questionData, syncStart)
|
||||
syncResults.push(res)
|
||||
}
|
||||
|
||||
return syncResults.reduce(
|
||||
(acc, x) => {
|
||||
return {
|
||||
old: { ...acc.old, ...x.old },
|
||||
added: { ...acc.added, ...x.added },
|
||||
final: { ...acc.final, ...x.final },
|
||||
}
|
||||
},
|
||||
{ old: {}, added: {}, final: {} }
|
||||
)
|
||||
}
|
||||
|
||||
async function syncUsers(
|
||||
userData: (UserSyncDataRes & { peer: PeerInfo })[],
|
||||
syncStart: number
|
||||
): Promise<SyncResult> {
|
||||
logger.Log('Syncing users...')
|
||||
const resultsCount: {
|
||||
[key: string]: {
|
||||
newUsers?: number
|
||||
newQuestionDbs?: number
|
||||
newSubjects?: number
|
||||
newQuestions?: number
|
||||
}
|
||||
} = {}
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
// new users handlin
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
const oldUserCount = dbtools.SelectAll(userDB, 'users').length
|
||||
|
||||
try {
|
||||
resultData.forEach((res) => {
|
||||
userData.forEach((res) => {
|
||||
if (res.encryptedUsers) {
|
||||
let addedUserCount = 0
|
||||
const decryptedUsers: User[] = JSON.parse(
|
||||
|
@ -802,8 +888,7 @@ function setup(data: SubmoduleData): Submodule {
|
|||
resultsCount[peerToString(res.peer)] = {
|
||||
newUsers: addedUserCount,
|
||||
}
|
||||
updatePeersFile({
|
||||
...res.peer,
|
||||
updatePeersFile(res.peer, {
|
||||
lastUsersSync: syncStart,
|
||||
})
|
||||
}
|
||||
|
@ -817,12 +902,97 @@ function setup(data: SubmoduleData): Submodule {
|
|||
}
|
||||
const newUserCount = dbtools.SelectAll(userDB, 'users').length
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
if (Object.keys(resultsCount).length === 0) {
|
||||
logger.Log('No new users received')
|
||||
} else {
|
||||
logger.logTable(
|
||||
[
|
||||
['', 'Users'],
|
||||
['Old', oldUserCount],
|
||||
...Object.entries(resultsCount).map(([key, result]) => {
|
||||
return [key, result.newUsers]
|
||||
}),
|
||||
['Added total', newUserCount - oldUserCount],
|
||||
['Final', newUserCount],
|
||||
],
|
||||
{ colWidth: [15], rowPrefix: '\t' }
|
||||
)
|
||||
}
|
||||
logger.Log(`Successfully synced users!`, 'green')
|
||||
|
||||
return {
|
||||
old: {
|
||||
oldUserCount: oldUserCount,
|
||||
},
|
||||
added: {
|
||||
totalNewUers: newUserCount - oldUserCount,
|
||||
},
|
||||
final: {
|
||||
newUserCount: newUserCount,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async function syncQuestions(
|
||||
questionData: (QuestionSyncDataRes & { peer: PeerInfo })[],
|
||||
syncStart: number
|
||||
): Promise<SyncResult> {
|
||||
logger.Log('Syncing questions...')
|
||||
const recievedDataCounts: (number | string)[][] = []
|
||||
// all results statistics
|
||||
const resultsCount: {
|
||||
[key: string]: {
|
||||
newQuestionDbs?: number
|
||||
newSubjects?: number
|
||||
newQuestions?: number
|
||||
}
|
||||
} = {}
|
||||
|
||||
const resultDataWithoutEmptyDbs: (QuestionSyncDataRes & {
|
||||
peer: PeerInfo
|
||||
})[] = []
|
||||
questionData.forEach((res) => {
|
||||
const qdbCount = res.questionDbs.length
|
||||
const { subjCount, questionCount } = countOfQdbs(res.questionDbs)
|
||||
|
||||
recievedDataCounts.push([
|
||||
peerToString(res.peer),
|
||||
qdbCount,
|
||||
subjCount,
|
||||
questionCount,
|
||||
])
|
||||
|
||||
if (questionCount > 0) {
|
||||
resultDataWithoutEmptyDbs.push(res)
|
||||
} else {
|
||||
updatePeersFile(res.peer, {
|
||||
lastSync: syncStart,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
logger.Log(`\tRecieved data from peers:`)
|
||||
logger.logTable(
|
||||
[['', 'QDBs', 'Subjs', 'Questions'], ...recievedDataCounts],
|
||||
{
|
||||
colWidth: [15],
|
||||
rowPrefix: '\t',
|
||||
}
|
||||
)
|
||||
|
||||
const resultData = resultDataWithoutEmptyDbs.map((res) => {
|
||||
return {
|
||||
...res,
|
||||
questionDbs: res.questionDbs.map((qdb) => {
|
||||
return setupQuestionsForMerge(qdb, res.peer)
|
||||
}),
|
||||
}
|
||||
})
|
||||
|
||||
const hasNewData = resultData.length > 0
|
||||
if (!hasNewData) {
|
||||
logger.Log(
|
||||
`No peers returned any new questions. Sync successfully finished!`,
|
||||
`No peers returned any new questions. Question sync successfully finished!`,
|
||||
'green'
|
||||
)
|
||||
updateLastSync(selfInfo, syncStart)
|
||||
|
@ -831,40 +1001,6 @@ function setup(data: SubmoduleData): Submodule {
|
|||
}
|
||||
}
|
||||
|
||||
if (usersOnly) {
|
||||
if (Object.keys(resultsCount).length === 0) {
|
||||
logger.Log('No new users received')
|
||||
} else {
|
||||
logger.logTable(
|
||||
[
|
||||
['', 'Users'],
|
||||
['Old', oldUserCount],
|
||||
...Object.entries(resultsCount).map(([key, result]) => {
|
||||
return [key, result.newUsers]
|
||||
}),
|
||||
['Added total', newUserCount - oldUserCount],
|
||||
['Final', newUserCount],
|
||||
],
|
||||
{ colWidth: [15], rowPrefix: '\t' }
|
||||
)
|
||||
}
|
||||
logger.Log(
|
||||
`Sync successfully finished! Synced users only.`,
|
||||
'green'
|
||||
)
|
||||
return {
|
||||
old: {
|
||||
oldUserCount: oldUserCount,
|
||||
},
|
||||
added: {
|
||||
totalNewUers: newUserCount - oldUserCount,
|
||||
},
|
||||
final: {
|
||||
newUserCount: newUserCount,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
// backup
|
||||
// -------------------------------------------------------------------------------------------------------
|
||||
|
@ -921,8 +1057,7 @@ function setup(data: SubmoduleData): Submodule {
|
|||
newQuestions: newQuestionCount,
|
||||
}
|
||||
// Processing result data is successfull
|
||||
updatePeersFile({
|
||||
...peer,
|
||||
updatePeersFile(peer, {
|
||||
lastSync: syncStart,
|
||||
})
|
||||
}
|
||||
|
@ -939,7 +1074,6 @@ function setup(data: SubmoduleData): Submodule {
|
|||
([key, value]) => {
|
||||
return [
|
||||
key.length > 14 ? key.substring(0, 14) + '...' : key,
|
||||
value.newUsers,
|
||||
value.newQuestionDbs,
|
||||
value.newSubjects,
|
||||
value.newQuestions,
|
||||
|
@ -954,60 +1088,40 @@ function setup(data: SubmoduleData): Submodule {
|
|||
)
|
||||
}
|
||||
|
||||
const newUsers = sumNewCount('newUsers')
|
||||
const totalNewQuestions = sumNewCount('newQuestions')
|
||||
const totalNewSubjects = sumNewCount('newSubjects')
|
||||
const totalNewQdbs = sumNewCount('newQuestionDbs')
|
||||
|
||||
logger.logTable(
|
||||
[
|
||||
['', 'Users', 'QDBs', 'Subjs', 'Questions'],
|
||||
[
|
||||
'Old',
|
||||
oldUserCount,
|
||||
oldQuestionDbCount,
|
||||
oldSubjCount,
|
||||
oldQuestionCount,
|
||||
],
|
||||
['', 'QDBs', 'Subjs', 'Questions'],
|
||||
['Old', oldQuestionDbCount, oldSubjCount, oldQuestionCount],
|
||||
...resultsTable,
|
||||
[
|
||||
'Added total',
|
||||
newUsers,
|
||||
totalNewQdbs,
|
||||
totalNewSubjects,
|
||||
totalNewQuestions,
|
||||
],
|
||||
[
|
||||
'Final',
|
||||
newUserCount,
|
||||
newQuestionDbCount,
|
||||
newSubjCount,
|
||||
newQuestionCount,
|
||||
],
|
||||
['Final', newQuestionDbCount, newSubjCount, newQuestionCount],
|
||||
],
|
||||
{ colWidth: [15], rowPrefix: '\t' }
|
||||
)
|
||||
|
||||
logger.Log(
|
||||
`Question DB-s written! Sync successfully finished!`,
|
||||
'green'
|
||||
)
|
||||
logger.Log(`Successfully synced questions!`, 'green')
|
||||
|
||||
return {
|
||||
old: {
|
||||
oldUserCount: oldUserCount,
|
||||
oldQuestionDbCount: oldQuestionDbCount,
|
||||
oldSubjCount: oldSubjCount,
|
||||
oldQuestionCount: oldQuestionCount,
|
||||
},
|
||||
added: {
|
||||
totalNewUers: newUsers,
|
||||
totalNewQdbs: totalNewQdbs,
|
||||
totalNewSubjects: totalNewSubjects,
|
||||
totalNewQuestions: totalNewQuestions,
|
||||
},
|
||||
final: {
|
||||
newUserCount: newUserCount,
|
||||
newQuestionDbCount: newQuestionDbCount,
|
||||
newSubjCount: newSubjCount,
|
||||
newQuestionCount: newQuestionCount,
|
||||
|
@ -1015,6 +1129,26 @@ function setup(data: SubmoduleData): Submodule {
|
|||
}
|
||||
}
|
||||
|
||||
function handleNewThirdPartyPeer(remoteHost: string) {
|
||||
logger.Log(
|
||||
'Couldn\'t find remote peer info based on remoteHost: "' +
|
||||
remoteHost +
|
||||
'". This could mean that the host uses this server as peer, but this server does not ' +
|
||||
'use it as a peer.',
|
||||
'yellowbg'
|
||||
)
|
||||
if (remoteHost.includes(':')) {
|
||||
const [host, port] = remoteHost.split(':')
|
||||
updateThirdPartyPeers([
|
||||
{
|
||||
host: host,
|
||||
port: +port,
|
||||
},
|
||||
])
|
||||
logger.Log('Host info written to host info file')
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------------------
|
||||
// APP SETUP
|
||||
// ---------------------------------------------------------------------------------------
|
||||
|
@ -1027,24 +1161,114 @@ function setup(data: SubmoduleData): Submodule {
|
|||
})
|
||||
|
||||
// TODO: get all user files
|
||||
|
||||
app.get('/getnewdatasince', (req: Request, res: Response<SyncDataRes>) => {
|
||||
// FIXME: hash question db to see if different?
|
||||
// it could help in determining if it should be checked for new data, but it would only save
|
||||
// a getNewDataSince() call per question db
|
||||
logger.LogReq(req)
|
||||
app.get('/getnewfilessince', (req: Request, res: Response<any>) => {
|
||||
const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since
|
||||
const usersSince = Number.isNaN(+req.query.usersSince)
|
||||
? 0
|
||||
: +req.query.usersSince
|
||||
const remoteHost = req.query.host
|
||||
const usersOnly = !!req.query.usersOnly
|
||||
|
||||
const result: SyncDataRes = {
|
||||
const remoteHost = req.query.host
|
||||
const hostToLog = remoteHost || 'Unknown host'
|
||||
|
||||
const result: any = {
|
||||
remoteInfo: getSelfInfo(),
|
||||
}
|
||||
|
||||
if (!usersOnly) {
|
||||
if (remoteHost) {
|
||||
const remotePeerInfo = peers.find((peer) => {
|
||||
return peerToString(peer) === remoteHost
|
||||
})
|
||||
if (!remotePeerInfo) {
|
||||
handleNewThirdPartyPeer(remoteHost)
|
||||
}
|
||||
}
|
||||
const usersSinceDate = since
|
||||
? new Date(since).toLocaleString()
|
||||
: 'all time'
|
||||
|
||||
logger.Log(
|
||||
`\tSending new files to ${logger.C(
|
||||
'blue'
|
||||
)}${hostToLog}${logger.C()} since ${logger.C(
|
||||
'blue'
|
||||
)}${usersSinceDate}${logger.C()}`
|
||||
)
|
||||
|
||||
res.json(result)
|
||||
})
|
||||
|
||||
app.get(
|
||||
'/getnewuserssince',
|
||||
(req: Request, res: Response<UserSyncDataRes>) => {
|
||||
logger.LogReq(req)
|
||||
const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since
|
||||
|
||||
const remoteHost = req.query.host
|
||||
let hostToLog = remoteHost || 'Unknown host'
|
||||
let sentUsers = 0
|
||||
|
||||
const result: UserSyncDataRes = {
|
||||
remoteInfo: getSelfInfo(),
|
||||
}
|
||||
|
||||
if (remoteHost) {
|
||||
const remotePeerInfo = peers.find((peer) => {
|
||||
return peerToString(peer) === remoteHost
|
||||
})
|
||||
if (!remotePeerInfo) {
|
||||
handleNewThirdPartyPeer(remoteHost)
|
||||
} else {
|
||||
hostToLog = peerToString(remotePeerInfo)
|
||||
}
|
||||
|
||||
if (remotePeerInfo) {
|
||||
const remotePublicKey = remotePeerInfo?.publicKey
|
||||
if (remotePublicKey) {
|
||||
// FIXME: sign data?
|
||||
const newUsers = getNewUsersSince(since)
|
||||
sentUsers = newUsers.length
|
||||
result.encryptedUsers = encrypt(
|
||||
remotePublicKey,
|
||||
JSON.stringify(newUsers)
|
||||
)
|
||||
|
||||
const usersSinceDate = since
|
||||
? new Date(since).toLocaleString()
|
||||
: 'all time'
|
||||
|
||||
logger.Log(
|
||||
`\tSending new users to ${logger.C(
|
||||
'blue'
|
||||
)}${hostToLog}${logger.C()} since ${logger.C(
|
||||
'blue'
|
||||
)}${usersSinceDate}${logger.C()}. Sent users: ${logger.C(
|
||||
'blue'
|
||||
)}${sentUsers}${logger.C()}`
|
||||
)
|
||||
} else if (remotePeerInfo) {
|
||||
logger.Log(
|
||||
`Warning: "${hostToLog}" has no public key saved!`,
|
||||
'yellowbg'
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res.json(result)
|
||||
}
|
||||
)
|
||||
|
||||
app.get(
|
||||
'/getnewdatasince',
|
||||
(req: Request, res: Response<QuestionSyncDataRes>) => {
|
||||
// FIXME: hash question db to see if different?
|
||||
// it could help in determining if it should be checked for new data, but it would only save
|
||||
// a getNewDataSince() call per question db
|
||||
logger.LogReq(req)
|
||||
const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since
|
||||
const remoteHost = req.query.host
|
||||
|
||||
const result: QuestionSyncDataRes = {
|
||||
remoteInfo: getSelfInfo(),
|
||||
}
|
||||
|
||||
const questionDbsWithNewQuestions = Number.isNaN(since)
|
||||
? getQuestionDbs()
|
||||
: getQuestionDbs()
|
||||
|
@ -1069,91 +1293,34 @@ function setup(data: SubmoduleData): Submodule {
|
|||
subjects: subjects,
|
||||
questions: questions,
|
||||
}
|
||||
}
|
||||
|
||||
let hostToLog = remoteHost || 'Unknown host'
|
||||
let sentUsers = 0
|
||||
if (remoteHost) {
|
||||
const remotePeerInfo = peers.find((peer) => {
|
||||
return peerToString(peer) === remoteHost
|
||||
})
|
||||
if (remotePeerInfo) {
|
||||
hostToLog = peerToString(remotePeerInfo)
|
||||
const remotePublicKey = remotePeerInfo?.publicKey
|
||||
if (remotePublicKey) {
|
||||
// FIXME: sign data?
|
||||
const newUsers = getNewUsersSince(usersSince)
|
||||
sentUsers = newUsers.length
|
||||
result.encryptedUsers = encrypt(
|
||||
remotePublicKey,
|
||||
JSON.stringify(newUsers)
|
||||
)
|
||||
logger.Log(
|
||||
`\tSending new users to "${remoteHost}" (encrypted)`,
|
||||
'green'
|
||||
)
|
||||
} else if (remotePeerInfo) {
|
||||
logger.Log(
|
||||
`Warning: "${hostToLog}" has no public key saved!`,
|
||||
'yellowbg'
|
||||
)
|
||||
}
|
||||
} else {
|
||||
logger.Log(
|
||||
'Couldn\'t find remote peer info based on remoteHost: "' +
|
||||
remoteHost +
|
||||
'". This could mean that the host uses this server as peer, but this server does not ' +
|
||||
'use it as a peer.',
|
||||
'yellowbg'
|
||||
)
|
||||
if (remoteHost.includes(':')) {
|
||||
const [host, port] = remoteHost.split(':')
|
||||
updateThirdPartyPeers([
|
||||
{
|
||||
host: host,
|
||||
port: +port,
|
||||
},
|
||||
])
|
||||
logger.Log('Host info written to host info file')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const usersSinceDate = usersSince
|
||||
? new Date(since).toLocaleString()
|
||||
: 'all time'
|
||||
|
||||
if (usersOnly) {
|
||||
logger.Log('Sending users only!', 'yellowbg')
|
||||
logger.Log(
|
||||
`\tSending new users to ${logger.C(
|
||||
'blue'
|
||||
)}${hostToLog}${logger.C()} since ${logger.C(
|
||||
'blue'
|
||||
)}${usersSinceDate}${logger.C()}. Sent users: ${logger.C(
|
||||
'blue'
|
||||
)}${sentUsers}${logger.C()}`
|
||||
)
|
||||
} else {
|
||||
console.log(since)
|
||||
let hostToLog = remoteHost || 'Unknown host'
|
||||
const dateToLog = since
|
||||
? new Date(since).toLocaleString()
|
||||
: 'all time'
|
||||
|
||||
if (remoteHost) {
|
||||
const remotePeerInfo = peers.find((peer) => {
|
||||
return peerToString(peer) === remoteHost
|
||||
})
|
||||
if (!remotePeerInfo) {
|
||||
handleNewThirdPartyPeer(remoteHost)
|
||||
} else {
|
||||
hostToLog = peerToString(remotePeerInfo)
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log(
|
||||
`\tSending new data to ${logger.C(
|
||||
'blue'
|
||||
)}${hostToLog}${logger.C()} since ${logger.C(
|
||||
'blue'
|
||||
)}${dateToLog}${logger.C()}, and new users since ${logger.C(
|
||||
'blue'
|
||||
)}${usersSinceDate}${logger.C()}`
|
||||
)}${dateToLog}${logger.C()}`
|
||||
)
|
||||
logger.logTable(
|
||||
[
|
||||
['Users', 'QDBs', 'Subjs', 'Questions'],
|
||||
['QDBs', 'Subjs', 'Questions'],
|
||||
[
|
||||
sentUsers,
|
||||
result.questionDbs.length,
|
||||
result.count.subjects,
|
||||
result.count.questions,
|
||||
|
@ -1161,23 +1328,25 @@ function setup(data: SubmoduleData): Submodule {
|
|||
],
|
||||
{ rowPrefix: '\t' }
|
||||
)
|
||||
}
|
||||
|
||||
res.json(result)
|
||||
})
|
||||
res.json(result)
|
||||
}
|
||||
)
|
||||
|
||||
app.get('/syncp2pdata', (req: Request, res: Response) => {
|
||||
logger.LogReq(req)
|
||||
const usersOnly = !!req.query.usersOnly
|
||||
const questions = !!req.query.questions
|
||||
const users = !!req.query.users
|
||||
const allTime = !!req.query.allTime
|
||||
const user = req.session.user
|
||||
if (!user || user.id !== 1) {
|
||||
res.json({
|
||||
status: 'error',
|
||||
message: 'only user 1 can call this EP',
|
||||
})
|
||||
return
|
||||
}
|
||||
// const user = req.session.user
|
||||
|
||||
// if (!user || user.id !== 1) {
|
||||
// res.json({
|
||||
// status: 'error',
|
||||
// message: 'only user 1 can call this EP',
|
||||
// })
|
||||
// return
|
||||
// }
|
||||
|
||||
// FIXME: /syncResult EP if this EP times out, but we still need the result
|
||||
if (syncInProgress) {
|
||||
|
@ -1189,7 +1358,13 @@ function setup(data: SubmoduleData): Submodule {
|
|||
|
||||
syncInProgress = true
|
||||
setPendingJobsAlertCount(5000)
|
||||
syncData({ usersOnly: usersOnly, allTime: allTime })
|
||||
syncData({
|
||||
shouldSync: {
|
||||
questions: questions,
|
||||
users: users,
|
||||
},
|
||||
allTime: allTime,
|
||||
})
|
||||
.then((syncResult) => {
|
||||
res.json({
|
||||
msg: 'sync successfull',
|
||||
|
|
|
@ -10,11 +10,17 @@ export function peerToString(peer: {
|
|||
return `${peer.host}:${peer.port}`
|
||||
}
|
||||
|
||||
export function isPeerSameAs(peer1: PeerInfo, peer2: PeerInfo): boolean {
|
||||
export function isPeerSameAs(
|
||||
peer1: { host: string; port: number },
|
||||
peer2: { host: string; port: number }
|
||||
): boolean {
|
||||
return peer1.host === peer2.host && peer1.port === peer2.port
|
||||
}
|
||||
|
||||
export function updatePeersFile(updatedPeer: PeerInfo): void {
|
||||
export function updatePeersFile(
|
||||
peerToUpdate: PeerInfo,
|
||||
updatedPeer: Partial<PeerInfo>
|
||||
): void {
|
||||
const newVal = readAndValidateFile<PeerInfo[]>(files.peersFile)
|
||||
let peers
|
||||
if (newVal) {
|
||||
|
@ -24,7 +30,7 @@ export function updatePeersFile(updatedPeer: PeerInfo): void {
|
|||
throw new Error('Peers file was invalid while trying to update it!')
|
||||
|
||||
const updatedPeers = peers.map((x) => {
|
||||
if (isPeerSameAs(updatedPeer, x)) {
|
||||
if (isPeerSameAs(peerToUpdate, x)) {
|
||||
return {
|
||||
...x,
|
||||
...updatedPeer,
|
||||
|
|
|
@ -98,7 +98,7 @@ export const handleQuestionsToPeers = async (
|
|||
results.loginErrors.push(peer)
|
||||
continue
|
||||
}
|
||||
updatePeersFile({ ...peer, sessionCookie: sessionCookie })
|
||||
updatePeersFile(peer, { sessionCookie: sessionCookie })
|
||||
}
|
||||
|
||||
let res = await postData(peer, sessionCookie)
|
||||
|
@ -109,7 +109,7 @@ export const handleQuestionsToPeers = async (
|
|||
results.loginErrors.push(peer)
|
||||
continue
|
||||
}
|
||||
updatePeersFile({ ...peer, sessionCookie: sessionCookie })
|
||||
updatePeersFile(peer, { sessionCookie: sessionCookie })
|
||||
res = await postData(peer, sessionCookie)
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue