Adds pool processing

Changes display to table
Fixes Max Limit in relays.
Fixes bug when kind 3 is not downloaded
Breaks into multiple subscriptions to use until
Adds relay authentication
This commit is contained in:
Vitor Pamplona 2024-01-28 12:43:23 -05:00
parent 48853d9f1c
commit 1256a6b8b1
5 changed files with 218 additions and 75 deletions

View File

@ -145,7 +145,7 @@
<p id="checking-relays-header"></p>
</div>
<div class="box-content" id="checking-relays-box">
<p id="checking-relays"></p>
<table id="checking-relays" class="fullwidth"></table>
</div>
<div class="box-content">
<p id="file-download"></p>

View File

@ -34,7 +34,7 @@ const fetchAndBroadcast = async () => {
$('#checking-relays-header').text("Waiting for Relays:")
// get all events from relays
const filters =[{ authors: [pubkey] }, { "#p": [pubkey] }]
const filters =[{ authors: [pubkey] }, { "#p": [pubkey] }]
const data = (await getEvents(filters, pubkey)).sort((a, b) => b.created_at - a.created_at)
// inform user fetching is done
@ -42,25 +42,28 @@ const fetchAndBroadcast = async () => {
$('#fetching-progress').val(relays.length)
const latestKind3 = data.filter((it) => it.kind == 3 && it.pubkey === pubkey)[0]
const myRelaySet = JSON.parse(latestKind3.content)
relays = Object.keys(myRelaySet).filter(url => myRelaySet[url].write).map(url => url)
$('#checking-relays-header-box').css('display', 'none')
$('#checking-relays-box').css('display', 'none')
// inform user that backup file (js format) is being downloaded
$('#file-download').html(txt.download)
downloadFile(data, 'nostr-backup.js')
// inform user that app is broadcasting events to relays
$('#broadcasting-status').html(txt.broadcasting)
// show and update broadcasting progress bar
$('#broadcasting-progress').css('visibility', 'visible')
$('#broadcasting-progress').prop('max', relays.length)
if (latestKind3) {
const myRelaySet = JSON.parse(latestKind3.content)
relays = Object.keys(myRelaySet).filter(url => myRelaySet[url].write).map(url => url)
$('#checking-relays-header-box').css('display', 'flex')
$('#checking-relays-box').css('display', 'flex')
$('#checking-relays-header').text("Broadcasting to Relays:")
await broadcastEvents(data)
$('#checking-relays-header-box').css('display', 'none')
$('#checking-relays-box').css('display', 'none')
// inform user that backup file (js format) is being downloaded
$('#file-download').html(txt.download)
downloadFile(data, 'nostr-backup.js')
// inform user that app is broadcasting events to relays
$('#broadcasting-status').html(txt.broadcasting)
// show and update broadcasting progress bar
$('#broadcasting-progress').css('visibility', 'visible')
$('#broadcasting-progress').prop('max', relays.length)
$('#checking-relays-header-box').css('display', 'flex')
$('#checking-relays-box').css('display', 'flex')
$('#checking-relays-header').text("Broadcasting to Relays:")
await broadcastEvents(data)
}
// inform user that broadcasting is done
$('#broadcasting-status').html(txt.broadcasting + checkMark)
@ -84,9 +87,6 @@ if (window.nostr) {
$('#get-from-extension').css('display', '')
}
// button click handler
const justBroadcast = async (fileName) => {
const reader = new FileReader();

View File

@ -47,7 +47,7 @@ function hexToBytes(hex) {
tempLink.click()
}
const updateRelayStatus = (relay, status, addToCount, relayStatusAndCount) => {
const updateRelayStatus = (relay, status, addToCount, until, relayStatusAndCount) => {
if (relayStatusAndCount[relay] == undefined) {
relayStatusAndCount[relay] = {}
}
@ -55,6 +55,8 @@ function hexToBytes(hex) {
if (status)
relayStatusAndCount[relay].status = status
relayStatusAndCount[relay].until = until
if (relayStatusAndCount[relay].count != undefined)
relayStatusAndCount[relay].count = relayStatusAndCount[relay].count + addToCount
else
@ -65,8 +67,17 @@ function hexToBytes(hex) {
const displayRelayStatus = (relayStatusAndCount) => {
if (Object.keys(relayStatusAndCount).length > 0) {
let newText = Object.keys(relayStatusAndCount).map(
it => it.replace("wss://", "").replace("ws://", "") + ": " + relayStatusAndCount[it].status + " (" + relayStatusAndCount[it].count + ")"
const newText = Object.keys(relayStatusAndCount).map(
it => {
let untilStr = "";
if (relayStatusAndCount[it].until)
untilStr = " <" + new Date(relayStatusAndCount[it].until * 1000).toLocaleDateString("en-US")
const relayName = it.replace("wss://", "").replace("ws://", "")
const line = "<td>" + relayName + "</td><td>" + relayStatusAndCount[it].status + "</td><td>" + untilStr + "</td><td>" + relayStatusAndCount[it].count + "</td>"
return "<tr>" +line+ "</tr>"
}
).join("<br />")
$('#checking-relays').html(newText)
} else {
@ -79,83 +90,154 @@ function hexToBytes(hex) {
const fetchFromRelay = async (relay, filters, pubkey, events, relayStatus) =>
new Promise((resolve, reject) => {
try {
updateRelayStatus(relay, "Starting", 0, relayStatus)
updateRelayStatus(relay, "Starting", 0, undefined, relayStatus)
// open websocket
const ws = new WebSocket(relay)
// prevent hanging forever
let myTimeout = setTimeout(() => {
ws.close()
reject('timeout')
reject(relay)
}, 10_000)
const subscriptions = Object.fromEntries(filters.map ( (filter, index) => {
let id = "my-sub-"+index
// subscription id
const subsId = 'my-sub'
return [
id, {
id: id,
counter: 0,
lastEvent: null,
done: false,
filter: filter,
eventIds: new Set()
}
]
}))
// subscribe to events filtered by author
ws.onopen = () => {
clearTimeout(myTimeout)
myTimeout = setTimeout(() => {
ws.close()
reject('timeout')
reject(relay)
}, 10_000)
updateRelayStatus(relay, "Downloading", 0, relayStatus)
ws.send(JSON.stringify(['REQ', subsId].concat(filters)))
updateRelayStatus(relay, "Downloading", 0, undefined, relayStatus)
for (const [key, sub] of Object.entries(subscriptions)) {
ws.send(JSON.stringify(['REQ', sub.id, sub.filter]))
}
}
// Listen for messages
ws.onmessage = (event) => {
const [msgType, subscriptionId, data] = JSON.parse(event.data)
// event messages
if (msgType === 'EVENT' && subscriptionId === subsId) {
if (msgType === 'EVENT') {
clearTimeout(myTimeout)
myTimeout = setTimeout(() => {
ws.close()
reject('timeout')
reject(relay)
}, 10_000)
const { id } = data
try {
const { id } = data
// don't save/reboradcast kind 3s that are not from the author.
// their are too big.
if (data.kind == 3 && data.pubkey != pubkey) {
if (!subscriptions[subscriptionId].lastEvent || data.created_at < subscriptions[subscriptionId].lastEvent.created_at)
subscriptions[subscriptionId].lastEvent = data
if (data.id in subscriptions[subscriptionId].eventIds) return
subscriptions[subscriptionId].eventIds.add(data.id)
subscriptions[subscriptionId].counter++
// don't save/reboradcast kind 3s that are not from the author.
// their are too big.
if (data.kind == 3 && data.pubkey != pubkey) {
return
}
let until = undefined
if (subscriptions[subscriptionId].lastEvent) {
until = subscriptions[subscriptionId].lastEvent.created_at
}
updateRelayStatus(relay, undefined, 1, until, relayStatus)
// prevent duplicated events
if (events[id]) return
else events[id] = data
// show how many events were found until this moment
$('#events-found').text(`${Object.keys(events).length} events found`)
} catch(err) {
console.log(err, event)
return
}
updateRelayStatus(relay, undefined, 1, relayStatus)
// prevent duplicated events
if (events[id]) return
else events[id] = data
// show how many events were found until this moment
$('#events-found').text(`${Object.keys(events).length} events found`)
}
// end of subscription messages
if (msgType === 'EOSE' && subscriptionId === subsId) {
updateRelayStatus(relay, "Done", 0, relayStatus)
ws.close()
resolve()
if (msgType === 'EOSE') {
// Restarting the filter is necessary to go around Max Limits for each relay.
if (subscriptions[subscriptionId].counter < 2) {
subscriptions[subscriptionId].done = true
console.log(relay, subscriptionId, event.data)
let alldone = Object.values(subscriptions).every(filter => filter.done === true);
if (alldone) {
updateRelayStatus(relay, "Done", 0, undefined, relayStatus)
ws.close()
resolve(relay)
}
} else {
//console.log("Limit: ", { ...filters[0], until: lastSub1Event.created_at })
subscriptions[subscriptionId].counter = 0
ws.send(JSON.stringify(['REQ', subscriptions[subscriptionId].id, { ...subscriptions[subscriptionId].filter, until: subscriptions[subscriptionId].lastEvent.created_at } ]))
}
}
if (msgType === 'AUTH') {
signNostrAuthEvent(relay, subscriptionId).then(
(event) => {
if (event)
ws.send(JSON.stringify(['EVENT', event]))
else {
updateRelayStatus(relay, "AUTH Req", 0, undefined, relayStatus)
ws.close()
reject(relay)
}
},
(reason) => {
updateRelayStatus(relay, "AUTH Req", 0, undefined, relayStatus)
ws.close()
reject(relay)
},
)
}
}
ws.onerror = (err) => {
updateRelayStatus(relay, "Done", 0, relayStatus)
ws.close()
reject(err)
updateRelayStatus(relay, "Done", 0, undefined, relayStatus)
try {
ws.close()
reject(relay)
} catch {
reject(relay)
}
}
ws.onclose = (socket, event) => {
updateRelayStatus(relay, "Done", 0, relayStatus)
resolve()
updateRelayStatus(relay, "Done", 0, undefined, relayStatus)
resolve(relay)
}
} catch (exception) {
console.log(exception)
updateRelayStatus(relay, "Error", 0, relayStatus)
updateRelayStatus(relay, "Error", 0, undefined, relayStatus)
try {
ws.close()
} catch (exception) {
}
reject(exception)
reject(relay)
}
})
@ -165,19 +247,40 @@ function hexToBytes(hex) {
const events = {}
// batch processing of 10 relays
let fetchFunctions = [...relays]
while (fetchFunctions.length) {
let relaysForThisRound = fetchFunctions.splice(0, 10)
let relayStatus = {}
$('#fetching-progress').val(relays.length - fetchFunctions.length)
await Promise.allSettled( relaysForThisRound.map((relay) => fetchFromRelay(relay, filters, pubkey, events, relayStatus)) )
}
await processInPool(relays, (relay, poolStatus) => fetchFromRelay(relay, filters, pubkey, events, poolStatus), 10)
displayRelayStatus({})
// return data as an array of events
return Object.keys(events).map((id) => events[id])
}
const processInPool = async (items, processItem, poolSize) => {
let pool = {};
let poolStatus = {}
let remaining = [...items]
while (remaining.length) {
let processing = remaining.splice(0, 1)
let item = processing[0]
pool[item] = processItem(item, poolStatus);
if (Object.keys(pool).length > poolSize - 1) {
try {
const resolvedId = await Promise.race(Object.values(pool)); // wait for one Promise to finish
delete pool[resolvedId]; // remove that Promise from the pool
} catch (resolvedId) {
delete pool[resolvedId]; // remove that Promise from the pool
}
}
$('#fetching-progress').val(items.length - remaining.length)
}
await Promise.allSettled(Object.values(pool));
}
const sendAllEvents = async (relay, data, relayStatus, ws) => {
console.log("Sending:", data.length, " events")
for (evnt of data) {
@ -191,7 +294,7 @@ function hexToBytes(hex) {
try {
const ws = new WebSocket(relay)
updateRelayStatus(relay, "Starting", 0, relayStatus)
updateRelayStatus(relay, "Starting", 0, undefined, relayStatus)
// prevent hanging forever
let myTimeout = setTimeout(() => {
@ -201,7 +304,7 @@ function hexToBytes(hex) {
// fetch events from relay
ws.onopen = () => {
updateRelayStatus(relay, "Sending", 0, relayStatus)
updateRelayStatus(relay, "Sending", 0, undefined, relayStatus)
clearTimeout(myTimeout)
myTimeout = setTimeout(() => {
@ -224,28 +327,28 @@ function hexToBytes(hex) {
// end of subscription messages
if (msgType === 'OK') {
if (inserted == true) {
updateRelayStatus(relay, undefined, 1, relayStatus)
updateRelayStatus(relay, undefined, 1, undefined, relayStatus)
} else {
console.log(event.data)
console.log(relay, event.data)
}
} else {
console.log(event.data)
console.log(relay, event.data)
}
}
ws.onerror = (err) => {
updateRelayStatus(relay, "Error", 0, relayStatus)
updateRelayStatus(relay, "Error", 0, undefined, relayStatus)
console.log("Error", err)
ws.close()
reject(err)
}
ws.onclose = (socket, event) => {
updateRelayStatus(relay, "Done", 0, relayStatus)
updateRelayStatus(relay, "Done", 0, undefined, relayStatus)
console.log("OnClose", relayStatus)
resolve()
}
} catch (exception) {
console.log(exception)
updateRelayStatus(relay, "Error", 0, relayStatus)
updateRelayStatus(relay, "Error", 0, undefined, relayStatus)
try {
ws.close()
} catch (exception) {
@ -266,4 +369,38 @@ function hexToBytes(hex) {
}
displayRelayStatus(relayStatus)
}
async function signNostrAuthEvent(relay, auth_challenge) {
try {
if (!window.nostr) {
throw "Nostr extension not loaded or available"
}
let msg = {
kind: 22243, // NIP-42++
content: "",
tags: [
["relay", relay]
["challenge", auth_challenge]
],
};
// set msg fields
msg.created_at = Math.floor((new Date()).getTime() / 1000);
msg.pubkey = await window.nostr.getPublicKey();
// Generate event id
msg.id = await generateNostrEventId(msg);
// Sign event
signed_msg = await window.nostr.signEvent(msg);
} catch (e) {
console.log("Failed to sign message with browser extension", e);
return undefined;
}
return signed_msg;
}

View File

@ -5,10 +5,10 @@
const fixedRelays = [
'wss://atlas.nostr.land', // paid relay 15000 npub12262qa4uhw7u8gdwlgmntqtv7aye8vdcmvszkqwgs0zchel6mz7s6cgrkj
'wss://bitcoiner.social', // paid relay 1000 npub1dxs2pygtfxsah77yuncsmu3ttqr274qr5g5zva3c7t5s3jtgy2xszsn4st
'wss://brb.io',
'wss://eden.nostr.land', // paid relay 5000 npub16k7j4mwsqm8hakjl8x5ycrqmhx89lxkfwz2xxxcw75eav7sd8ztqy2rwdn
'wss://expensive-relay.fiatjaf.com',
'wss://freedom-relay.herokuapp.com',
'wss://relay.damus.io',
'wss://nos.lol',
'wss://a.nos.lol',
'wss://nostr-2.zebedee.cloud',
@ -30,6 +30,8 @@ const fixedRelays = [
'wss://nostr.rocks',
'wss://nostr.sandwich.farm',
'wss://nostr.wine', // paid relay 8888 npub18kzz4lkdtc5n729kvfunxuz287uvu9f64ywhjz43ra482t2y5sks0mx5sz
'wss://filter.nostr.wine',
'wss://inbox.nostr.wine',
'wss://nostr.zebedee.cloud',
'wss://private.red.gb.net', // paid relay 8888 npub1nctdevxxuvth3sx6r0gutv4tmvhwy9syvpkr3gfd5atz67fl97kqyjkuxk
'wss://puravida.nostr.land', // paid relay 10000 npub16k7j4mwsqm8hakjl8x5ycrqmhx89lxkfwz2xxxcw75eav7sd8ztqy2rwdn
@ -56,4 +58,4 @@ var relays = []
fetch("https://api.nostr.watch/v1/online")
.then(response => response.json())
.then(json => relays = fixedRelays.concat(json));
.then(json => relays = [... new Set(fixedRelays.concat(json))] );

View File

@ -8,6 +8,10 @@ body {
font-family: 'Red Hat Text', sans-serif;
}
.fullwidth {
width: 500px;
}
.container {
text-align: center;
color: var(--color);