From 1256a6b8b1e303f1dfbbef6ba93ccd95de7813ba Mon Sep 17 00:00:00 2001 From: Vitor Pamplona Date: Sun, 28 Jan 2024 12:43:23 -0500 Subject: [PATCH] Adds pool processing Changes display to table Fixes Max Limit in relays. Fixes bug when kind 3 is not downloaded Breaks into multiple subscriptions to use until Adds relay authentication --- index.html | 2 +- js/nostr-broadcast.js | 42 ++++---- js/nostr-utils.js | 239 +++++++++++++++++++++++++++++++++--------- js/relays.js | 6 +- style.css | 4 + 5 files changed, 218 insertions(+), 75 deletions(-) diff --git a/index.html b/index.html index 3c46c5b..cab8afe 100644 --- a/index.html +++ b/index.html @@ -145,7 +145,7 @@

-

+

diff --git a/js/nostr-broadcast.js b/js/nostr-broadcast.js index cbdd3e5..5fc59c0 100644 --- a/js/nostr-broadcast.js +++ b/js/nostr-broadcast.js @@ -34,7 +34,7 @@ const fetchAndBroadcast = async () => { $('#checking-relays-header').text("Waiting for Relays:") // get all events from relays - const filters =[{ authors: [pubkey] }, { "#p": [pubkey] }] + const filters =[{ authors: [pubkey] }, { "#p": [pubkey] }] const data = (await getEvents(filters, pubkey)).sort((a, b) => b.created_at - a.created_at) // inform user fetching is done @@ -42,25 +42,28 @@ const fetchAndBroadcast = async () => { $('#fetching-progress').val(relays.length) const latestKind3 = data.filter((it) => it.kind == 3 && it.pubkey === pubkey)[0] - const myRelaySet = JSON.parse(latestKind3.content) - relays = Object.keys(myRelaySet).filter(url => myRelaySet[url].write).map(url => url) - $('#checking-relays-header-box').css('display', 'none') - $('#checking-relays-box').css('display', 'none') - // inform user that backup file (js format) is being downloaded - $('#file-download').html(txt.download) - downloadFile(data, 'nostr-backup.js') - // inform user that app is broadcasting events to relays - $('#broadcasting-status').html(txt.broadcasting) - // show and update broadcasting progress bar - $('#broadcasting-progress').css('visibility', 'visible') - $('#broadcasting-progress').prop('max', relays.length) + if (latestKind3) { + const myRelaySet = JSON.parse(latestKind3.content) + relays = Object.keys(myRelaySet).filter(url => myRelaySet[url].write).map(url => url) - $('#checking-relays-header-box').css('display', 'flex') - $('#checking-relays-box').css('display', 'flex') - $('#checking-relays-header').text("Broadcasting to Relays:") - - await broadcastEvents(data) + $('#checking-relays-header-box').css('display', 'none') + $('#checking-relays-box').css('display', 'none') + // inform user that backup file (js format) is being downloaded + $('#file-download').html(txt.download) + downloadFile(data, 'nostr-backup.js') + // inform user that app is broadcasting events to relays + $('#broadcasting-status').html(txt.broadcasting) + // show and update broadcasting progress bar + $('#broadcasting-progress').css('visibility', 'visible') + $('#broadcasting-progress').prop('max', relays.length) + + $('#checking-relays-header-box').css('display', 'flex') + $('#checking-relays-box').css('display', 'flex') + $('#checking-relays-header').text("Broadcasting to Relays:") + + await broadcastEvents(data) + } // inform user that broadcasting is done $('#broadcasting-status').html(txt.broadcasting + checkMark) @@ -84,9 +87,6 @@ if (window.nostr) { $('#get-from-extension').css('display', '') } - - - // button click handler const justBroadcast = async (fileName) => { const reader = new FileReader(); diff --git a/js/nostr-utils.js b/js/nostr-utils.js index b65cf54..af6adbb 100644 --- a/js/nostr-utils.js +++ b/js/nostr-utils.js @@ -47,7 +47,7 @@ function hexToBytes(hex) { tempLink.click() } - const updateRelayStatus = (relay, status, addToCount, relayStatusAndCount) => { + const updateRelayStatus = (relay, status, addToCount, until, relayStatusAndCount) => { if (relayStatusAndCount[relay] == undefined) { relayStatusAndCount[relay] = {} } @@ -55,6 +55,8 @@ function hexToBytes(hex) { if (status) relayStatusAndCount[relay].status = status + relayStatusAndCount[relay].until = until + if (relayStatusAndCount[relay].count != undefined) relayStatusAndCount[relay].count = relayStatusAndCount[relay].count + addToCount else @@ -65,8 +67,17 @@ function hexToBytes(hex) { const displayRelayStatus = (relayStatusAndCount) => { if (Object.keys(relayStatusAndCount).length > 0) { - let newText = Object.keys(relayStatusAndCount).map( - it => it.replace("wss://", "").replace("ws://", "") + ": " + relayStatusAndCount[it].status + " (" + relayStatusAndCount[it].count + ")" + const newText = Object.keys(relayStatusAndCount).map( + it => { + let untilStr = ""; + + if (relayStatusAndCount[it].until) + untilStr = " <" + new Date(relayStatusAndCount[it].until * 1000).toLocaleDateString("en-US") + + const relayName = it.replace("wss://", "").replace("ws://", "") + const line = "" + relayName + "" + relayStatusAndCount[it].status + "" + untilStr + "" + relayStatusAndCount[it].count + "" + return "" +line+ "" + } ).join("
") $('#checking-relays').html(newText) } else { @@ -79,83 +90,154 @@ function hexToBytes(hex) { const fetchFromRelay = async (relay, filters, pubkey, events, relayStatus) => new Promise((resolve, reject) => { try { - updateRelayStatus(relay, "Starting", 0, relayStatus) + updateRelayStatus(relay, "Starting", 0, undefined, relayStatus) // open websocket const ws = new WebSocket(relay) // prevent hanging forever let myTimeout = setTimeout(() => { ws.close() - reject('timeout') + reject(relay) }, 10_000) + const subscriptions = Object.fromEntries(filters.map ( (filter, index) => { + let id = "my-sub-"+index - // subscription id - const subsId = 'my-sub' + return [ + id, { + id: id, + counter: 0, + lastEvent: null, + done: false, + filter: filter, + eventIds: new Set() + } + ] + })) + // subscribe to events filtered by author ws.onopen = () => { clearTimeout(myTimeout) myTimeout = setTimeout(() => { ws.close() - reject('timeout') + reject(relay) }, 10_000) - updateRelayStatus(relay, "Downloading", 0, relayStatus) - ws.send(JSON.stringify(['REQ', subsId].concat(filters))) + updateRelayStatus(relay, "Downloading", 0, undefined, relayStatus) + + for (const [key, sub] of Object.entries(subscriptions)) { + ws.send(JSON.stringify(['REQ', sub.id, sub.filter])) + } } // Listen for messages ws.onmessage = (event) => { const [msgType, subscriptionId, data] = JSON.parse(event.data) // event messages - if (msgType === 'EVENT' && subscriptionId === subsId) { + if (msgType === 'EVENT') { clearTimeout(myTimeout) myTimeout = setTimeout(() => { ws.close() - reject('timeout') + reject(relay) }, 10_000) - const { id } = data + try { + const { id } = data - // don't save/reboradcast kind 3s that are not from the author. - // their are too big. - if (data.kind == 3 && data.pubkey != pubkey) { + if (!subscriptions[subscriptionId].lastEvent || data.created_at < subscriptions[subscriptionId].lastEvent.created_at) + subscriptions[subscriptionId].lastEvent = data + + if (data.id in subscriptions[subscriptionId].eventIds) return + + subscriptions[subscriptionId].eventIds.add(data.id) + subscriptions[subscriptionId].counter++ + + // don't save/reboradcast kind 3s that are not from the author. + // their are too big. + if (data.kind == 3 && data.pubkey != pubkey) { + return + } + + let until = undefined + + if (subscriptions[subscriptionId].lastEvent) { + until = subscriptions[subscriptionId].lastEvent.created_at + } + + updateRelayStatus(relay, undefined, 1, until, relayStatus) + + // prevent duplicated events + if (events[id]) return + else events[id] = data + + // show how many events were found until this moment + $('#events-found').text(`${Object.keys(events).length} events found`) + } catch(err) { + console.log(err, event) return } - - updateRelayStatus(relay, undefined, 1, relayStatus) - - // prevent duplicated events - if (events[id]) return - else events[id] = data - - // show how many events were found until this moment - $('#events-found').text(`${Object.keys(events).length} events found`) } + // end of subscription messages - if (msgType === 'EOSE' && subscriptionId === subsId) { - updateRelayStatus(relay, "Done", 0, relayStatus) - ws.close() - resolve() + if (msgType === 'EOSE') { + // Restarting the filter is necessary to go around Max Limits for each relay. + if (subscriptions[subscriptionId].counter < 2) { + subscriptions[subscriptionId].done = true + console.log(relay, subscriptionId, event.data) + + let alldone = Object.values(subscriptions).every(filter => filter.done === true); + if (alldone) { + updateRelayStatus(relay, "Done", 0, undefined, relayStatus) + ws.close() + resolve(relay) + } + } else { + //console.log("Limit: ", { ...filters[0], until: lastSub1Event.created_at }) + subscriptions[subscriptionId].counter = 0 + ws.send(JSON.stringify(['REQ', subscriptions[subscriptionId].id, { ...subscriptions[subscriptionId].filter, until: subscriptions[subscriptionId].lastEvent.created_at } ])) + } + } + + if (msgType === 'AUTH') { + signNostrAuthEvent(relay, subscriptionId).then( + (event) => { + if (event) + ws.send(JSON.stringify(['EVENT', event])) + else { + updateRelayStatus(relay, "AUTH Req", 0, undefined, relayStatus) + ws.close() + reject(relay) + } + }, + (reason) => { + updateRelayStatus(relay, "AUTH Req", 0, undefined, relayStatus) + ws.close() + reject(relay) + }, + ) } } ws.onerror = (err) => { - updateRelayStatus(relay, "Done", 0, relayStatus) - ws.close() - reject(err) + updateRelayStatus(relay, "Done", 0, undefined, relayStatus) + try { + ws.close() + reject(relay) + } catch { + reject(relay) + } } ws.onclose = (socket, event) => { - updateRelayStatus(relay, "Done", 0, relayStatus) - resolve() + updateRelayStatus(relay, "Done", 0, undefined, relayStatus) + resolve(relay) } } catch (exception) { console.log(exception) - updateRelayStatus(relay, "Error", 0, relayStatus) + updateRelayStatus(relay, "Error", 0, undefined, relayStatus) try { ws.close() } catch (exception) { } - reject(exception) + reject(relay) } }) @@ -165,19 +247,40 @@ function hexToBytes(hex) { const events = {} // batch processing of 10 relays - let fetchFunctions = [...relays] - while (fetchFunctions.length) { - let relaysForThisRound = fetchFunctions.splice(0, 10) - let relayStatus = {} - $('#fetching-progress').val(relays.length - fetchFunctions.length) - await Promise.allSettled( relaysForThisRound.map((relay) => fetchFromRelay(relay, filters, pubkey, events, relayStatus)) ) - } + await processInPool(relays, (relay, poolStatus) => fetchFromRelay(relay, filters, pubkey, events, poolStatus), 10) + displayRelayStatus({}) // return data as an array of events return Object.keys(events).map((id) => events[id]) } + const processInPool = async (items, processItem, poolSize) => { + let pool = {}; + let poolStatus = {} + let remaining = [...items] + + while (remaining.length) { + let processing = remaining.splice(0, 1) + let item = processing[0] + pool[item] = processItem(item, poolStatus); + + if (Object.keys(pool).length > poolSize - 1) { + try { + const resolvedId = await Promise.race(Object.values(pool)); // wait for one Promise to finish + + delete pool[resolvedId]; // remove that Promise from the pool + } catch (resolvedId) { + delete pool[resolvedId]; // remove that Promise from the pool + } + } + + $('#fetching-progress').val(items.length - remaining.length) + } + + await Promise.allSettled(Object.values(pool)); +} + const sendAllEvents = async (relay, data, relayStatus, ws) => { console.log("Sending:", data.length, " events") for (evnt of data) { @@ -191,7 +294,7 @@ function hexToBytes(hex) { try { const ws = new WebSocket(relay) - updateRelayStatus(relay, "Starting", 0, relayStatus) + updateRelayStatus(relay, "Starting", 0, undefined, relayStatus) // prevent hanging forever let myTimeout = setTimeout(() => { @@ -201,7 +304,7 @@ function hexToBytes(hex) { // fetch events from relay ws.onopen = () => { - updateRelayStatus(relay, "Sending", 0, relayStatus) + updateRelayStatus(relay, "Sending", 0, undefined, relayStatus) clearTimeout(myTimeout) myTimeout = setTimeout(() => { @@ -224,28 +327,28 @@ function hexToBytes(hex) { // end of subscription messages if (msgType === 'OK') { if (inserted == true) { - updateRelayStatus(relay, undefined, 1, relayStatus) + updateRelayStatus(relay, undefined, 1, undefined, relayStatus) } else { - console.log(event.data) + console.log(relay, event.data) } } else { - console.log(event.data) + console.log(relay, event.data) } } ws.onerror = (err) => { - updateRelayStatus(relay, "Error", 0, relayStatus) + updateRelayStatus(relay, "Error", 0, undefined, relayStatus) console.log("Error", err) ws.close() reject(err) } ws.onclose = (socket, event) => { - updateRelayStatus(relay, "Done", 0, relayStatus) + updateRelayStatus(relay, "Done", 0, undefined, relayStatus) console.log("OnClose", relayStatus) resolve() } } catch (exception) { console.log(exception) - updateRelayStatus(relay, "Error", 0, relayStatus) + updateRelayStatus(relay, "Error", 0, undefined, relayStatus) try { ws.close() } catch (exception) { @@ -266,4 +369,38 @@ function hexToBytes(hex) { } displayRelayStatus(relayStatus) + } + + async function signNostrAuthEvent(relay, auth_challenge) { + try { + + if (!window.nostr) { + throw "Nostr extension not loaded or available" + } + + let msg = { + kind: 22243, // NIP-42++ + content: "", + tags: [ + ["relay", relay] + ["challenge", auth_challenge] + ], + }; + + // set msg fields + msg.created_at = Math.floor((new Date()).getTime() / 1000); + msg.pubkey = await window.nostr.getPublicKey(); + + // Generate event id + msg.id = await generateNostrEventId(msg); + + // Sign event + signed_msg = await window.nostr.signEvent(msg); + + } catch (e) { + console.log("Failed to sign message with browser extension", e); + return undefined; + } + + return signed_msg; } \ No newline at end of file diff --git a/js/relays.js b/js/relays.js index ddbed0c..d08c9f6 100644 --- a/js/relays.js +++ b/js/relays.js @@ -5,10 +5,10 @@ const fixedRelays = [ 'wss://atlas.nostr.land', // paid relay 15000 npub12262qa4uhw7u8gdwlgmntqtv7aye8vdcmvszkqwgs0zchel6mz7s6cgrkj 'wss://bitcoiner.social', // paid relay 1000 npub1dxs2pygtfxsah77yuncsmu3ttqr274qr5g5zva3c7t5s3jtgy2xszsn4st - 'wss://brb.io', 'wss://eden.nostr.land', // paid relay 5000 npub16k7j4mwsqm8hakjl8x5ycrqmhx89lxkfwz2xxxcw75eav7sd8ztqy2rwdn 'wss://expensive-relay.fiatjaf.com', 'wss://freedom-relay.herokuapp.com', + 'wss://relay.damus.io', 'wss://nos.lol', 'wss://a.nos.lol', 'wss://nostr-2.zebedee.cloud', @@ -30,6 +30,8 @@ const fixedRelays = [ 'wss://nostr.rocks', 'wss://nostr.sandwich.farm', 'wss://nostr.wine', // paid relay 8888 npub18kzz4lkdtc5n729kvfunxuz287uvu9f64ywhjz43ra482t2y5sks0mx5sz + 'wss://filter.nostr.wine', + 'wss://inbox.nostr.wine', 'wss://nostr.zebedee.cloud', 'wss://private.red.gb.net', // paid relay 8888 npub1nctdevxxuvth3sx6r0gutv4tmvhwy9syvpkr3gfd5atz67fl97kqyjkuxk 'wss://puravida.nostr.land', // paid relay 10000 npub16k7j4mwsqm8hakjl8x5ycrqmhx89lxkfwz2xxxcw75eav7sd8ztqy2rwdn @@ -56,4 +58,4 @@ var relays = [] fetch("https://api.nostr.watch/v1/online") .then(response => response.json()) - .then(json => relays = fixedRelays.concat(json)); + .then(json => relays = [... new Set(fixedRelays.concat(json))] ); diff --git a/style.css b/style.css index e0b68df..daf050b 100644 --- a/style.css +++ b/style.css @@ -8,6 +8,10 @@ body { font-family: 'Red Hat Text', sans-serif; } +.fullwidth { + width: 500px; +} + .container { text-align: center; color: var(--color);