diff --git a/go.mod b/go.mod index bbf0ae4..2452abf 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.1 require ( fiatjaf.com/lib v0.3.1 - fiatjaf.com/nostr v0.0.0-20251104112613-38a6ca92b954 + fiatjaf.com/nostr v0.0.0-20251112024900-1c43f0d66643 github.com/bep/debounce v1.2.1 github.com/btcsuite/btcd/btcec/v2 v2.3.6 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e diff --git a/go.sum b/go.sum index 556b0db..0b64989 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ fiatjaf.com/lib v0.3.1 h1:/oFQwNtFRfV+ukmOCxfBEAuayoLwXp4wu2/fz5iHpwA= fiatjaf.com/lib v0.3.1/go.mod h1:Ycqq3+mJ9jAWu7XjbQI1cVr+OFgnHn79dQR5oTII47g= -fiatjaf.com/nostr v0.0.0-20251104112613-38a6ca92b954 h1:CMD8D3TgEjGhuIBNMnvZ0EXOW0JR9O3w8AI6Yuzt8Ec= -fiatjaf.com/nostr v0.0.0-20251104112613-38a6ca92b954/go.mod h1:Nq86Jjsd0OmsOEImUg0iCcLuqM5B67Nj2eu/2dP74Ss= +fiatjaf.com/nostr v0.0.0-20251112024900-1c43f0d66643 h1:GcoAN1FQV+rayCIklvj+mIB/ZR3Oni98C3bS/M+vzts= +fiatjaf.com/nostr v0.0.0-20251112024900-1c43f0d66643/go.mod h1:Nq86Jjsd0OmsOEImUg0iCcLuqM5B67Nj2eu/2dP74Ss= github.com/FastFilter/xorfilter v0.2.1 h1:lbdeLG9BdpquK64ZsleBS8B4xO/QW1IM0gMzF7KaBKc= github.com/FastFilter/xorfilter v0.2.1/go.mod h1:aumvdkhscz6YBZF9ZA/6O4fIoNod4YR50kIVGGZ7l9I= github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 h1:ClzzXMDDuUbWfNNZqGeYq4PnYOlwlOVIvSyNaIy0ykg= diff --git a/req.go b/req.go index 2725054..7b28049 100644 --- a/req.go +++ b/req.go @@ -1,14 +1,19 @@ package main import ( + "bufio" "context" "fmt" + "math" "os" "slices" "strings" "sync" "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore" + "fiatjaf.com/nostr/eventstore/slicestore" + "fiatjaf.com/nostr/eventstore/wrappers" "fiatjaf.com/nostr/nip42" "fiatjaf.com/nostr/nip77" "github.com/fatih/color" @@ -38,6 +43,11 @@ example: DisableSliceFlagSeparator: true, Flags: append(defaultKeyFlags, append(reqFilterFlags, + &cli.StringFlag{ + Name: "only-missing", + Usage: "use nip77 negentropy to only fetch events that aren't present in the given jsonl file", + TakesFile: true, + }, &cli.BoolFlag{ Name: "ids-only", Usage: "use nip77 to fetch just a list of ids", @@ -53,7 +63,7 @@ example: DefaultText: "false, will only use manually-specified relays", }, &cli.UintFlag{ - Name: "outbox-relays-number", + Name: "outbox-relays-per-pubkey", Aliases: []string{"n"}, Usage: "number of outbox relays to use for each pubkey", Value: 3, @@ -90,6 +100,13 @@ example: ), ArgsUsage: "[relay...]", Action: func(ctx context.Context, c *cli.Command) error { + negentropy := c.Bool("ids-only") || c.IsSet("only-missing") + if negentropy { + if c.Bool("paginate") || c.Bool("stream") || c.Bool("outbox") { + return fmt.Errorf("negentropy is incompatible with --stream, --outbox or --paginate") + } + } + if c.Bool("paginate") && c.Bool("stream") { return fmt.Errorf("incompatible flags --paginate and --stream") } @@ -99,7 +116,7 @@ example: } relayUrls := c.Args().Slice() - if len(relayUrls) > 0 { + if len(relayUrls) > 0 && !negentropy { // this is used both for the normal AUTH (after "auth-required:" is received) or forced pre-auth // connect to all relays we expect to use in this call in parallel forcePreAuthSigner := authSigner @@ -152,20 +169,60 @@ example: } if len(relayUrls) > 0 || c.Bool("outbox") { - if c.Bool("ids-only") { - seen := make(map[nostr.ID]struct{}, max(500, filter.Limit)) - for _, url := range relayUrls { - ch, err := nip77.FetchIDsOnly(ctx, url, filter) + if negentropy { + store := &slicestore.SliceStore{} + store.Init() + + if syncFile := c.String("only-missing"); syncFile != "" { + file, err := os.Open(syncFile) if err != nil { - log("negentropy call to %s failed: %s", url, err) - continue + return fmt.Errorf("failed to open sync file: %w", err) } - for id := range ch { - if _, ok := seen[id]; ok { + defer file.Close() + scanner := bufio.NewScanner(file) + scanner.Buffer(make([]byte, 16*1024*1024), 256*1024*1024) + for scanner.Scan() { + var evt nostr.Event + if err := easyjson.Unmarshal([]byte(scanner.Text()), &evt); err != nil { continue } - seen[id] = struct{}{} - stdout(id) + if err := store.SaveEvent(evt); err != nil || err == eventstore.ErrDupEvent { + continue + } + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("failed to read sync file: %w", err) + } + } + + target := PrintingQuerierPublisher{ + QuerierPublisher: wrappers.StorePublisher{Store: store, MaxLimit: math.MaxInt}, + } + + var source nostr.Querier = nil + if c.IsSet("only-missing") { + source = target + } + + handle := nip77.SyncEventsFromIDs + + if c.Bool("ids-only") { + seen := make(map[nostr.ID]struct{}, max(500, filter.Limit)) + handle = func(ctx context.Context, dir nip77.Direction) { + for id := range dir.Items { + if _, ok := seen[id]; ok { + continue + } + seen[id] = struct{}{} + stdout(id.Hex()) + } + } + } + + for _, url := range relayUrls { + err := nip77.NegentropySync(ctx, url, filter, source, target, handle) + if err != nil { + log("negentropy sync from %s failed: %s", url, err) } } } else { @@ -194,7 +251,7 @@ example: mu := sync.Mutex{} for _, pubkey := range filter.Authors { errg.Go(func() error { - n := int(c.Uint("outbox-relays-number")) + n := int(c.Uint("outbox-relays-per-pubkey")) for _, url := range sys.FetchOutboxRelays(ctx, pubkey, n) { if slices.Contains(relayUrls, url) { // already hardcoded, ignore @@ -395,3 +452,18 @@ func applyFlagsToFilter(c *cli.Command, filter *nostr.Filter) error { return nil } + +type PrintingQuerierPublisher struct { + nostr.QuerierPublisher +} + +func (p PrintingQuerierPublisher) Publish(ctx context.Context, evt nostr.Event) error { + if err := p.QuerierPublisher.Publish(ctx, evt); err == nil { + stdout(evt) + return nil + } else if err == eventstore.ErrDupEvent { + return nil + } else { + return err + } +}