From 925170246054f3eb02d5fa148e88cd027bc0e0b3 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sat, 6 Sep 2025 22:20:50 -0300 Subject: [PATCH] query batching on nak req --outbox. --- go.mod | 2 +- req.go | 56 +++++++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 5725256..2131920 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/urfave/cli/v3 v3.0.0-beta1 golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b + golang.org/x/sync v0.16.0 golang.org/x/term v0.32.0 ) @@ -70,7 +71,6 @@ require ( go.etcd.io/bbolt v1.4.2 // indirect golang.org/x/crypto v0.39.0 // indirect golang.org/x/net v0.41.0 // indirect - golang.org/x/sync v0.16.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.26.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/req.go b/req.go index 0cd6629..bd410a4 100644 --- a/req.go +++ b/req.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "os" + "slices" "strings" + "sync" "fiatjaf.com/nostr" "fiatjaf.com/nostr/nip42" @@ -12,6 +14,7 @@ import ( "github.com/fatih/color" "github.com/mailru/easyjson" "github.com/urfave/cli/v3" + "golang.org/x/sync/errgroup" ) const ( @@ -186,17 +189,52 @@ example: } // relays for each pubkey + errg := errgroup.Group{} + errg.SetLimit(16) + mu := sync.Mutex{} for _, pubkey := range filter.Authors { - n := int(c.Uint("outbox-relays-number")) - this := filter.Clone() - this.Authors = []nostr.PubKey{pubkey} - for _, url := range sys.FetchOutboxRelays(ctx, pubkey, n) { - defs = append(defs, nostr.DirectedFilter{ - Filter: this, - Relay: url, - }) - } + errg.Go(func() error { + n := int(c.Uint("outbox-relays-number")) + for _, url := range sys.FetchOutboxRelays(ctx, pubkey, n) { + if slices.Contains(relayUrls, url) { + // already hardcoded, ignore + continue + } + if !nostr.IsValidRelayURL(url) { + continue + } + + matchUrl := func(def nostr.DirectedFilter) bool { return def.Relay == url } + idx := slices.IndexFunc(defs, matchUrl) + if idx == -1 { + // new relay, add it + mu.Lock() + // check again after locking to prevent races + idx = slices.IndexFunc(defs, matchUrl) + if idx == -1 { + // then add it + filter := filter.Clone() + filter.Authors = []nostr.PubKey{pubkey} + defs = append(defs, nostr.DirectedFilter{ + Filter: filter, + Relay: url, + }) + mu.Unlock() + continue // done with this relay url + } + + // otherwise we'll just use the idx + mu.Unlock() + } + + // existing relay, add this pubkey + defs[idx].Authors = append(defs[idx].Authors, pubkey) + } + + return nil + }) } + errg.Wait() if c.Bool("stream") { results = sys.Pool.BatchedSubscribeMany(ctx, defs, opts)