mirror of
https://github.com/bitvora/wot-relay.git
synced 2025-06-22 07:25:13 +00:00
reduce concurrent routines
This commit is contained in:
parent
1f4143ee5f
commit
f2efc9199b
183
main.go
183
main.go
@ -7,11 +7,13 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
_ "net/http/pprof"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/fiatjaf/eventstore"
|
"github.com/fiatjaf/eventstore"
|
||||||
@ -62,7 +64,16 @@ var trustNetworkMap map[string]bool
|
|||||||
var pubkeyFollowerCount = make(map[string]int)
|
var pubkeyFollowerCount = make(map[string]int)
|
||||||
var trustedNotes uint64
|
var trustedNotes uint64
|
||||||
var untrustedNotes uint64
|
var untrustedNotes uint64
|
||||||
var archiveEventSemaphore = make(chan struct{}, 100) // Limit concurrent goroutines
|
var archiveEventSemaphore = make(chan struct{}, 20) // Reduced from 100 to 20
|
||||||
|
|
||||||
|
// Performance counters
|
||||||
|
var (
|
||||||
|
totalEvents uint64
|
||||||
|
rejectedEvents uint64
|
||||||
|
archivedEvents uint64
|
||||||
|
profileRefreshCount uint64
|
||||||
|
networkRefreshCount uint64
|
||||||
|
)
|
||||||
|
|
||||||
// Mutexes for thread safety
|
// Mutexes for thread safety
|
||||||
var (
|
var (
|
||||||
@ -133,6 +144,8 @@ func main() {
|
|||||||
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
|
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
|
||||||
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
||||||
relay.RejectEvent = append(relay.RejectEvent, func(ctx context.Context, event *nostr.Event) (bool, string) {
|
relay.RejectEvent = append(relay.RejectEvent, func(ctx context.Context, event *nostr.Event) (bool, string) {
|
||||||
|
atomic.AddUint64(&totalEvents, 1)
|
||||||
|
|
||||||
// Don't reject events if we haven't booted yet or if trust network is empty
|
// Don't reject events if we haven't booted yet or if trust network is empty
|
||||||
if !booted {
|
if !booted {
|
||||||
return false, ""
|
return false, ""
|
||||||
@ -149,9 +162,11 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !trusted {
|
if !trusted {
|
||||||
|
atomic.AddUint64(&rejectedEvents, 1)
|
||||||
return true, "not in web of trust"
|
return true, "not in web of trust"
|
||||||
}
|
}
|
||||||
if event.Kind == nostr.KindEncryptedDirectMessage {
|
if event.Kind == nostr.KindEncryptedDirectMessage {
|
||||||
|
atomic.AddUint64(&rejectedEvents, 1)
|
||||||
return true, "only gift wrapped DMs are allowed"
|
return true, "only gift wrapped DMs are allowed"
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,6 +192,7 @@ func main() {
|
|||||||
|
|
||||||
go refreshTrustNetwork(ctx, relay)
|
go refreshTrustNetwork(ctx, relay)
|
||||||
go monitorMemoryUsage() // Add memory monitoring
|
go monitorMemoryUsage() // Add memory monitoring
|
||||||
|
go monitorPerformance() // Add performance monitoring
|
||||||
|
|
||||||
mux := relay.Router()
|
mux := relay.Router()
|
||||||
static := http.FileServer(http.Dir(config.StaticPath))
|
static := http.FileServer(http.Dir(config.StaticPath))
|
||||||
@ -184,6 +200,10 @@ func main() {
|
|||||||
mux.Handle("GET /static/", http.StripPrefix("/static/", static))
|
mux.Handle("GET /static/", http.StripPrefix("/static/", static))
|
||||||
mux.Handle("GET /favicon.ico", http.StripPrefix("/", static))
|
mux.Handle("GET /favicon.ico", http.StripPrefix("/", static))
|
||||||
|
|
||||||
|
// Add debug endpoints
|
||||||
|
mux.HandleFunc("GET /debug/stats", debugStatsHandler)
|
||||||
|
mux.HandleFunc("GET /debug/goroutines", debugGoroutinesHandler)
|
||||||
|
|
||||||
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
tmpl := template.Must(template.ParseFiles(os.Getenv("INDEX_PATH")))
|
tmpl := template.Must(template.ParseFiles(os.Getenv("INDEX_PATH")))
|
||||||
data := struct {
|
data := struct {
|
||||||
@ -204,6 +224,10 @@ func main() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
log.Println("🎉 relay running on port :3334")
|
log.Println("🎉 relay running on port :3334")
|
||||||
|
log.Println("🔍 debug endpoints available at:")
|
||||||
|
log.Println(" http://localhost:3334/debug/pprof/ (CPU/memory profiling)")
|
||||||
|
log.Println(" http://localhost:3334/debug/stats (application stats)")
|
||||||
|
log.Println(" http://localhost:3334/debug/goroutines (goroutine info)")
|
||||||
err := http.ListenAndServe(":3334", relay)
|
err := http.ListenAndServe(":3334", relay)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -346,6 +370,9 @@ func updateTrustNetworkFilter() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func refreshProfiles(ctx context.Context) {
|
func refreshProfiles(ctx context.Context) {
|
||||||
|
atomic.AddUint64(&profileRefreshCount, 1)
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
// Get a snapshot of current trust network to avoid holding locks during network operations
|
// Get a snapshot of current trust network to avoid holding locks during network operations
|
||||||
trustNetworkMutex.RLock()
|
trustNetworkMutex.RLock()
|
||||||
currentTrustNetwork := make([]string, len(trustNetwork))
|
currentTrustNetwork := make([]string, len(trustNetwork))
|
||||||
@ -371,11 +398,15 @@ func refreshProfiles(ctx context.Context) {
|
|||||||
|
|
||||||
cancel() // Cancel after each iteration
|
cancel() // Cancel after each iteration
|
||||||
}
|
}
|
||||||
log.Println("👤 profiles refreshed: ", len(currentTrustNetwork))
|
duration := time.Since(start)
|
||||||
|
log.Printf("👤 profiles refreshed: %d profiles in %v", len(currentTrustNetwork), duration)
|
||||||
}
|
}
|
||||||
|
|
||||||
func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
||||||
runTrustNetworkRefresh := func() {
|
runTrustNetworkRefresh := func() {
|
||||||
|
atomic.AddUint64(&networkRefreshCount, 1)
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
// Build new networks in temporary variables to avoid disrupting the active network
|
// Build new networks in temporary variables to avoid disrupting the active network
|
||||||
var newOneHopNetwork []string
|
var newOneHopNetwork []string
|
||||||
newOneHopNetworkSet := make(map[string]bool)
|
newOneHopNetworkSet := make(map[string]bool)
|
||||||
@ -397,7 +428,9 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
|||||||
}}
|
}}
|
||||||
|
|
||||||
log.Println("🔍 fetching owner's follows")
|
log.Println("🔍 fetching owner's follows")
|
||||||
|
eventCount := 0
|
||||||
for ev := range pool.SubManyEose(timeoutCtx, seedRelays, filters) {
|
for ev := range pool.SubManyEose(timeoutCtx, seedRelays, filters) {
|
||||||
|
eventCount++
|
||||||
for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) {
|
for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) {
|
||||||
pubkey := contact[1]
|
pubkey := contact[1]
|
||||||
if isIgnored(pubkey, config.IgnoredPubkeys) {
|
if isIgnored(pubkey, config.IgnoredPubkeys) {
|
||||||
@ -413,8 +446,10 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.Printf("🔍 processed %d follow list events", eventCount)
|
||||||
|
|
||||||
log.Println("🌐 building web of trust graph")
|
log.Println("🌐 building web of trust graph")
|
||||||
|
totalProcessed := 0
|
||||||
for i := 0; i < len(newOneHopNetwork); i += 100 {
|
for i := 0; i < len(newOneHopNetwork); i += 100 {
|
||||||
timeout, cancel := context.WithTimeout(ctx, 4*time.Second)
|
timeout, cancel := context.WithTimeout(ctx, 4*time.Second)
|
||||||
|
|
||||||
@ -428,7 +463,10 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
|||||||
Kinds: []int{nostr.KindFollowList, nostr.KindRelayListMetadata, nostr.KindProfileMetadata},
|
Kinds: []int{nostr.KindFollowList, nostr.KindRelayListMetadata, nostr.KindProfileMetadata},
|
||||||
}}
|
}}
|
||||||
|
|
||||||
|
batchCount := 0
|
||||||
for ev := range pool.SubManyEose(timeout, seedRelays, filters) {
|
for ev := range pool.SubManyEose(timeout, seedRelays, filters) {
|
||||||
|
batchCount++
|
||||||
|
totalProcessed++
|
||||||
for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) {
|
for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) {
|
||||||
if len(contact) > 1 {
|
if len(contact) > 1 {
|
||||||
newPubkeyFollowerCount[contact[1]]++
|
newPubkeyFollowerCount[contact[1]]++
|
||||||
@ -444,6 +482,10 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
cancel() // Cancel after each iteration
|
cancel() // Cancel after each iteration
|
||||||
|
|
||||||
|
if i%500 == 0 { // Log progress every 5 batches
|
||||||
|
log.Printf("🌐 processed batch %d-%d (%d events in this batch)", i, end, batchCount)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now atomically replace the active data structures
|
// Now atomically replace the active data structures
|
||||||
@ -456,7 +498,8 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
|||||||
pubkeyFollowerCount = newPubkeyFollowerCount
|
pubkeyFollowerCount = newPubkeyFollowerCount
|
||||||
followerMutex.Unlock()
|
followerMutex.Unlock()
|
||||||
|
|
||||||
log.Println("🫂 total network size:", len(newPubkeyFollowerCount))
|
duration := time.Since(start)
|
||||||
|
log.Printf("🫂 total network size: %d (processed %d events in %v)", len(newPubkeyFollowerCount), totalProcessed, duration)
|
||||||
relayMutex.RLock()
|
relayMutex.RLock()
|
||||||
log.Println("🔗 relays discovered:", len(relays))
|
log.Println("🔗 relays discovered:", len(relays))
|
||||||
relayMutex.RUnlock()
|
relayMutex.RUnlock()
|
||||||
@ -576,7 +619,24 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
|
|||||||
|
|
||||||
log.Println("📦 archiving trusted notes...")
|
log.Println("📦 archiving trusted notes...")
|
||||||
|
|
||||||
|
eventCount := 0
|
||||||
for ev := range pool.SubMany(timeout, seedRelays, filters) {
|
for ev := range pool.SubMany(timeout, seedRelays, filters) {
|
||||||
|
eventCount++
|
||||||
|
|
||||||
|
// Check GC pressure every 1000 events
|
||||||
|
if eventCount%1000 == 0 {
|
||||||
|
var m runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&m)
|
||||||
|
if m.NumGC > 0 && eventCount > 1000 {
|
||||||
|
// If we're doing more than 2 GCs per 1000 events, slow down
|
||||||
|
gcRate := float64(m.NumGC) / float64(eventCount/1000)
|
||||||
|
if gcRate > 2.0 {
|
||||||
|
log.Printf("⚠️ High GC pressure (%.1f GC/1000 events), slowing archive process", gcRate)
|
||||||
|
time.Sleep(100 * time.Millisecond) // Brief pause
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Use semaphore to limit concurrent goroutines
|
// Use semaphore to limit concurrent goroutines
|
||||||
select {
|
select {
|
||||||
case archiveEventSemaphore <- struct{}{}:
|
case archiveEventSemaphore <- struct{}{}:
|
||||||
@ -585,11 +645,16 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
|
|||||||
archiveEvent(ctx, relay, event)
|
archiveEvent(ctx, relay, event)
|
||||||
}(*ev.Event)
|
}(*ev.Event)
|
||||||
case <-timeout.Done():
|
case <-timeout.Done():
|
||||||
|
log.Printf("📦 archive timeout reached, processed %d events", eventCount)
|
||||||
return
|
return
|
||||||
|
default:
|
||||||
|
// If semaphore is full, process synchronously to avoid buildup
|
||||||
|
archiveEvent(ctx, relay, *ev.Event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes")
|
log.Printf("📦 archived %d trusted notes and discarded %d untrusted notes (processed %d total events)",
|
||||||
|
atomic.LoadUint64(&trustedNotes), atomic.LoadUint64(&untrustedNotes), eventCount)
|
||||||
} else {
|
} else {
|
||||||
log.Println("🔄 web of trust will refresh in", config.RefreshInterval, "hours")
|
log.Println("🔄 web of trust will refresh in", config.RefreshInterval, "hours")
|
||||||
select {
|
select {
|
||||||
@ -614,9 +679,10 @@ func archiveEvent(ctx context.Context, relay *khatru.Relay, ev nostr.Event) {
|
|||||||
if trusted {
|
if trusted {
|
||||||
wdb.Publish(ctx, ev)
|
wdb.Publish(ctx, ev)
|
||||||
relay.BroadcastEvent(&ev)
|
relay.BroadcastEvent(&ev)
|
||||||
trustedNotes++
|
atomic.AddUint64(&trustedNotes, 1)
|
||||||
|
atomic.AddUint64(&archivedEvents, 1)
|
||||||
} else {
|
} else {
|
||||||
untrustedNotes++
|
atomic.AddUint64(&untrustedNotes, 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -760,3 +826,108 @@ func monitorMemoryUsage() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add performance monitoring
|
||||||
|
func monitorPerformance() {
|
||||||
|
ticker := time.NewTicker(1 * time.Minute)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
var lastGC uint32
|
||||||
|
var lastEvents, lastRejected, lastArchived uint64
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
var m runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&m)
|
||||||
|
|
||||||
|
currentEvents := atomic.LoadUint64(&totalEvents)
|
||||||
|
currentRejected := atomic.LoadUint64(&rejectedEvents)
|
||||||
|
currentArchived := atomic.LoadUint64(&archivedEvents)
|
||||||
|
|
||||||
|
eventsPerMin := currentEvents - lastEvents
|
||||||
|
rejectedPerMin := currentRejected - lastRejected
|
||||||
|
archivedPerMin := currentArchived - lastArchived
|
||||||
|
gcPerMin := m.NumGC - lastGC
|
||||||
|
|
||||||
|
numGoroutines := runtime.NumGoroutine()
|
||||||
|
|
||||||
|
log.Printf("⚡ Performance: Events/min=%d, Rejected/min=%d, Archived/min=%d, GC/min=%d, Goroutines=%d",
|
||||||
|
eventsPerMin, rejectedPerMin, archivedPerMin, gcPerMin, numGoroutines)
|
||||||
|
|
||||||
|
if gcPerMin > 60 {
|
||||||
|
log.Printf("⚠️ HIGH GC ACTIVITY: %d garbage collections in last minute!", gcPerMin)
|
||||||
|
}
|
||||||
|
|
||||||
|
if numGoroutines > 1000 {
|
||||||
|
log.Printf("⚠️ HIGH GOROUTINE COUNT: %d goroutines active!", numGoroutines)
|
||||||
|
}
|
||||||
|
|
||||||
|
lastGC = m.NumGC
|
||||||
|
lastEvents = currentEvents
|
||||||
|
lastRejected = currentRejected
|
||||||
|
lastArchived = currentArchived
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Debug handlers
|
||||||
|
func debugStatsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var m runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&m)
|
||||||
|
|
||||||
|
stats := fmt.Sprintf(`Debug Statistics:
|
||||||
|
|
||||||
|
Memory:
|
||||||
|
Allocated: %d KB
|
||||||
|
System: %d KB
|
||||||
|
Total Allocations: %d
|
||||||
|
GC Cycles: %d
|
||||||
|
Goroutines: %d
|
||||||
|
|
||||||
|
Events:
|
||||||
|
Total Events: %d
|
||||||
|
Rejected Events: %d
|
||||||
|
Archived Events: %d
|
||||||
|
Trusted Notes: %d
|
||||||
|
Untrusted Notes: %d
|
||||||
|
|
||||||
|
Refreshes:
|
||||||
|
Profile Refreshes: %d
|
||||||
|
Network Refreshes: %d
|
||||||
|
|
||||||
|
Data Structures:
|
||||||
|
Relays: %d
|
||||||
|
Trust Network: %d
|
||||||
|
One Hop Network: %d
|
||||||
|
Follower Count Map: %d
|
||||||
|
`,
|
||||||
|
m.Alloc/1024,
|
||||||
|
m.Sys/1024,
|
||||||
|
m.Mallocs,
|
||||||
|
m.NumGC,
|
||||||
|
runtime.NumGoroutine(),
|
||||||
|
atomic.LoadUint64(&totalEvents),
|
||||||
|
atomic.LoadUint64(&rejectedEvents),
|
||||||
|
atomic.LoadUint64(&archivedEvents),
|
||||||
|
atomic.LoadUint64(&trustedNotes),
|
||||||
|
atomic.LoadUint64(&untrustedNotes),
|
||||||
|
atomic.LoadUint64(&profileRefreshCount),
|
||||||
|
atomic.LoadUint64(&networkRefreshCount),
|
||||||
|
len(relays),
|
||||||
|
len(trustNetwork),
|
||||||
|
len(oneHopNetwork),
|
||||||
|
len(pubkeyFollowerCount),
|
||||||
|
)
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "text/plain")
|
||||||
|
w.Write([]byte(stats))
|
||||||
|
}
|
||||||
|
|
||||||
|
func debugGoroutinesHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
buf := make([]byte, 1<<20) // 1MB buffer
|
||||||
|
stackSize := runtime.Stack(buf, true)
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "text/plain")
|
||||||
|
w.Write(buf[:stackSize])
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user