broadcast events, process in goroutine

This commit is contained in:
Barry Deen 2024-09-12 20:33:53 -04:00
parent e99c5dd753
commit 3c6f93cb33

35
main.go
View File

@ -38,6 +38,8 @@ var booted bool
var oneHopNetwork []string var oneHopNetwork []string
var trustNetworkMap map[string]bool var trustNetworkMap map[string]bool
var pubkeyFollowerCount = make(map[string]int) var pubkeyFollowerCount = make(map[string]int)
var trustedNotes uint64
var untrustedNotes uint64
func main() { func main() {
nostr.InfoLogger = log.New(io.Discard, "", 0) nostr.InfoLogger = log.New(io.Discard, "", 0)
@ -78,17 +80,16 @@ func main() {
relay.RejectEvent = append(relay.RejectEvent, relay.RejectEvent = append(relay.RejectEvent,
policies.RejectEventsWithBase64Media, policies.RejectEventsWithBase64Media,
policies.EventIPRateLimiter(5, time.Minute*2, 30), policies.EventIPRateLimiter(5, time.Minute*1, 30),
) )
relay.RejectFilter = append(relay.RejectFilter, relay.RejectFilter = append(relay.RejectFilter,
policies.NoEmptyFilters, policies.NoEmptyFilters,
policies.NoComplexFilters, policies.NoComplexFilters,
// policies.FilterIPRateLimiter(50, time.Minute, 250),
) )
relay.RejectConnection = append(relay.RejectConnection, relay.RejectConnection = append(relay.RejectConnection,
policies.ConnectionRateLimiter(10, time.Minute*1, 30), policies.ConnectionRateLimiter(10, time.Minute*2, 30),
) )
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent) relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
@ -349,8 +350,6 @@ func archiveTrustedNotes(relay *khatru.Relay, ctx context.Context) {
}} }}
log.Println("📦 archiving trusted notes...") log.Println("📦 archiving trusted notes...")
var trustedNotes uint64
var untrustedNotes uint64
eventChan := pool.SubMany(ctx, seedRelays, filters) eventChan := pool.SubMany(ctx, seedRelays, filters)
@ -359,31 +358,43 @@ func archiveTrustedNotes(relay *khatru.Relay, ctx context.Context) {
case <-timeout: case <-timeout:
log.Println("⏰ Archive process terminated due to timeout") log.Println("⏰ Archive process terminated due to timeout")
log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes") log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes")
trustedNotes = 0
untrustedNotes = 0
return return
case <-ctx.Done(): case <-ctx.Done():
log.Println("⏰ Archive process terminated due to context cancellation") log.Println("⏰ Archive process terminated due to context cancellation")
log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes") log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes")
trustedNotes = 0
untrustedNotes = 0
return return
case ev, ok := <-eventChan: case ev, ok := <-eventChan:
if !ok { if !ok {
log.Println("📦 subscription channel closed") log.Println("📦 subscription channel closed")
log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes") log.Println("📦 archived", trustedNotes, "trusted notes and discarded", untrustedNotes, "untrusted notes")
trustedNotes = 0
untrustedNotes = 0
return return
} }
if trustNetworkMap[ev.Event.PubKey] { go processEvent(ctx, ev.Event, relay)
if len(ev.Event.Tags) > 3000 { }
continue }
} }
relay.AddEvent(ctx, ev.Event) func processEvent(ctx context.Context, ev *nostr.Event, relay *khatru.Relay) {
if trustNetworkMap[ev.PubKey] {
if len(ev.Tags) > 3000 {
return
}
relay.AddEvent(ctx, ev)
relay.BroadcastEvent(ev)
trustedNotes++ trustedNotes++
//log.Println("📦 archived note: ", ev.Event.ID) log.Println("📦 archived note: ", ev.ID)
} else { } else {
log.Println("📦 discarded note: ", ev.ID)
untrustedNotes++ untrustedNotes++
} }
} }
}
}