query batching on nak req --outbox.

This commit is contained in:
fiatjaf 2025-09-06 22:20:50 -03:00
parent 13452e6916
commit 9251702460
2 changed files with 48 additions and 10 deletions

2
go.mod
View File

@ -22,6 +22,7 @@ require (
github.com/stretchr/testify v1.10.0
github.com/urfave/cli/v3 v3.0.0-beta1
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b
golang.org/x/sync v0.16.0
golang.org/x/term v0.32.0
)
@ -70,7 +71,6 @@ require (
go.etcd.io/bbolt v1.4.2 // indirect
golang.org/x/crypto v0.39.0 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.26.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

56
req.go
View File

@ -4,7 +4,9 @@ import (
"context"
"fmt"
"os"
"slices"
"strings"
"sync"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/nip42"
@ -12,6 +14,7 @@ import (
"github.com/fatih/color"
"github.com/mailru/easyjson"
"github.com/urfave/cli/v3"
"golang.org/x/sync/errgroup"
)
const (
@ -186,17 +189,52 @@ example:
}
// relays for each pubkey
errg := errgroup.Group{}
errg.SetLimit(16)
mu := sync.Mutex{}
for _, pubkey := range filter.Authors {
n := int(c.Uint("outbox-relays-number"))
this := filter.Clone()
this.Authors = []nostr.PubKey{pubkey}
for _, url := range sys.FetchOutboxRelays(ctx, pubkey, n) {
defs = append(defs, nostr.DirectedFilter{
Filter: this,
Relay: url,
})
}
errg.Go(func() error {
n := int(c.Uint("outbox-relays-number"))
for _, url := range sys.FetchOutboxRelays(ctx, pubkey, n) {
if slices.Contains(relayUrls, url) {
// already hardcoded, ignore
continue
}
if !nostr.IsValidRelayURL(url) {
continue
}
matchUrl := func(def nostr.DirectedFilter) bool { return def.Relay == url }
idx := slices.IndexFunc(defs, matchUrl)
if idx == -1 {
// new relay, add it
mu.Lock()
// check again after locking to prevent races
idx = slices.IndexFunc(defs, matchUrl)
if idx == -1 {
// then add it
filter := filter.Clone()
filter.Authors = []nostr.PubKey{pubkey}
defs = append(defs, nostr.DirectedFilter{
Filter: filter,
Relay: url,
})
mu.Unlock()
continue // done with this relay url
}
// otherwise we'll just use the idx
mu.Unlock()
}
// existing relay, add this pubkey
defs[idx].Authors = append(defs[idx].Authors, pubkey)
}
return nil
})
}
errg.Wait()
if c.Bool("stream") {
results = sys.Pool.BatchedSubscribeMany(ctx, defs, opts)