Merge pull request #89 from bitvora/dev-memoryLeak

Dev memory leak
This commit is contained in:
Barry Deen 2025-06-15 23:22:24 -04:00 committed by GitHub
commit 1714a801ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

186
main.go
View File

@ -7,11 +7,13 @@ import (
"io" "io"
"log" "log"
"net/http" "net/http"
_ "net/http/pprof"
"os" "os"
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/fiatjaf/eventstore" "github.com/fiatjaf/eventstore"
@ -62,7 +64,16 @@ var trustNetworkMap map[string]bool
var pubkeyFollowerCount = make(map[string]int) var pubkeyFollowerCount = make(map[string]int)
var trustedNotes uint64 var trustedNotes uint64
var untrustedNotes uint64 var untrustedNotes uint64
var archiveEventSemaphore = make(chan struct{}, 100) // Limit concurrent goroutines var archiveEventSemaphore = make(chan struct{}, 20) // Reduced from 100 to 20
// Performance counters
var (
totalEvents uint64
rejectedEvents uint64
archivedEvents uint64
profileRefreshCount uint64
networkRefreshCount uint64
)
// Mutexes for thread safety // Mutexes for thread safety
var ( var (
@ -133,6 +144,8 @@ func main() {
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents) relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent) relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
relay.RejectEvent = append(relay.RejectEvent, func(ctx context.Context, event *nostr.Event) (bool, string) { relay.RejectEvent = append(relay.RejectEvent, func(ctx context.Context, event *nostr.Event) (bool, string) {
atomic.AddUint64(&totalEvents, 1)
// Don't reject events if we haven't booted yet or if trust network is empty // Don't reject events if we haven't booted yet or if trust network is empty
if !booted { if !booted {
return false, "" return false, ""
@ -149,9 +162,11 @@ func main() {
} }
if !trusted { if !trusted {
atomic.AddUint64(&rejectedEvents, 1)
return true, "not in web of trust" return true, "not in web of trust"
} }
if event.Kind == nostr.KindEncryptedDirectMessage { if event.Kind == nostr.KindEncryptedDirectMessage {
atomic.AddUint64(&rejectedEvents, 1)
return true, "only gift wrapped DMs are allowed" return true, "only gift wrapped DMs are allowed"
} }
@ -177,6 +192,7 @@ func main() {
go refreshTrustNetwork(ctx, relay) go refreshTrustNetwork(ctx, relay)
go monitorMemoryUsage() // Add memory monitoring go monitorMemoryUsage() // Add memory monitoring
go monitorPerformance() // Add performance monitoring
mux := relay.Router() mux := relay.Router()
static := http.FileServer(http.Dir(config.StaticPath)) static := http.FileServer(http.Dir(config.StaticPath))
@ -184,6 +200,10 @@ func main() {
mux.Handle("GET /static/", http.StripPrefix("/static/", static)) mux.Handle("GET /static/", http.StripPrefix("/static/", static))
mux.Handle("GET /favicon.ico", http.StripPrefix("/", static)) mux.Handle("GET /favicon.ico", http.StripPrefix("/", static))
// Add debug endpoints
mux.HandleFunc("GET /debug/stats", debugStatsHandler)
mux.HandleFunc("GET /debug/goroutines", debugGoroutinesHandler)
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
tmpl := template.Must(template.ParseFiles(os.Getenv("INDEX_PATH"))) tmpl := template.Must(template.ParseFiles(os.Getenv("INDEX_PATH")))
data := struct { data := struct {
@ -204,6 +224,10 @@ func main() {
}) })
log.Println("🎉 relay running on port :3334") log.Println("🎉 relay running on port :3334")
log.Println("🔍 debug endpoints available at:")
log.Println(" http://localhost:3334/debug/pprof/ (CPU/memory profiling)")
log.Println(" http://localhost:3334/debug/stats (application stats)")
log.Println(" http://localhost:3334/debug/goroutines (goroutine info)")
err := http.ListenAndServe(":3334", relay) err := http.ListenAndServe(":3334", relay)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -346,6 +370,9 @@ func updateTrustNetworkFilter() {
} }
func refreshProfiles(ctx context.Context) { func refreshProfiles(ctx context.Context) {
atomic.AddUint64(&profileRefreshCount, 1)
start := time.Now()
// Get a snapshot of current trust network to avoid holding locks during network operations // Get a snapshot of current trust network to avoid holding locks during network operations
trustNetworkMutex.RLock() trustNetworkMutex.RLock()
currentTrustNetwork := make([]string, len(trustNetwork)) currentTrustNetwork := make([]string, len(trustNetwork))
@ -371,11 +398,15 @@ func refreshProfiles(ctx context.Context) {
cancel() // Cancel after each iteration cancel() // Cancel after each iteration
} }
log.Println("👤 profiles refreshed: ", len(currentTrustNetwork)) duration := time.Since(start)
log.Printf("👤 profiles refreshed: %d profiles in %v", len(currentTrustNetwork), duration)
} }
func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
runTrustNetworkRefresh := func() { runTrustNetworkRefresh := func() {
atomic.AddUint64(&networkRefreshCount, 1)
start := time.Now()
// Build new networks in temporary variables to avoid disrupting the active network // Build new networks in temporary variables to avoid disrupting the active network
var newOneHopNetwork []string var newOneHopNetwork []string
newOneHopNetworkSet := make(map[string]bool) newOneHopNetworkSet := make(map[string]bool)
@ -397,7 +428,9 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
}} }}
log.Println("🔍 fetching owner's follows") log.Println("🔍 fetching owner's follows")
eventCount := 0
for ev := range pool.SubManyEose(timeoutCtx, seedRelays, filters) { for ev := range pool.SubManyEose(timeoutCtx, seedRelays, filters) {
eventCount++
for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) { for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) {
pubkey := contact[1] pubkey := contact[1]
if isIgnored(pubkey, config.IgnoredPubkeys) { if isIgnored(pubkey, config.IgnoredPubkeys) {
@ -413,8 +446,10 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
} }
} }
} }
log.Printf("🔍 processed %d follow list events", eventCount)
log.Println("🌐 building web of trust graph") log.Println("🌐 building web of trust graph")
totalProcessed := 0
for i := 0; i < len(newOneHopNetwork); i += 100 { for i := 0; i < len(newOneHopNetwork); i += 100 {
timeout, cancel := context.WithTimeout(ctx, 4*time.Second) timeout, cancel := context.WithTimeout(ctx, 4*time.Second)
@ -428,7 +463,10 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
Kinds: []int{nostr.KindFollowList, nostr.KindRelayListMetadata, nostr.KindProfileMetadata}, Kinds: []int{nostr.KindFollowList, nostr.KindRelayListMetadata, nostr.KindProfileMetadata},
}} }}
batchCount := 0
for ev := range pool.SubManyEose(timeout, seedRelays, filters) { for ev := range pool.SubManyEose(timeout, seedRelays, filters) {
batchCount++
totalProcessed++
for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) { for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) {
if len(contact) > 1 { if len(contact) > 1 {
newPubkeyFollowerCount[contact[1]]++ newPubkeyFollowerCount[contact[1]]++
@ -444,6 +482,10 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
} }
} }
cancel() // Cancel after each iteration cancel() // Cancel after each iteration
if i%500 == 0 { // Log progress every 5 batches
log.Printf("🌐 processed batch %d-%d (%d events in this batch)", i, end, batchCount)
}
} }
// Now atomically replace the active data structures // Now atomically replace the active data structures
@ -456,7 +498,8 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
pubkeyFollowerCount = newPubkeyFollowerCount pubkeyFollowerCount = newPubkeyFollowerCount
followerMutex.Unlock() followerMutex.Unlock()
log.Println("🫂 total network size:", len(newPubkeyFollowerCount)) duration := time.Since(start)
log.Printf("🫂 total network size: %d (processed %d events in %v)", len(newPubkeyFollowerCount), totalProcessed, duration)
relayMutex.RLock() relayMutex.RLock()
log.Println("🔗 relays discovered:", len(relays)) log.Println("🔗 relays discovered:", len(relays))
relayMutex.RUnlock() relayMutex.RUnlock()
@ -541,6 +584,7 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
go refreshProfiles(ctx) go refreshProfiles(ctx)
var filters []nostr.Filter var filters []nostr.Filter
since := nostr.Now()
if config.ArchiveReactions { if config.ArchiveReactions {
filters = []nostr.Filter{{ filters = []nostr.Filter{{
Kinds: []int{ Kinds: []int{
@ -556,6 +600,7 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
nostr.KindZap, nostr.KindZap,
nostr.KindTextNote, nostr.KindTextNote,
}, },
Since: &since,
}} }}
} else { } else {
filters = []nostr.Filter{{ filters = []nostr.Filter{{
@ -571,12 +616,30 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
nostr.KindZap, nostr.KindZap,
nostr.KindTextNote, nostr.KindTextNote,
}, },
Since: &since,
}} }}
} }
log.Println("📦 archiving trusted notes...") log.Println("📦 archiving trusted notes...")
eventCount := 0
for ev := range pool.SubMany(timeout, seedRelays, filters) { for ev := range pool.SubMany(timeout, seedRelays, filters) {
eventCount++
// Check GC pressure every 1000 events
if eventCount%1000 == 0 {
var m runtime.MemStats
runtime.ReadMemStats(&m)
if m.NumGC > 0 && eventCount > 1000 {
// If we're doing more than 2 GCs per 1000 events, slow down
gcRate := float64(m.NumGC) / float64(eventCount/1000)
if gcRate > 2.0 {
log.Printf("⚠️ High GC pressure (%.1f GC/1000 events), slowing archive process", gcRate)
time.Sleep(100 * time.Millisecond) // Brief pause
}
}
}
// Use semaphore to limit concurrent goroutines // Use semaphore to limit concurrent goroutines
select { select {
case archiveEventSemaphore <- struct{}{}: case archiveEventSemaphore <- struct{}{}:
@ -585,11 +648,16 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
archiveEvent(ctx, relay, event) archiveEvent(ctx, relay, event)
}(*ev.Event) }(*ev.Event)
case <-timeout.Done(): case <-timeout.Done():
log.Printf("📦 archive timeout reached, processed %d events", eventCount)
return return
default:
// If semaphore is full, process synchronously to avoid buildup
archiveEvent(ctx, relay, *ev.Event)
} }
} }
log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes") log.Printf("📦 archived %d trusted notes and discarded %d untrusted notes (processed %d total events)",
atomic.LoadUint64(&trustedNotes), atomic.LoadUint64(&untrustedNotes), eventCount)
} else { } else {
log.Println("🔄 web of trust will refresh in", config.RefreshInterval, "hours") log.Println("🔄 web of trust will refresh in", config.RefreshInterval, "hours")
select { select {
@ -614,9 +682,10 @@ func archiveEvent(ctx context.Context, relay *khatru.Relay, ev nostr.Event) {
if trusted { if trusted {
wdb.Publish(ctx, ev) wdb.Publish(ctx, ev)
relay.BroadcastEvent(&ev) relay.BroadcastEvent(&ev)
trustedNotes++ atomic.AddUint64(&trustedNotes, 1)
atomic.AddUint64(&archivedEvents, 1)
} else { } else {
untrustedNotes++ atomic.AddUint64(&untrustedNotes, 1)
} }
} }
@ -760,3 +829,108 @@ func monitorMemoryUsage() {
} }
} }
} }
// Add performance monitoring
func monitorPerformance() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
var lastGC uint32
var lastEvents, lastRejected, lastArchived uint64
for {
select {
case <-ticker.C:
var m runtime.MemStats
runtime.ReadMemStats(&m)
currentEvents := atomic.LoadUint64(&totalEvents)
currentRejected := atomic.LoadUint64(&rejectedEvents)
currentArchived := atomic.LoadUint64(&archivedEvents)
eventsPerMin := currentEvents - lastEvents
rejectedPerMin := currentRejected - lastRejected
archivedPerMin := currentArchived - lastArchived
gcPerMin := m.NumGC - lastGC
numGoroutines := runtime.NumGoroutine()
log.Printf("⚡ Performance: Events/min=%d, Rejected/min=%d, Archived/min=%d, GC/min=%d, Goroutines=%d",
eventsPerMin, rejectedPerMin, archivedPerMin, gcPerMin, numGoroutines)
if gcPerMin > 60 {
log.Printf("⚠️ HIGH GC ACTIVITY: %d garbage collections in last minute!", gcPerMin)
}
if numGoroutines > 1000 {
log.Printf("⚠️ HIGH GOROUTINE COUNT: %d goroutines active!", numGoroutines)
}
lastGC = m.NumGC
lastEvents = currentEvents
lastRejected = currentRejected
lastArchived = currentArchived
}
}
}
// Debug handlers
func debugStatsHandler(w http.ResponseWriter, r *http.Request) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
stats := fmt.Sprintf(`Debug Statistics:
Memory:
Allocated: %d KB
System: %d KB
Total Allocations: %d
GC Cycles: %d
Goroutines: %d
Events:
Total Events: %d
Rejected Events: %d
Archived Events: %d
Trusted Notes: %d
Untrusted Notes: %d
Refreshes:
Profile Refreshes: %d
Network Refreshes: %d
Data Structures:
Relays: %d
Trust Network: %d
One Hop Network: %d
Follower Count Map: %d
`,
m.Alloc/1024,
m.Sys/1024,
m.Mallocs,
m.NumGC,
runtime.NumGoroutine(),
atomic.LoadUint64(&totalEvents),
atomic.LoadUint64(&rejectedEvents),
atomic.LoadUint64(&archivedEvents),
atomic.LoadUint64(&trustedNotes),
atomic.LoadUint64(&untrustedNotes),
atomic.LoadUint64(&profileRefreshCount),
atomic.LoadUint64(&networkRefreshCount),
len(relays),
len(trustNetwork),
len(oneHopNetwork),
len(pubkeyFollowerCount),
)
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte(stats))
}
func debugGoroutinesHandler(w http.ResponseWriter, r *http.Request) {
buf := make([]byte, 1<<20) // 1MB buffer
stackSize := runtime.Stack(buf, true)
w.Header().Set("Content-Type", "text/plain")
w.Write(buf[:stackSize])
}