From 1f9c1dccf50bbe11ad95ae7f10e60d7cecc82b1f Mon Sep 17 00:00:00 2001 From: Barry Deen Date: Sun, 15 Jun 2025 21:42:58 -0400 Subject: [PATCH 1/2] batching, goroutine limits --- main.go | 339 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 273 insertions(+), 66 deletions(-) diff --git a/main.go b/main.go index ce6c439..4dbaeae 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,10 @@ import ( "log" "net/http" "os" + "runtime" "strconv" "strings" + "sync" "time" "github.com/fiatjaf/eventstore" @@ -40,20 +42,35 @@ type Config struct { MaxAgeDays int ArchiveReactions bool IgnoredPubkeys []string + MaxTrustNetwork int + MaxRelays int + MaxOneHopNetwork int } var pool *nostr.SimplePool var wdb nostr.RelayStore var relays []string +var relaySet = make(map[string]bool) // O(1) lookup var config Config var trustNetwork []string +var trustNetworkSet = make(map[string]bool) // O(1) lookup var seedRelays []string var booted bool var oneHopNetwork []string +var oneHopNetworkSet = make(map[string]bool) // O(1) lookup var trustNetworkMap map[string]bool var pubkeyFollowerCount = make(map[string]int) var trustedNotes uint64 var untrustedNotes uint64 +var archiveEventSemaphore = make(chan struct{}, 100) // Limit concurrent goroutines + +// Mutexes for thread safety +var ( + relayMutex sync.RWMutex + trustNetworkMutex sync.RWMutex + oneHopMutex sync.RWMutex + followerMutex sync.RWMutex +) func main() { nostr.InfoLogger = log.New(io.Discard, "", 0) @@ -116,8 +133,23 @@ func main() { relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents) relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent) relay.RejectEvent = append(relay.RejectEvent, func(ctx context.Context, event *nostr.Event) (bool, string) { - if !trustNetworkMap[event.PubKey] { - return true, "we are rebuilding the trust network, please try again later" + // Don't reject events if we haven't booted yet or if trust network is empty + if !booted { + return false, "" + } + + trustNetworkMutex.RLock() + trusted := trustNetworkMap[event.PubKey] + hasNetwork := len(trustNetworkMap) > 0 + trustNetworkMutex.RUnlock() + + // If we don't have a trust network yet, allow all events + if !hasNetwork { + return false, "" + } + + if !trusted { + return true, "not in web of trust" } if event.Kind == nostr.KindEncryptedDirectMessage { return true, "only gift wrapped DMs are allowed" @@ -144,6 +176,7 @@ func main() { } go refreshTrustNetwork(ctx, relay) + go monitorMemoryUsage() // Add memory monitoring mux := relay.Router() static := http.FileServer(http.Dir(config.StaticPath)) @@ -211,6 +244,18 @@ func LoadConfig() Config { os.Setenv("ARCHIVE_REACTIONS", "FALSE") } + if os.Getenv("MAX_TRUST_NETWORK") == "" { + os.Setenv("MAX_TRUST_NETWORK", "10000") + } + + if os.Getenv("MAX_RELAYS") == "" { + os.Setenv("MAX_RELAYS", "1000") + } + + if os.Getenv("MAX_ONE_HOP_NETWORK") == "" { + os.Setenv("MAX_ONE_HOP_NETWORK", "50000") + } + ignoredPubkeys := []string{} if ignoreList := os.Getenv("IGNORE_FOLLOWS_LIST"); ignoreList != "" { ignoredPubkeys = splitAndTrim(ignoreList) @@ -218,6 +263,9 @@ func LoadConfig() Config { minimumFollowers, _ := strconv.Atoi(os.Getenv("MINIMUM_FOLLOWERS")) maxAgeDays, _ := strconv.Atoi(os.Getenv("MAX_AGE_DAYS")) + maxTrustNetwork, _ := strconv.Atoi(os.Getenv("MAX_TRUST_NETWORK")) + maxRelays, _ := strconv.Atoi(os.Getenv("MAX_RELAYS")) + maxOneHopNetwork, _ := strconv.Atoi(os.Getenv("MAX_ONE_HOP_NETWORK")) config := Config{ RelayName: getEnv("RELAY_NAME"), @@ -235,6 +283,9 @@ func LoadConfig() Config { MaxAgeDays: maxAgeDays, ArchiveReactions: getEnv("ARCHIVE_REACTIONS") == "TRUE", IgnoredPubkeys: ignoredPubkeys, + MaxTrustNetwork: maxTrustNetwork, + MaxRelays: maxRelays, + MaxOneHopNetwork: maxOneHopNetwork, } return config @@ -249,44 +300,94 @@ func getEnv(key string) string { } func updateTrustNetworkFilter() { - trustNetworkMap = make(map[string]bool) + // Build new trust network in temporary variables + newTrustNetworkMap := make(map[string]bool) + var newTrustNetwork []string + newTrustNetworkSet := make(map[string]bool) - log.Println("๐ŸŒ updating trust network map") + log.Println("๐ŸŒ building new trust network map") + + followerMutex.RLock() for pubkey, count := range pubkeyFollowerCount { if count >= config.MinimumFollowers { - trustNetworkMap[pubkey] = true - appendPubkey(pubkey) + newTrustNetworkMap[pubkey] = true + if !newTrustNetworkSet[pubkey] && len(pubkey) == 64 && len(newTrustNetwork) < config.MaxTrustNetwork { + newTrustNetwork = append(newTrustNetwork, pubkey) + newTrustNetworkSet[pubkey] = true + } } } + followerMutex.RUnlock() - log.Println("๐ŸŒ trust network map updated with", len(trustNetwork), "keys") + // Now atomically replace the active trust network + trustNetworkMutex.Lock() + trustNetworkMap = newTrustNetworkMap + trustNetwork = newTrustNetwork + trustNetworkSet = newTrustNetworkSet + trustNetworkMutex.Unlock() + + log.Println("๐ŸŒ trust network map updated with", len(newTrustNetwork), "keys") + + // Cleanup follower count map periodically to prevent unbounded growth + followerMutex.Lock() + if len(pubkeyFollowerCount) > config.MaxOneHopNetwork*2 { + log.Println("๐Ÿงน cleaning follower count map") + newFollowerCount := make(map[string]int) + for pubkey, count := range pubkeyFollowerCount { + if count >= config.MinimumFollowers || newTrustNetworkMap[pubkey] { + newFollowerCount[pubkey] = count + } + } + oldCount := len(pubkeyFollowerCount) + pubkeyFollowerCount = newFollowerCount + log.Printf("๐Ÿงน cleaned follower count map: %d -> %d entries", oldCount, len(newFollowerCount)) + } + followerMutex.Unlock() } func refreshProfiles(ctx context.Context) { - for i := 0; i < len(trustNetwork); i += 200 { + // Get a snapshot of current trust network to avoid holding locks during network operations + trustNetworkMutex.RLock() + currentTrustNetwork := make([]string, len(trustNetwork)) + copy(currentTrustNetwork, trustNetwork) + trustNetworkMutex.RUnlock() + + for i := 0; i < len(currentTrustNetwork); i += 200 { timeout, cancel := context.WithTimeout(ctx, 4*time.Second) - defer cancel() end := i + 200 - if end > len(trustNetwork) { - end = len(trustNetwork) + if end > len(currentTrustNetwork) { + end = len(currentTrustNetwork) } filters := []nostr.Filter{{ - Authors: trustNetwork[i:end], + Authors: currentTrustNetwork[i:end], Kinds: []int{nostr.KindProfileMetadata}, }} for ev := range pool.SubManyEose(timeout, seedRelays, filters) { wdb.Publish(ctx, *ev.Event) } + + cancel() // Cancel after each iteration } - log.Println("๐Ÿ‘ค profiles refreshed: ", len(trustNetwork)) + log.Println("๐Ÿ‘ค profiles refreshed: ", len(currentTrustNetwork)) } func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { - runTrustNetworkRefresh := func() { + // Build new networks in temporary variables to avoid disrupting the active network + var newOneHopNetwork []string + newOneHopNetworkSet := make(map[string]bool) + newPubkeyFollowerCount := make(map[string]int) + + // Copy existing follower counts to preserve data + followerMutex.RLock() + for k, v := range pubkeyFollowerCount { + newPubkeyFollowerCount[k] = v + } + followerMutex.RUnlock() + timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() @@ -303,30 +404,34 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { fmt.Println("ignoring follows from pubkey: ", pubkey) continue } - pubkeyFollowerCount[contact[1]]++ // Increment follower count for the pubkey - appendOneHopNetwork(contact[1]) + newPubkeyFollowerCount[contact[1]]++ + + // Add to new one-hop network + if !newOneHopNetworkSet[contact[1]] && len(contact[1]) == 64 && len(newOneHopNetwork) < config.MaxOneHopNetwork { + newOneHopNetwork = append(newOneHopNetwork, contact[1]) + newOneHopNetworkSet[contact[1]] = true + } } } log.Println("๐ŸŒ building web of trust graph") - for i := 0; i < len(oneHopNetwork); i += 100 { + for i := 0; i < len(newOneHopNetwork); i += 100 { timeout, cancel := context.WithTimeout(ctx, 4*time.Second) - defer cancel() end := i + 100 - if end > len(oneHopNetwork) { - end = len(oneHopNetwork) + if end > len(newOneHopNetwork) { + end = len(newOneHopNetwork) } filters = []nostr.Filter{{ - Authors: oneHopNetwork[i:end], + Authors: newOneHopNetwork[i:end], Kinds: []int{nostr.KindFollowList, nostr.KindRelayListMetadata, nostr.KindProfileMetadata}, }} for ev := range pool.SubManyEose(timeout, seedRelays, filters) { for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) { if len(contact) > 1 { - pubkeyFollowerCount[contact[1]]++ // Increment follower count for the pubkey + newPubkeyFollowerCount[contact[1]]++ } } @@ -338,34 +443,82 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) { wdb.Publish(ctx, *ev.Event) } } + cancel() // Cancel after each iteration } - log.Println("๐Ÿซ‚ total network size:", len(pubkeyFollowerCount)) + + // Now atomically replace the active data structures + oneHopMutex.Lock() + oneHopNetwork = newOneHopNetwork + oneHopNetworkSet = newOneHopNetworkSet + oneHopMutex.Unlock() + + followerMutex.Lock() + pubkeyFollowerCount = newPubkeyFollowerCount + followerMutex.Unlock() + + log.Println("๐Ÿซ‚ total network size:", len(newPubkeyFollowerCount)) + relayMutex.RLock() log.Println("๐Ÿ”— relays discovered:", len(relays)) + relayMutex.RUnlock() } + ticker := time.NewTicker(time.Duration(config.RefreshInterval) * time.Hour) + defer ticker.Stop() + + // Run initial refresh + log.Println("๐Ÿš€ performing initial trust network build...") + runTrustNetworkRefresh() + updateTrustNetworkFilter() + + // Mark as booted after initial trust network is built + booted = true + log.Println("โœ… trust network initialized, relay is now active") + + deleteOldNotes(relay) + archiveTrustedNotes(ctx, relay) + + // Then run on timer for { - runTrustNetworkRefresh() - updateTrustNetworkFilter() - deleteOldNotes(relay) - archiveTrustedNotes(ctx, relay) + select { + case <-ticker.C: + log.Println("๐Ÿ”„ refreshing trust network in background...") + runTrustNetworkRefresh() + updateTrustNetworkFilter() + deleteOldNotes(relay) + archiveTrustedNotes(ctx, relay) + log.Println("โœ… trust network refresh completed") + case <-ctx.Done(): + return + } } } func appendRelay(relay string) { + relayMutex.Lock() + defer relayMutex.Unlock() - for _, r := range relays { - if r == relay { - return - } + if len(relays) >= config.MaxRelays { + return // Prevent unbounded growth } + + if relaySet[relay] { + return // Already exists + } + relays = append(relays, relay) + relaySet[relay] = true } func appendPubkey(pubkey string) { - for _, pk := range trustNetwork { - if pk == pubkey { - return - } + trustNetworkMutex.Lock() + defer trustNetworkMutex.Unlock() + + if len(trustNetwork) >= config.MaxTrustNetwork { + return // Prevent unbounded growth + } + + if trustNetworkSet[pubkey] { + return // Already exists } if len(pubkey) != 64 { @@ -373,20 +526,7 @@ func appendPubkey(pubkey string) { } trustNetwork = append(trustNetwork, pubkey) -} - -func appendOneHopNetwork(pubkey string) { - for _, pk := range oneHopNetwork { - if pk == pubkey { - return - } - } - - if len(pubkey) != 64 { - return - } - - oneHopNetwork = append(oneHopNetwork, pubkey) + trustNetworkSet[pubkey] = true } func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) { @@ -396,12 +536,12 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) { done := make(chan struct{}) go func() { + defer close(done) if config.ArchivalSync { go refreshProfiles(ctx) var filters []nostr.Filter if config.ArchiveReactions { - filters = []nostr.Filter{{ Kinds: []int{ nostr.KindArticle, @@ -437,7 +577,16 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) { log.Println("๐Ÿ“ฆ archiving trusted notes...") for ev := range pool.SubMany(timeout, seedRelays, filters) { - go archiveEvent(ctx, relay, *ev.Event) + // Use semaphore to limit concurrent goroutines + select { + case archiveEventSemaphore <- struct{}{}: + go func(event nostr.Event) { + defer func() { <-archiveEventSemaphore }() + archiveEvent(ctx, relay, event) + }(*ev.Event) + case <-timeout.Done(): + return + } } log.Println("๐Ÿ“ฆ archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes") @@ -447,8 +596,6 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) { case <-timeout.Done(): } } - - close(done) }() select { @@ -460,7 +607,11 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) { } func archiveEvent(ctx context.Context, relay *khatru.Relay, ev nostr.Event) { - if trustNetworkMap[ev.PubKey] { + trustNetworkMutex.RLock() + trusted := trustNetworkMap[ev.PubKey] + trustNetworkMutex.RUnlock() + + if trusted { wdb.Publish(ctx, ev) relay.BroadcastEvent(&ev) trustedNotes++ @@ -499,6 +650,7 @@ func deleteOldNotes(relay *khatru.Relay) error { nostr.KindZap, nostr.KindTextNote, }, + Limit: 1000, // Process in batches to avoid memory issues } ch, err := relay.QueryEvents[0](ctx, filter) @@ -507,27 +659,47 @@ func deleteOldNotes(relay *khatru.Relay) error { return err } - events := make([]*nostr.Event, 0) + // Process events in batches to avoid memory issues + batchSize := 100 + events := make([]*nostr.Event, 0, batchSize) + count := 0 for evt := range ch { events = append(events, evt) + count++ + + if len(events) >= batchSize { + // Delete this batch + for num_evt, del_evt := range events { + for _, del := range relay.DeleteEvent { + if err := del(ctx, del_evt); err != nil { + log.Printf("error deleting note %d of batch. event id: %s", num_evt, del_evt.ID) + return err + } + } + } + events = events[:0] // Reset slice but keep capacity + } } - if len(events) < 1 { - log.Println("0 old notes found") - return nil - } - - for num_evt, del_evt := range events { - for _, del := range relay.DeleteEvent { - if err := del(ctx, del_evt); err != nil { - log.Printf("error deleting note %d of %d. event id: %s", num_evt, len(events), del_evt.ID) - return err + // Delete remaining events + if len(events) > 0 { + for num_evt, del_evt := range events { + for _, del := range relay.DeleteEvent { + if err := del(ctx, del_evt); err != nil { + log.Printf("error deleting note %d of final batch. event id: %s", num_evt, del_evt.ID) + return err + } } } } - log.Printf("%d old (until %d) notes deleted", len(events), oldAge) + if count == 0 { + log.Println("0 old notes found") + } else { + log.Printf("%d old (until %d) notes deleted", count, oldAge) + } + return nil } @@ -553,3 +725,38 @@ func isIgnored(pubkey string, ignoredPubkeys []string) bool { } return false } + +// Add memory monitoring +func monitorMemoryUsage() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + var m runtime.MemStats + runtime.ReadMemStats(&m) + + relayMutex.RLock() + relayCount := len(relays) + relayMutex.RUnlock() + + trustNetworkMutex.RLock() + trustNetworkCount := len(trustNetwork) + trustNetworkMutex.RUnlock() + + oneHopMutex.RLock() + oneHopCount := len(oneHopNetwork) + oneHopMutex.RUnlock() + + followerMutex.RLock() + followerCount := len(pubkeyFollowerCount) + followerMutex.RUnlock() + + log.Printf("๐Ÿ“Š Memory: Alloc=%d KB, Sys=%d KB, NumGC=%d", + m.Alloc/1024, m.Sys/1024, m.NumGC) + log.Printf("๐Ÿ“Š Data structures: Relays=%d, TrustNetwork=%d, OneHop=%d, Followers=%d", + relayCount, trustNetworkCount, oneHopCount, followerCount) + } + } +} From 1f4143ee5fa21cb61eb3e1cf4fe7f6577d0ad5f9 Mon Sep 17 00:00:00 2001 From: Barry Deen Date: Sun, 15 Jun 2025 21:53:42 -0400 Subject: [PATCH 2/2] change default to 40k --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 4dbaeae..8680e1f 100644 --- a/main.go +++ b/main.go @@ -245,7 +245,7 @@ func LoadConfig() Config { } if os.Getenv("MAX_TRUST_NETWORK") == "" { - os.Setenv("MAX_TRUST_NETWORK", "10000") + os.Setenv("MAX_TRUST_NETWORK", "40000") } if os.Getenv("MAX_RELAYS") == "" {