From f2efc9199b959a3d8bb9076ac616429fda7d049f Mon Sep 17 00:00:00 2001 From: Barry Deen Date: Sun, 15 Jun 2025 22:56:01 -0400 Subject: [PATCH] reduce concurrent routines --- main.go | 183 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 177 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index 8680e1f..c579102 100644 --- a/main.go +++ b/main.go @@ -7,11 +7,13 @@ import ( "io" "log" "net/http" + _ "net/http/pprof" "os" "runtime" "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/fiatjaf/eventstore" @@ -62,7 +64,16 @@ var trustNetworkMap map[string]bool var pubkeyFollowerCount = make(map[string]int) var trustedNotes uint64 var untrustedNotes uint64 -var archiveEventSemaphore = make(chan struct{}, 100) // Limit concurrent goroutines +var archiveEventSemaphore = make(chan struct{}, 20) // Reduced from 100 to 20 + +// Performance counters +var ( + totalEvents uint64 + rejectedEvents uint64 + archivedEvents uint64 + profileRefreshCount uint64 + networkRefreshCount uint64 +) // Mutexes for thread safety var ( @@ -133,6 +144,8 @@ func main() { relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents) relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent) relay.RejectEvent = append(relay.RejectEvent, func(ctx context.Context, event *nostr.Event) (bool, string) { + atomic.AddUint64(&totalEvents, 1) + // Don't reject events if we haven't booted yet or if trust network is empty if !booted { return false, "" @@ -149,9 +162,11 @@ func main() { } if !trusted { + atomic.AddUint64(&rejectedEvents, 1) return true, "not in web of trust" } if event.Kind == nostr.KindEncryptedDirectMessage { + atomic.AddUint64(&rejectedEvents, 1) return true, "only gift wrapped DMs are allowed" } @@ -177,6 +192,7 @@ func main() { go refreshTrustNetwork(ctx, relay) go monitorMemoryUsage() // Add memory monitoring + go monitorPerformance() // Add performance monitoring mux := relay.Router() static := http.FileServer(http.Dir(config.StaticPath)) @@ -184,6 +200,10 @@ func main() { mux.Handle("GET /static/", http.StripPrefix("/static/", static)) mux.Handle("GET /favicon.ico", http.StripPrefix("/", static)) + // Add debug endpoints + mux.HandleFunc("GET /debug/stats", debugStatsHandler) + mux.HandleFunc("GET /debug/goroutines", debugGoroutinesHandler) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { tmpl := template.Must(template.ParseFiles(os.Getenv("INDEX_PATH"))) data := struct { @@ -204,6 +224,10 @@ func main() { }) log.Println("🎉 relay running on port :3334") + log.Println("🔍 debug endpoints available at:") + log.Println(" http://localhost:3334/debug/pprof/ (CPU/memory profiling)") + log.Println(" http://localhost:3334/debug/stats (application stats)") + log.Println(" http://localhost:3334/debug/goroutines (goroutine info)") err := http.ListenAndServe(":3334", relay) if err != nil { log.Fatal(err) @@ -346,6 +370,9 @@ func updateTrustNetworkFilter() { } func refreshProfiles(ctx context.Context) { + atomic.AddUint64(&profileRefreshCount, 1) + start := time.Now() + // Get a snapshot of current trust network to avoid holding locks during network operations trustNetworkMutex.RLock() currentTrustNetwork := make([]string, len(trustNetwork)) @@ -371,11 +398,15 @@ func refreshProfiles(ctx context.Context) { cancel() // Cancel after each iteration } - log.Println("👤 profiles refreshed: ", len(currentTrustNetwork)) + duration := time.Since(start) + log.Printf("👤 profiles refreshed: %d profiles in %v", len(currentTrustNetwork), duration) } func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { runTrustNetworkRefresh := func() { + atomic.AddUint64(&networkRefreshCount, 1) + start := time.Now() + // Build new networks in temporary variables to avoid disrupting the active network var newOneHopNetwork []string newOneHopNetworkSet := make(map[string]bool) @@ -397,7 +428,9 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { }} log.Println("🔍 fetching owner's follows") + eventCount := 0 for ev := range pool.SubManyEose(timeoutCtx, seedRelays, filters) { + eventCount++ for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) { pubkey := contact[1] if isIgnored(pubkey, config.IgnoredPubkeys) { @@ -413,8 +446,10 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { } } } + log.Printf("🔍 processed %d follow list events", eventCount) log.Println("🌐 building web of trust graph") + totalProcessed := 0 for i := 0; i < len(newOneHopNetwork); i += 100 { timeout, cancel := context.WithTimeout(ctx, 4*time.Second) @@ -428,7 +463,10 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { Kinds: []int{nostr.KindFollowList, nostr.KindRelayListMetadata, nostr.KindProfileMetadata}, }} + batchCount := 0 for ev := range pool.SubManyEose(timeout, seedRelays, filters) { + batchCount++ + totalProcessed++ for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) { if len(contact) > 1 { newPubkeyFollowerCount[contact[1]]++ @@ -444,6 +482,10 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { } } cancel() // Cancel after each iteration + + if i%500 == 0 { // Log progress every 5 batches + log.Printf("🌐 processed batch %d-%d (%d events in this batch)", i, end, batchCount) + } } // Now atomically replace the active data structures @@ -456,7 +498,8 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { pubkeyFollowerCount = newPubkeyFollowerCount followerMutex.Unlock() - log.Println("🫂 total network size:", len(newPubkeyFollowerCount)) + duration := time.Since(start) + log.Printf("🫂 total network size: %d (processed %d events in %v)", len(newPubkeyFollowerCount), totalProcessed, duration) relayMutex.RLock() log.Println("🔗 relays discovered:", len(relays)) relayMutex.RUnlock() @@ -576,7 +619,24 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) { log.Println("📦 archiving trusted notes...") + eventCount := 0 for ev := range pool.SubMany(timeout, seedRelays, filters) { + eventCount++ + + // Check GC pressure every 1000 events + if eventCount%1000 == 0 { + var m runtime.MemStats + runtime.ReadMemStats(&m) + if m.NumGC > 0 && eventCount > 1000 { + // If we're doing more than 2 GCs per 1000 events, slow down + gcRate := float64(m.NumGC) / float64(eventCount/1000) + if gcRate > 2.0 { + log.Printf("⚠️ High GC pressure (%.1f GC/1000 events), slowing archive process", gcRate) + time.Sleep(100 * time.Millisecond) // Brief pause + } + } + } + // Use semaphore to limit concurrent goroutines select { case archiveEventSemaphore <- struct{}{}: @@ -585,11 +645,16 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) { archiveEvent(ctx, relay, event) }(*ev.Event) case <-timeout.Done(): + log.Printf("📦 archive timeout reached, processed %d events", eventCount) return + default: + // If semaphore is full, process synchronously to avoid buildup + archiveEvent(ctx, relay, *ev.Event) } } - log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes") + log.Printf("📦 archived %d trusted notes and discarded %d untrusted notes (processed %d total events)", + atomic.LoadUint64(&trustedNotes), atomic.LoadUint64(&untrustedNotes), eventCount) } else { log.Println("🔄 web of trust will refresh in", config.RefreshInterval, "hours") select { @@ -614,9 +679,10 @@ func archiveEvent(ctx context.Context, relay *khatru.Relay, ev nostr.Event) { if trusted { wdb.Publish(ctx, ev) relay.BroadcastEvent(&ev) - trustedNotes++ + atomic.AddUint64(&trustedNotes, 1) + atomic.AddUint64(&archivedEvents, 1) } else { - untrustedNotes++ + atomic.AddUint64(&untrustedNotes, 1) } } @@ -760,3 +826,108 @@ func monitorMemoryUsage() { } } } + +// Add performance monitoring +func monitorPerformance() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + var lastGC uint32 + var lastEvents, lastRejected, lastArchived uint64 + + for { + select { + case <-ticker.C: + var m runtime.MemStats + runtime.ReadMemStats(&m) + + currentEvents := atomic.LoadUint64(&totalEvents) + currentRejected := atomic.LoadUint64(&rejectedEvents) + currentArchived := atomic.LoadUint64(&archivedEvents) + + eventsPerMin := currentEvents - lastEvents + rejectedPerMin := currentRejected - lastRejected + archivedPerMin := currentArchived - lastArchived + gcPerMin := m.NumGC - lastGC + + numGoroutines := runtime.NumGoroutine() + + log.Printf("⚡ Performance: Events/min=%d, Rejected/min=%d, Archived/min=%d, GC/min=%d, Goroutines=%d", + eventsPerMin, rejectedPerMin, archivedPerMin, gcPerMin, numGoroutines) + + if gcPerMin > 60 { + log.Printf("⚠️ HIGH GC ACTIVITY: %d garbage collections in last minute!", gcPerMin) + } + + if numGoroutines > 1000 { + log.Printf("⚠️ HIGH GOROUTINE COUNT: %d goroutines active!", numGoroutines) + } + + lastGC = m.NumGC + lastEvents = currentEvents + lastRejected = currentRejected + lastArchived = currentArchived + } + } +} + +// Debug handlers +func debugStatsHandler(w http.ResponseWriter, r *http.Request) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + + stats := fmt.Sprintf(`Debug Statistics: + +Memory: + Allocated: %d KB + System: %d KB + Total Allocations: %d + GC Cycles: %d + Goroutines: %d + +Events: + Total Events: %d + Rejected Events: %d + Archived Events: %d + Trusted Notes: %d + Untrusted Notes: %d + +Refreshes: + Profile Refreshes: %d + Network Refreshes: %d + +Data Structures: + Relays: %d + Trust Network: %d + One Hop Network: %d + Follower Count Map: %d +`, + m.Alloc/1024, + m.Sys/1024, + m.Mallocs, + m.NumGC, + runtime.NumGoroutine(), + atomic.LoadUint64(&totalEvents), + atomic.LoadUint64(&rejectedEvents), + atomic.LoadUint64(&archivedEvents), + atomic.LoadUint64(&trustedNotes), + atomic.LoadUint64(&untrustedNotes), + atomic.LoadUint64(&profileRefreshCount), + atomic.LoadUint64(&networkRefreshCount), + len(relays), + len(trustNetwork), + len(oneHopNetwork), + len(pubkeyFollowerCount), + ) + + w.Header().Set("Content-Type", "text/plain") + w.Write([]byte(stats)) +} + +func debugGoroutinesHandler(w http.ResponseWriter, r *http.Request) { + buf := make([]byte, 1<<20) // 1MB buffer + stackSize := runtime.Stack(buf, true) + + w.Header().Set("Content-Type", "text/plain") + w.Write(buf[:stackSize]) +}