Compare commits

..

12 Commits

Author SHA1 Message Date
Barry Deen
24b51de9fd
Merge pull request #90 from bitvora/dev-limitQuery
filter ip rate limiter
2025-06-16 14:24:41 -04:00
Barry Deen
0242d65d5e filter ip rate limiter 2025-06-16 14:14:53 -04:00
Barry Deen
1714a801ca
Merge pull request #89 from bitvora/dev-memoryLeak
Dev memory leak
2025-06-15 23:22:24 -04:00
Barry Deen
a07829c492 remove initial huge wave of notes 2025-06-15 23:04:18 -04:00
Barry Deen
f2efc9199b reduce concurrent routines 2025-06-15 22:56:01 -04:00
Barry Deen
208c524875
Merge pull request #88 from bitvora/dev-memoryLeak
batching, goroutine limits
2025-06-15 22:01:12 -04:00
Barry Deen
1f4143ee5f change default to 40k 2025-06-15 21:53:42 -04:00
Barry Deen
1f9c1dccf5 batching, goroutine limits 2025-06-15 21:42:58 -04:00
Barry Deen
df4d8e5e7c
Merge pull request #87 from sudocarlos/patch-2
Add wot.sudocarlos.com
2025-05-27 20:17:47 -04:00
sudocarlos
9722c85f41
Add wot.sudocarlos.com
Now running on capable hardware
2025-04-26 15:40:20 -04:00
Barry Deen
929e6f8318
Merge pull request #82 from bitvora/dev-ignoreList
ignore follow list for people who follow spammers
2025-01-04 16:09:18 -05:00
Barry Deen
9a74359210 ignore follow list for people who follow spammers 2025-01-04 15:56:24 -05:00
3 changed files with 487 additions and 70 deletions

View File

@ -25,3 +25,6 @@ ARCHIVE_REACTIONS="FALSE" # optional, reactions take up a lot of space and compu
# optional, certain note kinds older than this many days will be deleted # optional, certain note kinds older than this many days will be deleted
MAX_AGE_DAYS=365 MAX_AGE_DAYS=365
# comma delimited list of pubkeys who follow bots and ruin the WoT
IGNORE_FOLLOWS_LIST=""

View File

@ -24,6 +24,7 @@ Don't want to run the relay, just want to connect to some? Here are some availab
- [wss://wot.tealeaf.dev](https://wot.tealeaf.dev) - [wss://wot.tealeaf.dev](https://wot.tealeaf.dev)
- [wss://wot.nostr.net](https://wot.nostr.net) - [wss://wot.nostr.net](https://wot.nostr.net)
- [wss://relay.goodmorningbitcoin.com](https://relay.goodmorningbitcoin.com) - [wss://relay.goodmorningbitcoin.com](https://relay.goodmorningbitcoin.com)
- [wss://wot.sudocarlos.com](wss://wot.sudocarlos.com)
## Prerequisites ## Prerequisites
@ -64,6 +65,7 @@ REFRESH_INTERVAL_HOURS=24 # interval in hours to refresh the web of trust
MINIMUM_FOLLOWERS=3 #how many followers before they're allowed in the WoT MINIMUM_FOLLOWERS=3 #how many followers before they're allowed in the WoT
ARCHIVAL_SYNC="FALSE" # set to TRUE to archive every note from every person in the WoT (not recommended) ARCHIVAL_SYNC="FALSE" # set to TRUE to archive every note from every person in the WoT (not recommended)
ARCHIVE_REACTIONS="FALSE" # set to TRUE to archive every reaction from every person in the WoT (not recommended) ARCHIVE_REACTIONS="FALSE" # set to TRUE to archive every reaction from every person in the WoT (not recommended)
IGNORE_FOLLOWS_LIST="" # comma separated list of pubkeys who follow too many bots and ruin the WoT
``` ```
### 4. Build the project ### 4. Build the project

534
main.go
View File

@ -7,8 +7,13 @@ import (
"io" "io"
"log" "log"
"net/http" "net/http"
_ "net/http/pprof"
"os" "os"
"runtime"
"strconv" "strconv"
"strings"
"sync"
"sync/atomic"
"time" "time"
"github.com/fiatjaf/eventstore" "github.com/fiatjaf/eventstore"
@ -38,20 +43,45 @@ type Config struct {
RelayIcon string RelayIcon string
MaxAgeDays int MaxAgeDays int
ArchiveReactions bool ArchiveReactions bool
IgnoredPubkeys []string
MaxTrustNetwork int
MaxRelays int
MaxOneHopNetwork int
} }
var pool *nostr.SimplePool var pool *nostr.SimplePool
var wdb nostr.RelayStore var wdb nostr.RelayStore
var relays []string var relays []string
var relaySet = make(map[string]bool) // O(1) lookup
var config Config var config Config
var trustNetwork []string var trustNetwork []string
var trustNetworkSet = make(map[string]bool) // O(1) lookup
var seedRelays []string var seedRelays []string
var booted bool var booted bool
var oneHopNetwork []string var oneHopNetwork []string
var oneHopNetworkSet = make(map[string]bool) // O(1) lookup
var trustNetworkMap map[string]bool var trustNetworkMap map[string]bool
var pubkeyFollowerCount = make(map[string]int) var pubkeyFollowerCount = make(map[string]int)
var trustedNotes uint64 var trustedNotes uint64
var untrustedNotes uint64 var untrustedNotes uint64
var archiveEventSemaphore = make(chan struct{}, 20) // Reduced from 100 to 20
// Performance counters
var (
totalEvents uint64
rejectedEvents uint64
archivedEvents uint64
profileRefreshCount uint64
networkRefreshCount uint64
)
// Mutexes for thread safety
var (
relayMutex sync.RWMutex
trustNetworkMutex sync.RWMutex
oneHopMutex sync.RWMutex
followerMutex sync.RWMutex
)
func main() { func main() {
nostr.InfoLogger = log.New(io.Discard, "", 0) nostr.InfoLogger = log.New(io.Discard, "", 0)
@ -104,6 +134,7 @@ func main() {
relay.RejectFilter = append(relay.RejectFilter, relay.RejectFilter = append(relay.RejectFilter,
policies.NoEmptyFilters, policies.NoEmptyFilters,
policies.NoComplexFilters, policies.NoComplexFilters,
policies.FilterIPRateLimiter(5, time.Minute*1, 30),
) )
relay.RejectConnection = append(relay.RejectConnection, relay.RejectConnection = append(relay.RejectConnection,
@ -114,10 +145,29 @@ func main() {
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents) relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent) relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
relay.RejectEvent = append(relay.RejectEvent, func(ctx context.Context, event *nostr.Event) (bool, string) { relay.RejectEvent = append(relay.RejectEvent, func(ctx context.Context, event *nostr.Event) (bool, string) {
if !trustNetworkMap[event.PubKey] { atomic.AddUint64(&totalEvents, 1)
return true, "we are rebuilding the trust network, please try again later"
// Don't reject events if we haven't booted yet or if trust network is empty
if !booted {
return false, ""
}
trustNetworkMutex.RLock()
trusted := trustNetworkMap[event.PubKey]
hasNetwork := len(trustNetworkMap) > 0
trustNetworkMutex.RUnlock()
// If we don't have a trust network yet, allow all events
if !hasNetwork {
return false, ""
}
if !trusted {
atomic.AddUint64(&rejectedEvents, 1)
return true, "not in web of trust"
} }
if event.Kind == nostr.KindEncryptedDirectMessage { if event.Kind == nostr.KindEncryptedDirectMessage {
atomic.AddUint64(&rejectedEvents, 1)
return true, "only gift wrapped DMs are allowed" return true, "only gift wrapped DMs are allowed"
} }
@ -142,6 +192,8 @@ func main() {
} }
go refreshTrustNetwork(ctx, relay) go refreshTrustNetwork(ctx, relay)
go monitorMemoryUsage() // Add memory monitoring
go monitorPerformance() // Add performance monitoring
mux := relay.Router() mux := relay.Router()
static := http.FileServer(http.Dir(config.StaticPath)) static := http.FileServer(http.Dir(config.StaticPath))
@ -149,6 +201,10 @@ func main() {
mux.Handle("GET /static/", http.StripPrefix("/static/", static)) mux.Handle("GET /static/", http.StripPrefix("/static/", static))
mux.Handle("GET /favicon.ico", http.StripPrefix("/", static)) mux.Handle("GET /favicon.ico", http.StripPrefix("/", static))
// Add debug endpoints
mux.HandleFunc("GET /debug/stats", debugStatsHandler)
mux.HandleFunc("GET /debug/goroutines", debugGoroutinesHandler)
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
tmpl := template.Must(template.ParseFiles(os.Getenv("INDEX_PATH"))) tmpl := template.Must(template.ParseFiles(os.Getenv("INDEX_PATH")))
data := struct { data := struct {
@ -169,6 +225,10 @@ func main() {
}) })
log.Println("🎉 relay running on port :3334") log.Println("🎉 relay running on port :3334")
log.Println("🔍 debug endpoints available at:")
log.Println(" http://localhost:3334/debug/pprof/ (CPU/memory profiling)")
log.Println(" http://localhost:3334/debug/stats (application stats)")
log.Println(" http://localhost:3334/debug/goroutines (goroutine info)")
err := http.ListenAndServe(":3334", relay) err := http.ListenAndServe(":3334", relay)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -209,8 +269,28 @@ func LoadConfig() Config {
os.Setenv("ARCHIVE_REACTIONS", "FALSE") os.Setenv("ARCHIVE_REACTIONS", "FALSE")
} }
if os.Getenv("MAX_TRUST_NETWORK") == "" {
os.Setenv("MAX_TRUST_NETWORK", "40000")
}
if os.Getenv("MAX_RELAYS") == "" {
os.Setenv("MAX_RELAYS", "1000")
}
if os.Getenv("MAX_ONE_HOP_NETWORK") == "" {
os.Setenv("MAX_ONE_HOP_NETWORK", "50000")
}
ignoredPubkeys := []string{}
if ignoreList := os.Getenv("IGNORE_FOLLOWS_LIST"); ignoreList != "" {
ignoredPubkeys = splitAndTrim(ignoreList)
}
minimumFollowers, _ := strconv.Atoi(os.Getenv("MINIMUM_FOLLOWERS")) minimumFollowers, _ := strconv.Atoi(os.Getenv("MINIMUM_FOLLOWERS"))
maxAgeDays, _ := strconv.Atoi(os.Getenv("MAX_AGE_DAYS")) maxAgeDays, _ := strconv.Atoi(os.Getenv("MAX_AGE_DAYS"))
maxTrustNetwork, _ := strconv.Atoi(os.Getenv("MAX_TRUST_NETWORK"))
maxRelays, _ := strconv.Atoi(os.Getenv("MAX_RELAYS"))
maxOneHopNetwork, _ := strconv.Atoi(os.Getenv("MAX_ONE_HOP_NETWORK"))
config := Config{ config := Config{
RelayName: getEnv("RELAY_NAME"), RelayName: getEnv("RELAY_NAME"),
@ -227,6 +307,10 @@ func LoadConfig() Config {
ArchivalSync: getEnv("ARCHIVAL_SYNC") == "TRUE", ArchivalSync: getEnv("ARCHIVAL_SYNC") == "TRUE",
MaxAgeDays: maxAgeDays, MaxAgeDays: maxAgeDays,
ArchiveReactions: getEnv("ARCHIVE_REACTIONS") == "TRUE", ArchiveReactions: getEnv("ARCHIVE_REACTIONS") == "TRUE",
IgnoredPubkeys: ignoredPubkeys,
MaxTrustNetwork: maxTrustNetwork,
MaxRelays: maxRelays,
MaxOneHopNetwork: maxOneHopNetwork,
} }
return config return config
@ -241,44 +325,101 @@ func getEnv(key string) string {
} }
func updateTrustNetworkFilter() { func updateTrustNetworkFilter() {
trustNetworkMap = make(map[string]bool) // Build new trust network in temporary variables
newTrustNetworkMap := make(map[string]bool)
var newTrustNetwork []string
newTrustNetworkSet := make(map[string]bool)
log.Println("🌐 updating trust network map") log.Println("🌐 building new trust network map")
followerMutex.RLock()
for pubkey, count := range pubkeyFollowerCount { for pubkey, count := range pubkeyFollowerCount {
if count >= config.MinimumFollowers { if count >= config.MinimumFollowers {
trustNetworkMap[pubkey] = true newTrustNetworkMap[pubkey] = true
appendPubkey(pubkey) if !newTrustNetworkSet[pubkey] && len(pubkey) == 64 && len(newTrustNetwork) < config.MaxTrustNetwork {
newTrustNetwork = append(newTrustNetwork, pubkey)
newTrustNetworkSet[pubkey] = true
} }
} }
}
followerMutex.RUnlock()
log.Println("🌐 trust network map updated with", len(trustNetwork), "keys") // Now atomically replace the active trust network
trustNetworkMutex.Lock()
trustNetworkMap = newTrustNetworkMap
trustNetwork = newTrustNetwork
trustNetworkSet = newTrustNetworkSet
trustNetworkMutex.Unlock()
log.Println("🌐 trust network map updated with", len(newTrustNetwork), "keys")
// Cleanup follower count map periodically to prevent unbounded growth
followerMutex.Lock()
if len(pubkeyFollowerCount) > config.MaxOneHopNetwork*2 {
log.Println("🧹 cleaning follower count map")
newFollowerCount := make(map[string]int)
for pubkey, count := range pubkeyFollowerCount {
if count >= config.MinimumFollowers || newTrustNetworkMap[pubkey] {
newFollowerCount[pubkey] = count
}
}
oldCount := len(pubkeyFollowerCount)
pubkeyFollowerCount = newFollowerCount
log.Printf("🧹 cleaned follower count map: %d -> %d entries", oldCount, len(newFollowerCount))
}
followerMutex.Unlock()
} }
func refreshProfiles(ctx context.Context) { func refreshProfiles(ctx context.Context) {
for i := 0; i < len(trustNetwork); i += 200 { atomic.AddUint64(&profileRefreshCount, 1)
start := time.Now()
// Get a snapshot of current trust network to avoid holding locks during network operations
trustNetworkMutex.RLock()
currentTrustNetwork := make([]string, len(trustNetwork))
copy(currentTrustNetwork, trustNetwork)
trustNetworkMutex.RUnlock()
for i := 0; i < len(currentTrustNetwork); i += 200 {
timeout, cancel := context.WithTimeout(ctx, 4*time.Second) timeout, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()
end := i + 200 end := i + 200
if end > len(trustNetwork) { if end > len(currentTrustNetwork) {
end = len(trustNetwork) end = len(currentTrustNetwork)
} }
filters := []nostr.Filter{{ filters := []nostr.Filter{{
Authors: trustNetwork[i:end], Authors: currentTrustNetwork[i:end],
Kinds: []int{nostr.KindProfileMetadata}, Kinds: []int{nostr.KindProfileMetadata},
}} }}
for ev := range pool.SubManyEose(timeout, seedRelays, filters) { for ev := range pool.SubManyEose(timeout, seedRelays, filters) {
wdb.Publish(ctx, *ev.Event) wdb.Publish(ctx, *ev.Event)
} }
cancel() // Cancel after each iteration
} }
log.Println("👤 profiles refreshed: ", len(trustNetwork)) duration := time.Since(start)
log.Printf("👤 profiles refreshed: %d profiles in %v", len(currentTrustNetwork), duration)
} }
func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
runTrustNetworkRefresh := func() { runTrustNetworkRefresh := func() {
atomic.AddUint64(&networkRefreshCount, 1)
start := time.Now()
// Build new networks in temporary variables to avoid disrupting the active network
var newOneHopNetwork []string
newOneHopNetworkSet := make(map[string]bool)
newPubkeyFollowerCount := make(map[string]int)
// Copy existing follower counts to preserve data
followerMutex.RLock()
for k, v := range pubkeyFollowerCount {
newPubkeyFollowerCount[k] = v
}
followerMutex.RUnlock()
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel() defer cancel()
@ -288,32 +429,48 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
}} }}
log.Println("🔍 fetching owner's follows") log.Println("🔍 fetching owner's follows")
eventCount := 0
for ev := range pool.SubManyEose(timeoutCtx, seedRelays, filters) { for ev := range pool.SubManyEose(timeoutCtx, seedRelays, filters) {
eventCount++
for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) { for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) {
pubkeyFollowerCount[contact[1]]++ // Increment follower count for the pubkey pubkey := contact[1]
appendOneHopNetwork(contact[1]) if isIgnored(pubkey, config.IgnoredPubkeys) {
fmt.Println("ignoring follows from pubkey: ", pubkey)
continue
}
newPubkeyFollowerCount[contact[1]]++
// Add to new one-hop network
if !newOneHopNetworkSet[contact[1]] && len(contact[1]) == 64 && len(newOneHopNetwork) < config.MaxOneHopNetwork {
newOneHopNetwork = append(newOneHopNetwork, contact[1])
newOneHopNetworkSet[contact[1]] = true
} }
} }
}
log.Printf("🔍 processed %d follow list events", eventCount)
log.Println("🌐 building web of trust graph") log.Println("🌐 building web of trust graph")
for i := 0; i < len(oneHopNetwork); i += 100 { totalProcessed := 0
for i := 0; i < len(newOneHopNetwork); i += 100 {
timeout, cancel := context.WithTimeout(ctx, 4*time.Second) timeout, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()
end := i + 100 end := i + 100
if end > len(oneHopNetwork) { if end > len(newOneHopNetwork) {
end = len(oneHopNetwork) end = len(newOneHopNetwork)
} }
filters = []nostr.Filter{{ filters = []nostr.Filter{{
Authors: oneHopNetwork[i:end], Authors: newOneHopNetwork[i:end],
Kinds: []int{nostr.KindFollowList, nostr.KindRelayListMetadata, nostr.KindProfileMetadata}, Kinds: []int{nostr.KindFollowList, nostr.KindRelayListMetadata, nostr.KindProfileMetadata},
}} }}
batchCount := 0
for ev := range pool.SubManyEose(timeout, seedRelays, filters) { for ev := range pool.SubManyEose(timeout, seedRelays, filters) {
batchCount++
totalProcessed++
for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) { for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) {
if len(contact) > 1 { if len(contact) > 1 {
pubkeyFollowerCount[contact[1]]++ // Increment follower count for the pubkey newPubkeyFollowerCount[contact[1]]++
} }
} }
@ -325,34 +482,87 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
wdb.Publish(ctx, *ev.Event) wdb.Publish(ctx, *ev.Event)
} }
} }
cancel() // Cancel after each iteration
if i%500 == 0 { // Log progress every 5 batches
log.Printf("🌐 processed batch %d-%d (%d events in this batch)", i, end, batchCount)
} }
log.Println("🫂 total network size:", len(pubkeyFollowerCount))
log.Println("🔗 relays discovered:", len(relays))
} }
// Now atomically replace the active data structures
oneHopMutex.Lock()
oneHopNetwork = newOneHopNetwork
oneHopNetworkSet = newOneHopNetworkSet
oneHopMutex.Unlock()
followerMutex.Lock()
pubkeyFollowerCount = newPubkeyFollowerCount
followerMutex.Unlock()
duration := time.Since(start)
log.Printf("🫂 total network size: %d (processed %d events in %v)", len(newPubkeyFollowerCount), totalProcessed, duration)
relayMutex.RLock()
log.Println("🔗 relays discovered:", len(relays))
relayMutex.RUnlock()
}
ticker := time.NewTicker(time.Duration(config.RefreshInterval) * time.Hour)
defer ticker.Stop()
// Run initial refresh
log.Println("🚀 performing initial trust network build...")
runTrustNetworkRefresh()
updateTrustNetworkFilter()
// Mark as booted after initial trust network is built
booted = true
log.Println("✅ trust network initialized, relay is now active")
deleteOldNotes(relay)
archiveTrustedNotes(ctx, relay)
// Then run on timer
for { for {
select {
case <-ticker.C:
log.Println("🔄 refreshing trust network in background...")
runTrustNetworkRefresh() runTrustNetworkRefresh()
updateTrustNetworkFilter() updateTrustNetworkFilter()
deleteOldNotes(relay) deleteOldNotes(relay)
archiveTrustedNotes(ctx, relay) archiveTrustedNotes(ctx, relay)
log.Println("✅ trust network refresh completed")
case <-ctx.Done():
return
}
} }
} }
func appendRelay(relay string) { func appendRelay(relay string) {
relayMutex.Lock()
defer relayMutex.Unlock()
for _, r := range relays { if len(relays) >= config.MaxRelays {
if r == relay { return // Prevent unbounded growth
return
} }
if relaySet[relay] {
return // Already exists
} }
relays = append(relays, relay) relays = append(relays, relay)
relaySet[relay] = true
} }
func appendPubkey(pubkey string) { func appendPubkey(pubkey string) {
for _, pk := range trustNetwork { trustNetworkMutex.Lock()
if pk == pubkey { defer trustNetworkMutex.Unlock()
return
if len(trustNetwork) >= config.MaxTrustNetwork {
return // Prevent unbounded growth
} }
if trustNetworkSet[pubkey] {
return // Already exists
} }
if len(pubkey) != 64 { if len(pubkey) != 64 {
@ -360,20 +570,7 @@ func appendPubkey(pubkey string) {
} }
trustNetwork = append(trustNetwork, pubkey) trustNetwork = append(trustNetwork, pubkey)
} trustNetworkSet[pubkey] = true
func appendOneHopNetwork(pubkey string) {
for _, pk := range oneHopNetwork {
if pk == pubkey {
return
}
}
if len(pubkey) != 64 {
return
}
oneHopNetwork = append(oneHopNetwork, pubkey)
} }
func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) { func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
@ -383,12 +580,13 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer close(done)
if config.ArchivalSync { if config.ArchivalSync {
go refreshProfiles(ctx) go refreshProfiles(ctx)
var filters []nostr.Filter var filters []nostr.Filter
since := nostr.Now()
if config.ArchiveReactions { if config.ArchiveReactions {
filters = []nostr.Filter{{ filters = []nostr.Filter{{
Kinds: []int{ Kinds: []int{
nostr.KindArticle, nostr.KindArticle,
@ -403,6 +601,7 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
nostr.KindZap, nostr.KindZap,
nostr.KindTextNote, nostr.KindTextNote,
}, },
Since: &since,
}} }}
} else { } else {
filters = []nostr.Filter{{ filters = []nostr.Filter{{
@ -418,24 +617,54 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
nostr.KindZap, nostr.KindZap,
nostr.KindTextNote, nostr.KindTextNote,
}, },
Since: &since,
}} }}
} }
log.Println("📦 archiving trusted notes...") log.Println("📦 archiving trusted notes...")
eventCount := 0
for ev := range pool.SubMany(timeout, seedRelays, filters) { for ev := range pool.SubMany(timeout, seedRelays, filters) {
go archiveEvent(ctx, relay, *ev.Event) eventCount++
// Check GC pressure every 1000 events
if eventCount%1000 == 0 {
var m runtime.MemStats
runtime.ReadMemStats(&m)
if m.NumGC > 0 && eventCount > 1000 {
// If we're doing more than 2 GCs per 1000 events, slow down
gcRate := float64(m.NumGC) / float64(eventCount/1000)
if gcRate > 2.0 {
log.Printf("⚠️ High GC pressure (%.1f GC/1000 events), slowing archive process", gcRate)
time.Sleep(100 * time.Millisecond) // Brief pause
}
}
} }
log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes") // Use semaphore to limit concurrent goroutines
select {
case archiveEventSemaphore <- struct{}{}:
go func(event nostr.Event) {
defer func() { <-archiveEventSemaphore }()
archiveEvent(ctx, relay, event)
}(*ev.Event)
case <-timeout.Done():
log.Printf("📦 archive timeout reached, processed %d events", eventCount)
return
default:
// If semaphore is full, process synchronously to avoid buildup
archiveEvent(ctx, relay, *ev.Event)
}
}
log.Printf("📦 archived %d trusted notes and discarded %d untrusted notes (processed %d total events)",
atomic.LoadUint64(&trustedNotes), atomic.LoadUint64(&untrustedNotes), eventCount)
} else { } else {
log.Println("🔄 web of trust will refresh in", config.RefreshInterval, "hours") log.Println("🔄 web of trust will refresh in", config.RefreshInterval, "hours")
select { select {
case <-timeout.Done(): case <-timeout.Done():
} }
} }
close(done)
}() }()
select { select {
@ -447,12 +676,17 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
} }
func archiveEvent(ctx context.Context, relay *khatru.Relay, ev nostr.Event) { func archiveEvent(ctx context.Context, relay *khatru.Relay, ev nostr.Event) {
if trustNetworkMap[ev.PubKey] { trustNetworkMutex.RLock()
trusted := trustNetworkMap[ev.PubKey]
trustNetworkMutex.RUnlock()
if trusted {
wdb.Publish(ctx, ev) wdb.Publish(ctx, ev)
relay.BroadcastEvent(&ev) relay.BroadcastEvent(&ev)
trustedNotes++ atomic.AddUint64(&trustedNotes, 1)
atomic.AddUint64(&archivedEvents, 1)
} else { } else {
untrustedNotes++ atomic.AddUint64(&untrustedNotes, 1)
} }
} }
@ -486,6 +720,7 @@ func deleteOldNotes(relay *khatru.Relay) error {
nostr.KindZap, nostr.KindZap,
nostr.KindTextNote, nostr.KindTextNote,
}, },
Limit: 1000, // Process in batches to avoid memory issues
} }
ch, err := relay.QueryEvents[0](ctx, filter) ch, err := relay.QueryEvents[0](ctx, filter)
@ -494,27 +729,47 @@ func deleteOldNotes(relay *khatru.Relay) error {
return err return err
} }
events := make([]*nostr.Event, 0) // Process events in batches to avoid memory issues
batchSize := 100
events := make([]*nostr.Event, 0, batchSize)
count := 0
for evt := range ch { for evt := range ch {
events = append(events, evt) events = append(events, evt)
} count++
if len(events) < 1 {
log.Println("0 old notes found")
return nil
}
if len(events) >= batchSize {
// Delete this batch
for num_evt, del_evt := range events { for num_evt, del_evt := range events {
for _, del := range relay.DeleteEvent { for _, del := range relay.DeleteEvent {
if err := del(ctx, del_evt); err != nil { if err := del(ctx, del_evt); err != nil {
log.Printf("error deleting note %d of %d. event id: %s", num_evt, len(events), del_evt.ID) log.Printf("error deleting note %d of batch. event id: %s", num_evt, del_evt.ID)
return err return err
} }
} }
} }
events = events[:0] // Reset slice but keep capacity
}
}
// Delete remaining events
if len(events) > 0 {
for num_evt, del_evt := range events {
for _, del := range relay.DeleteEvent {
if err := del(ctx, del_evt); err != nil {
log.Printf("error deleting note %d of final batch. event id: %s", num_evt, del_evt.ID)
return err
}
}
}
}
if count == 0 {
log.Println("0 old notes found")
} else {
log.Printf("%d old (until %d) notes deleted", count, oldAge)
}
log.Printf("%d old (until %d) notes deleted", len(events), oldAge)
return nil return nil
} }
@ -523,3 +778,160 @@ func getDB() badger.BadgerBackend {
Path: getEnv("DB_PATH"), Path: getEnv("DB_PATH"),
} }
} }
// splitAndTrim splits a comma-delimited string into its parts with
// surrounding whitespace removed.
//
// Empty entries (produced by a trailing comma, doubled commas, or
// whitespace-only items) are dropped so callers such as isIgnored never
// compare pubkeys against the empty string.
func splitAndTrim(input string) []string {
	parts := strings.Split(input, ",")
	items := make([]string, 0, len(parts))
	for _, item := range parts {
		if trimmed := strings.TrimSpace(item); trimmed != "" {
			items = append(items, trimmed)
		}
	}
	return items
}
// isIgnored reports whether pubkey appears in the configured list of
// ignored pubkeys (IGNORE_FOLLOWS_LIST).
func isIgnored(pubkey string, ignoredPubkeys []string) bool {
	for i := range ignoredPubkeys {
		if ignoredPubkeys[i] == pubkey {
			return true
		}
	}
	return false
}
// monitorMemoryUsage logs Go runtime memory statistics and the sizes of
// the relay's core data structures every five minutes. Each length is
// read under its guarding mutex so the snapshot is race-free.
//
// Runs forever; intended to be launched as a goroutine from main.
func monitorMemoryUsage() {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		var stats runtime.MemStats
		runtime.ReadMemStats(&stats)

		relayMutex.RLock()
		numRelays := len(relays)
		relayMutex.RUnlock()

		trustNetworkMutex.RLock()
		numTrusted := len(trustNetwork)
		trustNetworkMutex.RUnlock()

		oneHopMutex.RLock()
		numOneHop := len(oneHopNetwork)
		oneHopMutex.RUnlock()

		followerMutex.RLock()
		numFollowers := len(pubkeyFollowerCount)
		followerMutex.RUnlock()

		log.Printf("📊 Memory: Alloc=%d KB, Sys=%d KB, NumGC=%d",
			stats.Alloc/1024, stats.Sys/1024, stats.NumGC)
		log.Printf("📊 Data structures: Relays=%d, TrustNetwork=%d, OneHop=%d, Followers=%d",
			numRelays, numTrusted, numOneHop, numFollowers)
	}
}
// monitorPerformance logs per-minute deltas of the event counters
// (total, rejected, archived), GC cycles, and the live goroutine count
// once a minute, warning loudly when GC frequency or goroutine numbers
// look unhealthy.
//
// Runs forever; intended to be launched as a goroutine from main.
func monitorPerformance() {
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	var prevGC uint32
	var prevEvents, prevRejected, prevArchived uint64

	for range ticker.C {
		var stats runtime.MemStats
		runtime.ReadMemStats(&stats)

		events := atomic.LoadUint64(&totalEvents)
		rejected := atomic.LoadUint64(&rejectedEvents)
		archived := atomic.LoadUint64(&archivedEvents)
		goroutines := runtime.NumGoroutine()
		gcDelta := stats.NumGC - prevGC

		log.Printf("⚡ Performance: Events/min=%d, Rejected/min=%d, Archived/min=%d, GC/min=%d, Goroutines=%d",
			events-prevEvents, rejected-prevRejected, archived-prevArchived,
			gcDelta, goroutines)

		if gcDelta > 60 {
			log.Printf("⚠️ HIGH GC ACTIVITY: %d garbage collections in last minute!", gcDelta)
		}
		if goroutines > 1000 {
			log.Printf("⚠️ HIGH GOROUTINE COUNT: %d goroutines active!", goroutines)
		}

		prevGC = stats.NumGC
		prevEvents = events
		prevRejected = rejected
		prevArchived = archived
	}
}
// Debug handlers
func debugStatsHandler(w http.ResponseWriter, r *http.Request) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
stats := fmt.Sprintf(`Debug Statistics:
Memory:
Allocated: %d KB
System: %d KB
Total Allocations: %d
GC Cycles: %d
Goroutines: %d
Events:
Total Events: %d
Rejected Events: %d
Archived Events: %d
Trusted Notes: %d
Untrusted Notes: %d
Refreshes:
Profile Refreshes: %d
Network Refreshes: %d
Data Structures:
Relays: %d
Trust Network: %d
One Hop Network: %d
Follower Count Map: %d
`,
m.Alloc/1024,
m.Sys/1024,
m.Mallocs,
m.NumGC,
runtime.NumGoroutine(),
atomic.LoadUint64(&totalEvents),
atomic.LoadUint64(&rejectedEvents),
atomic.LoadUint64(&archivedEvents),
atomic.LoadUint64(&trustedNotes),
atomic.LoadUint64(&untrustedNotes),
atomic.LoadUint64(&profileRefreshCount),
atomic.LoadUint64(&networkRefreshCount),
len(relays),
len(trustNetwork),
len(oneHopNetwork),
len(pubkeyFollowerCount),
)
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte(stats))
}
// debugGoroutinesHandler dumps the stack traces of every live goroutine
// as plain text at /debug/goroutines.
func debugGoroutinesHandler(w http.ResponseWriter, r *http.Request) {
	const bufSize = 1 << 20 // 1MB scratch buffer for the full stack dump
	stack := make([]byte, bufSize)
	n := runtime.Stack(stack, true)

	w.Header().Set("Content-Type", "text/plain")
	w.Write(stack[:n])
}