mirror of
https://github.com/bitvora/wot-relay.git
synced 2025-06-21 23:15:11 +00:00
batching, goroutine limits
This commit is contained in:
parent
df4d8e5e7c
commit
1f9c1dccf5
339
main.go
339
main.go
@ -8,8 +8,10 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/fiatjaf/eventstore"
|
"github.com/fiatjaf/eventstore"
|
||||||
@ -40,20 +42,35 @@ type Config struct {
|
|||||||
MaxAgeDays int
|
MaxAgeDays int
|
||||||
ArchiveReactions bool
|
ArchiveReactions bool
|
||||||
IgnoredPubkeys []string
|
IgnoredPubkeys []string
|
||||||
|
MaxTrustNetwork int
|
||||||
|
MaxRelays int
|
||||||
|
MaxOneHopNetwork int
|
||||||
}
|
}
|
||||||
|
|
||||||
var pool *nostr.SimplePool
|
var pool *nostr.SimplePool
|
||||||
var wdb nostr.RelayStore
|
var wdb nostr.RelayStore
|
||||||
var relays []string
|
var relays []string
|
||||||
|
var relaySet = make(map[string]bool) // O(1) lookup
|
||||||
var config Config
|
var config Config
|
||||||
var trustNetwork []string
|
var trustNetwork []string
|
||||||
|
var trustNetworkSet = make(map[string]bool) // O(1) lookup
|
||||||
var seedRelays []string
|
var seedRelays []string
|
||||||
var booted bool
|
var booted bool
|
||||||
var oneHopNetwork []string
|
var oneHopNetwork []string
|
||||||
|
var oneHopNetworkSet = make(map[string]bool) // O(1) lookup
|
||||||
var trustNetworkMap map[string]bool
|
var trustNetworkMap map[string]bool
|
||||||
var pubkeyFollowerCount = make(map[string]int)
|
var pubkeyFollowerCount = make(map[string]int)
|
||||||
var trustedNotes uint64
|
var trustedNotes uint64
|
||||||
var untrustedNotes uint64
|
var untrustedNotes uint64
|
||||||
|
var archiveEventSemaphore = make(chan struct{}, 100) // Limit concurrent goroutines
|
||||||
|
|
||||||
|
// Mutexes for thread safety
|
||||||
|
var (
|
||||||
|
relayMutex sync.RWMutex
|
||||||
|
trustNetworkMutex sync.RWMutex
|
||||||
|
oneHopMutex sync.RWMutex
|
||||||
|
followerMutex sync.RWMutex
|
||||||
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
nostr.InfoLogger = log.New(io.Discard, "", 0)
|
nostr.InfoLogger = log.New(io.Discard, "", 0)
|
||||||
@ -116,8 +133,23 @@ func main() {
|
|||||||
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
|
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
|
||||||
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
||||||
relay.RejectEvent = append(relay.RejectEvent, func(ctx context.Context, event *nostr.Event) (bool, string) {
|
relay.RejectEvent = append(relay.RejectEvent, func(ctx context.Context, event *nostr.Event) (bool, string) {
|
||||||
if !trustNetworkMap[event.PubKey] {
|
// Don't reject events if we haven't booted yet or if trust network is empty
|
||||||
return true, "we are rebuilding the trust network, please try again later"
|
if !booted {
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
trustNetworkMutex.RLock()
|
||||||
|
trusted := trustNetworkMap[event.PubKey]
|
||||||
|
hasNetwork := len(trustNetworkMap) > 0
|
||||||
|
trustNetworkMutex.RUnlock()
|
||||||
|
|
||||||
|
// If we don't have a trust network yet, allow all events
|
||||||
|
if !hasNetwork {
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if !trusted {
|
||||||
|
return true, "not in web of trust"
|
||||||
}
|
}
|
||||||
if event.Kind == nostr.KindEncryptedDirectMessage {
|
if event.Kind == nostr.KindEncryptedDirectMessage {
|
||||||
return true, "only gift wrapped DMs are allowed"
|
return true, "only gift wrapped DMs are allowed"
|
||||||
@ -144,6 +176,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
go refreshTrustNetwork(ctx, relay)
|
go refreshTrustNetwork(ctx, relay)
|
||||||
|
go monitorMemoryUsage() // Add memory monitoring
|
||||||
|
|
||||||
mux := relay.Router()
|
mux := relay.Router()
|
||||||
static := http.FileServer(http.Dir(config.StaticPath))
|
static := http.FileServer(http.Dir(config.StaticPath))
|
||||||
@ -211,6 +244,18 @@ func LoadConfig() Config {
|
|||||||
os.Setenv("ARCHIVE_REACTIONS", "FALSE")
|
os.Setenv("ARCHIVE_REACTIONS", "FALSE")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if os.Getenv("MAX_TRUST_NETWORK") == "" {
|
||||||
|
os.Setenv("MAX_TRUST_NETWORK", "10000")
|
||||||
|
}
|
||||||
|
|
||||||
|
if os.Getenv("MAX_RELAYS") == "" {
|
||||||
|
os.Setenv("MAX_RELAYS", "1000")
|
||||||
|
}
|
||||||
|
|
||||||
|
if os.Getenv("MAX_ONE_HOP_NETWORK") == "" {
|
||||||
|
os.Setenv("MAX_ONE_HOP_NETWORK", "50000")
|
||||||
|
}
|
||||||
|
|
||||||
ignoredPubkeys := []string{}
|
ignoredPubkeys := []string{}
|
||||||
if ignoreList := os.Getenv("IGNORE_FOLLOWS_LIST"); ignoreList != "" {
|
if ignoreList := os.Getenv("IGNORE_FOLLOWS_LIST"); ignoreList != "" {
|
||||||
ignoredPubkeys = splitAndTrim(ignoreList)
|
ignoredPubkeys = splitAndTrim(ignoreList)
|
||||||
@ -218,6 +263,9 @@ func LoadConfig() Config {
|
|||||||
|
|
||||||
minimumFollowers, _ := strconv.Atoi(os.Getenv("MINIMUM_FOLLOWERS"))
|
minimumFollowers, _ := strconv.Atoi(os.Getenv("MINIMUM_FOLLOWERS"))
|
||||||
maxAgeDays, _ := strconv.Atoi(os.Getenv("MAX_AGE_DAYS"))
|
maxAgeDays, _ := strconv.Atoi(os.Getenv("MAX_AGE_DAYS"))
|
||||||
|
maxTrustNetwork, _ := strconv.Atoi(os.Getenv("MAX_TRUST_NETWORK"))
|
||||||
|
maxRelays, _ := strconv.Atoi(os.Getenv("MAX_RELAYS"))
|
||||||
|
maxOneHopNetwork, _ := strconv.Atoi(os.Getenv("MAX_ONE_HOP_NETWORK"))
|
||||||
|
|
||||||
config := Config{
|
config := Config{
|
||||||
RelayName: getEnv("RELAY_NAME"),
|
RelayName: getEnv("RELAY_NAME"),
|
||||||
@ -235,6 +283,9 @@ func LoadConfig() Config {
|
|||||||
MaxAgeDays: maxAgeDays,
|
MaxAgeDays: maxAgeDays,
|
||||||
ArchiveReactions: getEnv("ARCHIVE_REACTIONS") == "TRUE",
|
ArchiveReactions: getEnv("ARCHIVE_REACTIONS") == "TRUE",
|
||||||
IgnoredPubkeys: ignoredPubkeys,
|
IgnoredPubkeys: ignoredPubkeys,
|
||||||
|
MaxTrustNetwork: maxTrustNetwork,
|
||||||
|
MaxRelays: maxRelays,
|
||||||
|
MaxOneHopNetwork: maxOneHopNetwork,
|
||||||
}
|
}
|
||||||
|
|
||||||
return config
|
return config
|
||||||
@ -249,44 +300,94 @@ func getEnv(key string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func updateTrustNetworkFilter() {
|
func updateTrustNetworkFilter() {
|
||||||
trustNetworkMap = make(map[string]bool)
|
// Build new trust network in temporary variables
|
||||||
|
newTrustNetworkMap := make(map[string]bool)
|
||||||
|
var newTrustNetwork []string
|
||||||
|
newTrustNetworkSet := make(map[string]bool)
|
||||||
|
|
||||||
log.Println("🌐 updating trust network map")
|
log.Println("🌐 building new trust network map")
|
||||||
|
|
||||||
|
followerMutex.RLock()
|
||||||
for pubkey, count := range pubkeyFollowerCount {
|
for pubkey, count := range pubkeyFollowerCount {
|
||||||
if count >= config.MinimumFollowers {
|
if count >= config.MinimumFollowers {
|
||||||
trustNetworkMap[pubkey] = true
|
newTrustNetworkMap[pubkey] = true
|
||||||
appendPubkey(pubkey)
|
if !newTrustNetworkSet[pubkey] && len(pubkey) == 64 && len(newTrustNetwork) < config.MaxTrustNetwork {
|
||||||
|
newTrustNetwork = append(newTrustNetwork, pubkey)
|
||||||
|
newTrustNetworkSet[pubkey] = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
followerMutex.RUnlock()
|
||||||
|
|
||||||
log.Println("🌐 trust network map updated with", len(trustNetwork), "keys")
|
// Now atomically replace the active trust network
|
||||||
|
trustNetworkMutex.Lock()
|
||||||
|
trustNetworkMap = newTrustNetworkMap
|
||||||
|
trustNetwork = newTrustNetwork
|
||||||
|
trustNetworkSet = newTrustNetworkSet
|
||||||
|
trustNetworkMutex.Unlock()
|
||||||
|
|
||||||
|
log.Println("🌐 trust network map updated with", len(newTrustNetwork), "keys")
|
||||||
|
|
||||||
|
// Cleanup follower count map periodically to prevent unbounded growth
|
||||||
|
followerMutex.Lock()
|
||||||
|
if len(pubkeyFollowerCount) > config.MaxOneHopNetwork*2 {
|
||||||
|
log.Println("🧹 cleaning follower count map")
|
||||||
|
newFollowerCount := make(map[string]int)
|
||||||
|
for pubkey, count := range pubkeyFollowerCount {
|
||||||
|
if count >= config.MinimumFollowers || newTrustNetworkMap[pubkey] {
|
||||||
|
newFollowerCount[pubkey] = count
|
||||||
|
}
|
||||||
|
}
|
||||||
|
oldCount := len(pubkeyFollowerCount)
|
||||||
|
pubkeyFollowerCount = newFollowerCount
|
||||||
|
log.Printf("🧹 cleaned follower count map: %d -> %d entries", oldCount, len(newFollowerCount))
|
||||||
|
}
|
||||||
|
followerMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func refreshProfiles(ctx context.Context) {
|
func refreshProfiles(ctx context.Context) {
|
||||||
for i := 0; i < len(trustNetwork); i += 200 {
|
// Get a snapshot of current trust network to avoid holding locks during network operations
|
||||||
|
trustNetworkMutex.RLock()
|
||||||
|
currentTrustNetwork := make([]string, len(trustNetwork))
|
||||||
|
copy(currentTrustNetwork, trustNetwork)
|
||||||
|
trustNetworkMutex.RUnlock()
|
||||||
|
|
||||||
|
for i := 0; i < len(currentTrustNetwork); i += 200 {
|
||||||
timeout, cancel := context.WithTimeout(ctx, 4*time.Second)
|
timeout, cancel := context.WithTimeout(ctx, 4*time.Second)
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
end := i + 200
|
end := i + 200
|
||||||
if end > len(trustNetwork) {
|
if end > len(currentTrustNetwork) {
|
||||||
end = len(trustNetwork)
|
end = len(currentTrustNetwork)
|
||||||
}
|
}
|
||||||
|
|
||||||
filters := []nostr.Filter{{
|
filters := []nostr.Filter{{
|
||||||
Authors: trustNetwork[i:end],
|
Authors: currentTrustNetwork[i:end],
|
||||||
Kinds: []int{nostr.KindProfileMetadata},
|
Kinds: []int{nostr.KindProfileMetadata},
|
||||||
}}
|
}}
|
||||||
|
|
||||||
for ev := range pool.SubManyEose(timeout, seedRelays, filters) {
|
for ev := range pool.SubManyEose(timeout, seedRelays, filters) {
|
||||||
wdb.Publish(ctx, *ev.Event)
|
wdb.Publish(ctx, *ev.Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cancel() // Cancel after each iteration
|
||||||
}
|
}
|
||||||
log.Println("👤 profiles refreshed: ", len(trustNetwork))
|
log.Println("👤 profiles refreshed: ", len(currentTrustNetwork))
|
||||||
}
|
}
|
||||||
|
|
||||||
func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
||||||
|
|
||||||
runTrustNetworkRefresh := func() {
|
runTrustNetworkRefresh := func() {
|
||||||
|
// Build new networks in temporary variables to avoid disrupting the active network
|
||||||
|
var newOneHopNetwork []string
|
||||||
|
newOneHopNetworkSet := make(map[string]bool)
|
||||||
|
newPubkeyFollowerCount := make(map[string]int)
|
||||||
|
|
||||||
|
// Copy existing follower counts to preserve data
|
||||||
|
followerMutex.RLock()
|
||||||
|
for k, v := range pubkeyFollowerCount {
|
||||||
|
newPubkeyFollowerCount[k] = v
|
||||||
|
}
|
||||||
|
followerMutex.RUnlock()
|
||||||
|
|
||||||
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@ -303,30 +404,34 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
|||||||
fmt.Println("ignoring follows from pubkey: ", pubkey)
|
fmt.Println("ignoring follows from pubkey: ", pubkey)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
pubkeyFollowerCount[contact[1]]++ // Increment follower count for the pubkey
|
newPubkeyFollowerCount[contact[1]]++
|
||||||
appendOneHopNetwork(contact[1])
|
|
||||||
|
// Add to new one-hop network
|
||||||
|
if !newOneHopNetworkSet[contact[1]] && len(contact[1]) == 64 && len(newOneHopNetwork) < config.MaxOneHopNetwork {
|
||||||
|
newOneHopNetwork = append(newOneHopNetwork, contact[1])
|
||||||
|
newOneHopNetworkSet[contact[1]] = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("🌐 building web of trust graph")
|
log.Println("🌐 building web of trust graph")
|
||||||
for i := 0; i < len(oneHopNetwork); i += 100 {
|
for i := 0; i < len(newOneHopNetwork); i += 100 {
|
||||||
timeout, cancel := context.WithTimeout(ctx, 4*time.Second)
|
timeout, cancel := context.WithTimeout(ctx, 4*time.Second)
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
end := i + 100
|
end := i + 100
|
||||||
if end > len(oneHopNetwork) {
|
if end > len(newOneHopNetwork) {
|
||||||
end = len(oneHopNetwork)
|
end = len(newOneHopNetwork)
|
||||||
}
|
}
|
||||||
|
|
||||||
filters = []nostr.Filter{{
|
filters = []nostr.Filter{{
|
||||||
Authors: oneHopNetwork[i:end],
|
Authors: newOneHopNetwork[i:end],
|
||||||
Kinds: []int{nostr.KindFollowList, nostr.KindRelayListMetadata, nostr.KindProfileMetadata},
|
Kinds: []int{nostr.KindFollowList, nostr.KindRelayListMetadata, nostr.KindProfileMetadata},
|
||||||
}}
|
}}
|
||||||
|
|
||||||
for ev := range pool.SubManyEose(timeout, seedRelays, filters) {
|
for ev := range pool.SubManyEose(timeout, seedRelays, filters) {
|
||||||
for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) {
|
for _, contact := range ev.Event.Tags.GetAll([]string{"p"}) {
|
||||||
if len(contact) > 1 {
|
if len(contact) > 1 {
|
||||||
pubkeyFollowerCount[contact[1]]++ // Increment follower count for the pubkey
|
newPubkeyFollowerCount[contact[1]]++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -338,34 +443,82 @@ func refreshTrustNetwork(ctx context.Context, relay *khatru.Relay) {
|
|||||||
wdb.Publish(ctx, *ev.Event)
|
wdb.Publish(ctx, *ev.Event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cancel() // Cancel after each iteration
|
||||||
}
|
}
|
||||||
log.Println("🫂 total network size:", len(pubkeyFollowerCount))
|
|
||||||
|
// Now atomically replace the active data structures
|
||||||
|
oneHopMutex.Lock()
|
||||||
|
oneHopNetwork = newOneHopNetwork
|
||||||
|
oneHopNetworkSet = newOneHopNetworkSet
|
||||||
|
oneHopMutex.Unlock()
|
||||||
|
|
||||||
|
followerMutex.Lock()
|
||||||
|
pubkeyFollowerCount = newPubkeyFollowerCount
|
||||||
|
followerMutex.Unlock()
|
||||||
|
|
||||||
|
log.Println("🫂 total network size:", len(newPubkeyFollowerCount))
|
||||||
|
relayMutex.RLock()
|
||||||
log.Println("🔗 relays discovered:", len(relays))
|
log.Println("🔗 relays discovered:", len(relays))
|
||||||
|
relayMutex.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(time.Duration(config.RefreshInterval) * time.Hour)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
// Run initial refresh
|
||||||
|
log.Println("🚀 performing initial trust network build...")
|
||||||
|
runTrustNetworkRefresh()
|
||||||
|
updateTrustNetworkFilter()
|
||||||
|
|
||||||
|
// Mark as booted after initial trust network is built
|
||||||
|
booted = true
|
||||||
|
log.Println("✅ trust network initialized, relay is now active")
|
||||||
|
|
||||||
|
deleteOldNotes(relay)
|
||||||
|
archiveTrustedNotes(ctx, relay)
|
||||||
|
|
||||||
|
// Then run on timer
|
||||||
for {
|
for {
|
||||||
runTrustNetworkRefresh()
|
select {
|
||||||
updateTrustNetworkFilter()
|
case <-ticker.C:
|
||||||
deleteOldNotes(relay)
|
log.Println("🔄 refreshing trust network in background...")
|
||||||
archiveTrustedNotes(ctx, relay)
|
runTrustNetworkRefresh()
|
||||||
|
updateTrustNetworkFilter()
|
||||||
|
deleteOldNotes(relay)
|
||||||
|
archiveTrustedNotes(ctx, relay)
|
||||||
|
log.Println("✅ trust network refresh completed")
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendRelay(relay string) {
|
func appendRelay(relay string) {
|
||||||
|
relayMutex.Lock()
|
||||||
|
defer relayMutex.Unlock()
|
||||||
|
|
||||||
for _, r := range relays {
|
if len(relays) >= config.MaxRelays {
|
||||||
if r == relay {
|
return // Prevent unbounded growth
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if relaySet[relay] {
|
||||||
|
return // Already exists
|
||||||
|
}
|
||||||
|
|
||||||
relays = append(relays, relay)
|
relays = append(relays, relay)
|
||||||
|
relaySet[relay] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendPubkey(pubkey string) {
|
func appendPubkey(pubkey string) {
|
||||||
for _, pk := range trustNetwork {
|
trustNetworkMutex.Lock()
|
||||||
if pk == pubkey {
|
defer trustNetworkMutex.Unlock()
|
||||||
return
|
|
||||||
}
|
if len(trustNetwork) >= config.MaxTrustNetwork {
|
||||||
|
return // Prevent unbounded growth
|
||||||
|
}
|
||||||
|
|
||||||
|
if trustNetworkSet[pubkey] {
|
||||||
|
return // Already exists
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(pubkey) != 64 {
|
if len(pubkey) != 64 {
|
||||||
@ -373,20 +526,7 @@ func appendPubkey(pubkey string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
trustNetwork = append(trustNetwork, pubkey)
|
trustNetwork = append(trustNetwork, pubkey)
|
||||||
}
|
trustNetworkSet[pubkey] = true
|
||||||
|
|
||||||
func appendOneHopNetwork(pubkey string) {
|
|
||||||
for _, pk := range oneHopNetwork {
|
|
||||||
if pk == pubkey {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(pubkey) != 64 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
oneHopNetwork = append(oneHopNetwork, pubkey)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
|
func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
|
||||||
@ -396,12 +536,12 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
|
|||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer close(done)
|
||||||
if config.ArchivalSync {
|
if config.ArchivalSync {
|
||||||
go refreshProfiles(ctx)
|
go refreshProfiles(ctx)
|
||||||
|
|
||||||
var filters []nostr.Filter
|
var filters []nostr.Filter
|
||||||
if config.ArchiveReactions {
|
if config.ArchiveReactions {
|
||||||
|
|
||||||
filters = []nostr.Filter{{
|
filters = []nostr.Filter{{
|
||||||
Kinds: []int{
|
Kinds: []int{
|
||||||
nostr.KindArticle,
|
nostr.KindArticle,
|
||||||
@ -437,7 +577,16 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
|
|||||||
log.Println("📦 archiving trusted notes...")
|
log.Println("📦 archiving trusted notes...")
|
||||||
|
|
||||||
for ev := range pool.SubMany(timeout, seedRelays, filters) {
|
for ev := range pool.SubMany(timeout, seedRelays, filters) {
|
||||||
go archiveEvent(ctx, relay, *ev.Event)
|
// Use semaphore to limit concurrent goroutines
|
||||||
|
select {
|
||||||
|
case archiveEventSemaphore <- struct{}{}:
|
||||||
|
go func(event nostr.Event) {
|
||||||
|
defer func() { <-archiveEventSemaphore }()
|
||||||
|
archiveEvent(ctx, relay, event)
|
||||||
|
}(*ev.Event)
|
||||||
|
case <-timeout.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes")
|
log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes")
|
||||||
@ -447,8 +596,6 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
|
|||||||
case <-timeout.Done():
|
case <-timeout.Done():
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
close(done)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -460,7 +607,11 @@ func archiveTrustedNotes(ctx context.Context, relay *khatru.Relay) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func archiveEvent(ctx context.Context, relay *khatru.Relay, ev nostr.Event) {
|
func archiveEvent(ctx context.Context, relay *khatru.Relay, ev nostr.Event) {
|
||||||
if trustNetworkMap[ev.PubKey] {
|
trustNetworkMutex.RLock()
|
||||||
|
trusted := trustNetworkMap[ev.PubKey]
|
||||||
|
trustNetworkMutex.RUnlock()
|
||||||
|
|
||||||
|
if trusted {
|
||||||
wdb.Publish(ctx, ev)
|
wdb.Publish(ctx, ev)
|
||||||
relay.BroadcastEvent(&ev)
|
relay.BroadcastEvent(&ev)
|
||||||
trustedNotes++
|
trustedNotes++
|
||||||
@ -499,6 +650,7 @@ func deleteOldNotes(relay *khatru.Relay) error {
|
|||||||
nostr.KindZap,
|
nostr.KindZap,
|
||||||
nostr.KindTextNote,
|
nostr.KindTextNote,
|
||||||
},
|
},
|
||||||
|
Limit: 1000, // Process in batches to avoid memory issues
|
||||||
}
|
}
|
||||||
|
|
||||||
ch, err := relay.QueryEvents[0](ctx, filter)
|
ch, err := relay.QueryEvents[0](ctx, filter)
|
||||||
@ -507,27 +659,47 @@ func deleteOldNotes(relay *khatru.Relay) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
events := make([]*nostr.Event, 0)
|
// Process events in batches to avoid memory issues
|
||||||
|
batchSize := 100
|
||||||
|
events := make([]*nostr.Event, 0, batchSize)
|
||||||
|
count := 0
|
||||||
|
|
||||||
for evt := range ch {
|
for evt := range ch {
|
||||||
events = append(events, evt)
|
events = append(events, evt)
|
||||||
|
count++
|
||||||
|
|
||||||
|
if len(events) >= batchSize {
|
||||||
|
// Delete this batch
|
||||||
|
for num_evt, del_evt := range events {
|
||||||
|
for _, del := range relay.DeleteEvent {
|
||||||
|
if err := del(ctx, del_evt); err != nil {
|
||||||
|
log.Printf("error deleting note %d of batch. event id: %s", num_evt, del_evt.ID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
events = events[:0] // Reset slice but keep capacity
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(events) < 1 {
|
// Delete remaining events
|
||||||
log.Println("0 old notes found")
|
if len(events) > 0 {
|
||||||
return nil
|
for num_evt, del_evt := range events {
|
||||||
}
|
for _, del := range relay.DeleteEvent {
|
||||||
|
if err := del(ctx, del_evt); err != nil {
|
||||||
for num_evt, del_evt := range events {
|
log.Printf("error deleting note %d of final batch. event id: %s", num_evt, del_evt.ID)
|
||||||
for _, del := range relay.DeleteEvent {
|
return err
|
||||||
if err := del(ctx, del_evt); err != nil {
|
}
|
||||||
log.Printf("error deleting note %d of %d. event id: %s", num_evt, len(events), del_evt.ID)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("%d old (until %d) notes deleted", len(events), oldAge)
|
if count == 0 {
|
||||||
|
log.Println("0 old notes found")
|
||||||
|
} else {
|
||||||
|
log.Printf("%d old (until %d) notes deleted", count, oldAge)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -553,3 +725,38 @@ func isIgnored(pubkey string, ignoredPubkeys []string) bool {
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add memory monitoring
|
||||||
|
func monitorMemoryUsage() {
|
||||||
|
ticker := time.NewTicker(5 * time.Minute)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
var m runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&m)
|
||||||
|
|
||||||
|
relayMutex.RLock()
|
||||||
|
relayCount := len(relays)
|
||||||
|
relayMutex.RUnlock()
|
||||||
|
|
||||||
|
trustNetworkMutex.RLock()
|
||||||
|
trustNetworkCount := len(trustNetwork)
|
||||||
|
trustNetworkMutex.RUnlock()
|
||||||
|
|
||||||
|
oneHopMutex.RLock()
|
||||||
|
oneHopCount := len(oneHopNetwork)
|
||||||
|
oneHopMutex.RUnlock()
|
||||||
|
|
||||||
|
followerMutex.RLock()
|
||||||
|
followerCount := len(pubkeyFollowerCount)
|
||||||
|
followerMutex.RUnlock()
|
||||||
|
|
||||||
|
log.Printf("📊 Memory: Alloc=%d KB, Sys=%d KB, NumGC=%d",
|
||||||
|
m.Alloc/1024, m.Sys/1024, m.NumGC)
|
||||||
|
log.Printf("📊 Data structures: Relays=%d, TrustNetwork=%d, OneHop=%d, Followers=%d",
|
||||||
|
relayCount, trustNetworkCount, oneHopCount, followerCount)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user