diff --git a/go.mod b/go.mod index 8fa58af..9ff63dd 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,11 @@ go 1.25 require ( fiatjaf.com/lib v0.3.1 - fiatjaf.com/nostr v0.0.0-20251126101225-44130595c606 + fiatjaf.com/nostr v0.0.0-20251201232830-91548fa0a157 github.com/AlecAivazis/survey/v2 v2.3.7 github.com/bep/debounce v1.2.1 github.com/btcsuite/btcd/btcec/v2 v2.3.6 + github.com/charmbracelet/glamour v0.10.0 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 github.com/fatih/color v1.16.0 @@ -41,7 +42,6 @@ require ( github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc // indirect - github.com/charmbracelet/glamour v0.10.0 // indirect github.com/charmbracelet/lipgloss v1.1.1-0.20250404203927-76690c660834 // indirect github.com/charmbracelet/x/ansi v0.8.0 // indirect github.com/charmbracelet/x/cellbuf v0.0.13 // indirect diff --git a/go.sum b/go.sum index 875f706..b1a3a95 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ fiatjaf.com/lib v0.3.1 h1:/oFQwNtFRfV+ukmOCxfBEAuayoLwXp4wu2/fz5iHpwA= fiatjaf.com/lib v0.3.1/go.mod h1:Ycqq3+mJ9jAWu7XjbQI1cVr+OFgnHn79dQR5oTII47g= -fiatjaf.com/nostr v0.0.0-20251126101225-44130595c606 h1:wQHJ0TFA0Fuq92p/6u6AbsBFq6ZVToSdxV6puXVIruI= -fiatjaf.com/nostr v0.0.0-20251126101225-44130595c606/go.mod h1:ue7yw0zHfZj23Ml2kVSdBx0ENEaZiuvGxs/8VEN93FU= +fiatjaf.com/nostr v0.0.0-20251201232830-91548fa0a157 h1:14yLsO2HwpS2CLIKFvLMDp8tVEDahwdC8OeG6NGaL+M= +fiatjaf.com/nostr v0.0.0-20251201232830-91548fa0a157/go.mod h1:ue7yw0zHfZj23Ml2kVSdBx0ENEaZiuvGxs/8VEN93FU= github.com/AlecAivazis/survey/v2 v2.3.7 h1:6I/u8FvytdGsgonrYsVn2t8t4QiRnh6QSTqkkhIiSjQ= github.com/AlecAivazis/survey/v2 v2.3.7/go.mod h1:xUTIdE4KCOIjsBAE1JYsUPoCqYdZ1reCfTwbto0Fduo= github.com/FastFilter/xorfilter v0.2.1 h1:lbdeLG9BdpquK64ZsleBS8B4xO/QW1IM0gMzF7KaBKc= @@ -13,12 +13,18 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDe github.com/PowerDNS/lmdb-go v1.9.3 h1:AUMY2pZT8WRpkEv39I9Id3MuoHd+NZbTVpNhruVkPTg= github.com/PowerDNS/lmdb-go v1.9.3/go.mod h1:TE0l+EZK8Z1B4dx070ZxkWTlp8RG1mjN0/+FkFRQMtU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= +github.com/alecthomas/assert/v2 v2.7.0 h1:QtqSACNS3tF7oasA8CU6A6sXZSBDqnm7RfpLl9bZqbE= +github.com/alecthomas/assert/v2 v2.7.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/chroma/v2 v2.14.0 h1:R3+wzpnUArGcQz7fCETQBzO5n9IMNi13iIs46aU4V9E= github.com/alecthomas/chroma/v2 v2.14.0/go.mod h1:QolEbTfmUHIMVpBqxeDnNBj2uoeI4EbYP4i6n68SG4I= +github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= +github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/aymanbagabas/go-udiff v0.2.0 h1:TK0fH4MteXUDspT88n8CKzvK0X9O2xu9yQjWpi6yML8= +github.com/aymanbagabas/go-udiff v0.2.0/go.mod h1:RE4Ex0qsGkTAJoQdQQCA0uG+nAzJO/pI/QwceO5fgrA= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= @@ -65,6 +71,8 @@ github.com/charmbracelet/x/ansi v0.8.0 h1:9GTq3xq9caJW8ZrBTe0LIe2fvfLR/bYXKTx2ll github.com/charmbracelet/x/ansi v0.8.0/go.mod h1:wdYl/ONOLHLIVmQaxbIYEC/cRKOQyjTkowiI4blgS9Q= github.com/charmbracelet/x/cellbuf v0.0.13 h1:/KBBKHuVRbq1lYx5BzEHBAFBP8VcQzJejZ/IA3iR28k= github.com/charmbracelet/x/cellbuf v0.0.13/go.mod h1:xe0nKWGd3eJgtqZRaN9RjMtK7xUYchjzPr7q6kcvCCs= +github.com/charmbracelet/x/exp/golden v0.0.0-20240806155701-69247e0abc2a h1:G99klV19u0QnhiizODirwVksQB91TJKV/UaTnACcG30= +github.com/charmbracelet/x/exp/golden v0.0.0-20240806155701-69247e0abc2a/go.mod h1:wDlXFlCrmJ8J+swcL/MnGUuYnqgQdW9rhSD61oNMb6U= github.com/charmbracelet/x/exp/slice v0.0.0-20250327172914-2fdc97757edf h1:rLG0Yb6MQSDKdB52aGX55JT1oi0P0Kuaj7wi1bLUpnI= github.com/charmbracelet/x/exp/slice v0.0.0-20250327172914-2fdc97757edf/go.mod h1:B3UgsnsBZS/eX42BlaNiJkD1pPOUa+oF1IYC6Yd2CEU= github.com/charmbracelet/x/term v0.2.1 h1:AQeHeLZ1OqSXhrAWpYUtZyX1T3zVxfpZuEQMIQaGIAQ= @@ -138,6 +146,8 @@ github.com/hablullah/go-juliandays v1.0.0 h1:A8YM7wIj16SzlKT0SRJc9CD29iiaUzpBLzh github.com/hablullah/go-juliandays v1.0.0/go.mod h1:0JOYq4oFOuDja+oospuc61YoX+uNEn7Z6uHYTbBzdGc= github.com/hanwen/go-fuse/v2 v2.7.2 h1:SbJP1sUP+n1UF8NXBA14BuojmTez+mDgOk0bC057HQw= github.com/hanwen/go-fuse/v2 v2.7.2/go.mod h1:ugNaD/iv5JYyS1Rcvi57Wz7/vrLQJo10mmketmoef48= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec h1:qv2VnGeEQHchGaZ/u7lxST/RaJw+cv273q79D81Xbog= github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec/go.mod h1:Q48J4R4DvxnHolD5P8pOtXigYlRuPLGl6moFx3ulM68= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= diff --git a/main.go b/main.go index c3438e9..aac465d 100644 --- a/main.go +++ b/main.go @@ -50,6 +50,7 @@ var app = &cli.Command{ publish, git, nip, + syncCmd, }, Version: version, Flags: []cli.Flag{ diff --git a/sync.go b/sync.go new file mode 100644 index 0000000..348d97f --- /dev/null +++ b/sync.go @@ -0,0 +1,464 @@ +package main + +import ( + "bytes" + "context" + "errors" + "fmt" + "sync" + + "fiatjaf.com/nostr" + "fiatjaf.com/nostr/nip77" + "fiatjaf.com/nostr/nip77/negentropy" + "fiatjaf.com/nostr/nip77/negentropy/storage" + "github.com/urfave/cli/v3" +) + +var syncCmd = &cli.Command{ + Name: "sync", + Usage: "sync events between two relays using negentropy", + Description: `uses nip77 negentropy to sync events between two relays`, + ArgsUsage: " ", + Flags: reqFilterFlags, + Action: func(ctx context.Context, c *cli.Command) error { + args := c.Args().Slice() + if len(args) != 2 { + return fmt.Errorf("need exactly two relay URLs: source and target") + } + + filter := nostr.Filter{} + if err := applyFlagsToFilter(c, &filter); err != nil { + return err + } + + peerA, err := NewRelayThirdPartyRemote(ctx, args[0]) + if err != nil { + return fmt.Errorf("error setting up %s: %w", args[0], err) + } + + peerB, err := NewRelayThirdPartyRemote(ctx, args[1]) + if err != nil { + return fmt.Errorf("error setting up %s: %w", args[1], err) + } + + tpn := NewThirdPartyNegentropy( + peerA, + peerB, + filter, + ) + + wg := sync.WaitGroup{} + + wg.Go(func() { + err = tpn.Run(ctx) + }) + + wg.Go(func() { + type op struct { + src *nostr.Relay + dst *nostr.Relay + ids []nostr.ID + } + + pending := []op{ + {peerA.relay, peerB.relay, make([]nostr.ID, 0, 30)}, + {peerB.relay, peerA.relay, make([]nostr.ID, 0, 30)}, + } + + for delta := range tpn.Deltas { + have := delta.Have.relay + havenot := delta.HaveNot.relay + logverbose("%s has %s, %s doesn't.\n", have.URL, delta.ID.Hex(), havenot.URL) + + idx := 0 // peerA + if have == peerB.relay { + idx = 1 // peerB + } + pending[idx].ids = append(pending[idx].ids, delta.ID) + + // every 30 ids do a fetch-and-publish + if len(pending[idx].ids) == 30 { + for evt := range pending[idx].src.QueryEvents(nostr.Filter{IDs: pending[idx].ids}) { + pending[idx].dst.Publish(ctx, evt) + } + pending[idx].ids = pending[idx].ids[:0] + } + } + + // do it for the remaining ids + for _, op := range pending { + if len(op.ids) > 0 { + for evt := range op.src.QueryEvents(nostr.Filter{IDs: op.ids}) { + op.dst.Publish(ctx, evt) + } + } + } + }) + + wg.Wait() + + return err + }, +} + +type ThirdPartyNegentropy struct { + PeerA *RelayThirdPartyRemote + PeerB *RelayThirdPartyRemote + Filter nostr.Filter + + Deltas chan Delta +} + +type Delta struct { + ID nostr.ID + Have *RelayThirdPartyRemote + HaveNot *RelayThirdPartyRemote +} + +type boundKey string + +func getBoundKey(b negentropy.Bound) boundKey { + return boundKey(fmt.Sprintf("%d:%x", b.Timestamp, b.IDPrefix)) +} + +type RelayThirdPartyRemote struct { + relay *nostr.Relay + messages chan string + err error +} + +func NewRelayThirdPartyRemote(ctx context.Context, url string) (*RelayThirdPartyRemote, error) { + rtpr := &RelayThirdPartyRemote{ + messages: make(chan string, 3), + } + + var err error + rtpr.relay, err = nostr.RelayConnect(ctx, url, nostr.RelayOptions{ + CustomHandler: func(data string) { + envelope := nip77.ParseNegMessage(data) + if envelope == nil { + return + } + switch env := envelope.(type) { + case *nip77.OpenEnvelope, *nip77.CloseEnvelope: + rtpr.err = fmt.Errorf("unexpected %s received from relay", env.Label()) + return + case *nip77.ErrorEnvelope: + rtpr.err = fmt.Errorf("relay returned a %s: %s", env.Label(), env.Reason) + return + case *nip77.MessageEnvelope: + rtpr.messages <- env.Message + } + }, + }) + if err != nil { + return nil, err + } + + return rtpr, nil +} + +func (rtpr *RelayThirdPartyRemote) SendInitialMessage(filter nostr.Filter, msg string) error { + msgj, _ := json.Marshal(nip77.OpenEnvelope{ + SubscriptionID: "sync3", + Filter: filter, + Message: msg, + }) + return rtpr.relay.WriteWithError(msgj) +} + +func (rtpr *RelayThirdPartyRemote) SendMessage(msg string) error { + msgj, _ := json.Marshal(nip77.MessageEnvelope{ + SubscriptionID: "sync3", + Message: msg, + }) + return rtpr.relay.WriteWithError(msgj) +} + +func (rtpr *RelayThirdPartyRemote) SendClose() error { + msgj, _ := json.Marshal(nip77.CloseEnvelope{ + SubscriptionID: "sync3", + }) + return rtpr.relay.WriteWithError(msgj) +} + +var thirdPartyRemoteEndOfMessages = errors.New("the-end") + +func (rtpr *RelayThirdPartyRemote) Receive() (string, error) { + if rtpr.err != nil { + return "", rtpr.err + } + if msg, ok := <-rtpr.messages; ok { + return msg, nil + } + return "", thirdPartyRemoteEndOfMessages +} + +func NewThirdPartyNegentropy(peerA, peerB *RelayThirdPartyRemote, filter nostr.Filter) *ThirdPartyNegentropy { + return &ThirdPartyNegentropy{ + PeerA: peerA, + PeerB: peerB, + Filter: filter, + Deltas: make(chan Delta, 100), + } +} + +func (n *ThirdPartyNegentropy) Run(ctx context.Context) error { + peerAIds := make(map[nostr.ID]struct{}) + peerBIds := make(map[nostr.ID]struct{}) + peerASkippedBounds := make(map[boundKey]struct{}) + peerBSkippedBounds := make(map[boundKey]struct{}) + + // send an empty message to A to start things up + initialMsg := createInitialMessage() + err := n.PeerA.SendInitialMessage(n.Filter, initialMsg) + if err != nil { + return err + } + + hasSentInitialMessageToB := false + + for { + // receive message from A + msgA, err := n.PeerA.Receive() + if err != nil { + return err + } + msgAb, _ := nostr.HexDecodeString(msgA) + if len(msgAb) == 1 { + break + } + + msgToB, err := parseMessageBuildNext( + msgA, + peerBSkippedBounds, + func(id nostr.ID) { + if _, exists := peerBIds[id]; exists { + delete(peerBIds, id) + } else { + peerAIds[id] = struct{}{} + } + }, + func(boundKey boundKey) { + peerASkippedBounds[boundKey] = struct{}{} + }, + ) + if err != nil { + return err + } + + // emit deltas from B after receiving message from A + for id := range peerBIds { + select { + case n.Deltas <- Delta{ID: id, Have: n.PeerB, HaveNot: n.PeerA}: + case <-ctx.Done(): + return context.Cause(ctx) + } + delete(peerBIds, id) + } + + if len(msgToB) == 2 { + // exit condition (no more messages to send) + break + } + + // send message to B + if hasSentInitialMessageToB { + err = n.PeerB.SendMessage(msgToB) + } else { + err = n.PeerB.SendInitialMessage(n.Filter, msgToB) + hasSentInitialMessageToB = true + } + if err != nil { + return err + } + + // receive message from B + msgB, err := n.PeerB.Receive() + if err != nil { + return err + } + msgBb, _ := nostr.HexDecodeString(msgB) + if len(msgBb) == 1 { + break + } + + msgToA, err := parseMessageBuildNext( + msgB, + peerASkippedBounds, + func(id nostr.ID) { + if _, exists := peerAIds[id]; exists { + delete(peerAIds, id) + } else { + peerBIds[id] = struct{}{} + } + }, + func(boundKey boundKey) { + peerBSkippedBounds[boundKey] = struct{}{} + }, + ) + if err != nil { + return err + } + + // emit deltas from A after receiving message from B + for id := range peerAIds { + select { + case n.Deltas <- Delta{ID: id, Have: n.PeerA, HaveNot: n.PeerB}: + case <-ctx.Done(): + return context.Cause(ctx) + } + delete(peerAIds, id) + } + + if len(msgToA) == 2 { + // exit condition (no more messages to send) + break + } + + // send message to A + err = n.PeerA.SendMessage(msgToA) + if err != nil { + return err + } + } + + // emit remaining deltas before exit + for id := range peerAIds { + select { + case n.Deltas <- Delta{ID: id, Have: n.PeerA, HaveNot: n.PeerB}: + case <-ctx.Done(): + return context.Cause(ctx) + } + } + for id := range peerBIds { + select { + case n.Deltas <- Delta{ID: id, Have: n.PeerB, HaveNot: n.PeerA}: + case <-ctx.Done(): + return context.Cause(ctx) + } + } + + n.PeerA.SendClose() + n.PeerB.SendClose() + close(n.Deltas) + + return nil +} + +func createInitialMessage() string { + output := bytes.NewBuffer(make([]byte, 0, 64)) + output.WriteByte(negentropy.ProtocolVersion) + + dummy := negentropy.BoundWriter{} + dummy.WriteBound(output, negentropy.InfiniteBound) + output.WriteByte(byte(negentropy.FingerprintMode)) + + // hardcoded random fingerprint + fingerprint := [negentropy.FingerprintSize]byte{ + 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, + 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, + } + output.Write(fingerprint[:]) + + return nostr.HexEncodeToString(output.Bytes()) +} + +func parseMessageBuildNext( + msg string, + skippedBounds map[boundKey]struct{}, + idCallback func(id nostr.ID), + skipCallback func(boundKey boundKey), +) (string, error) { + msgb, err := nostr.HexDecodeString(msg) + if err != nil { + return "", err + } + + br := &negentropy.BoundReader{} + bw := &negentropy.BoundWriter{} + + nextMsg := bytes.NewBuffer(make([]byte, 0, len(msgb))) + acc := &storage.Accumulator{} // this will be used for building our own fingerprints and also as a placeholder + + reader := bytes.NewReader(msgb) + pv, err := reader.ReadByte() + if err != nil { + return "", err + } + if pv != negentropy.ProtocolVersion { + return "", fmt.Errorf("unsupported protocol version %v", pv) + } + + nextMsg.WriteByte(pv) + + for reader.Len() > 0 { + bound, err := br.ReadBound(reader) + if err != nil { + return "", err + } + + modeVal, err := negentropy.ReadVarInt(reader) + if err != nil { + return "", err + } + mode := negentropy.Mode(modeVal) + + switch mode { + case negentropy.SkipMode: + skipCallback(getBoundKey(bound)) + if _, skipped := skippedBounds[getBoundKey(bound)]; !skipped { + bw.WriteBound(nextMsg, bound) + negentropy.WriteVarInt(nextMsg, int(negentropy.SkipMode)) + } + + case negentropy.FingerprintMode: + _, err = reader.Read(acc.Buf[0:negentropy.FingerprintSize] /* use this buffer as a dummy */) + if err != nil { + return "", err + } + + if _, skipped := skippedBounds[getBoundKey(bound)]; !skipped { + bw.WriteBound(nextMsg, bound) + negentropy.WriteVarInt(nextMsg, int(negentropy.FingerprintMode)) + nextMsg.Write(acc.Buf[0:negentropy.FingerprintSize] /* idem */) + } + case negentropy.IdListMode: + // when receiving an idlist we will never send this bound again to this peer + skipCallback(getBoundKey(bound)) + + // and instead of sending these ids to the other peer we'll send a fingerprint + acc.Reset() + + numIds, err := negentropy.ReadVarInt(reader) + if err != nil { + return "", err + } + + for range numIds { + id := nostr.ID{} + + _, err = reader.Read(id[:]) + if err != nil { + return "", err + } + + idCallback(id) + + acc.AddBytes(id[:]) + } + + if _, skipped := skippedBounds[getBoundKey(bound)]; !skipped { + fingerprint := acc.GetFingerprint(numIds) + + bw.WriteBound(nextMsg, bound) + negentropy.WriteVarInt(nextMsg, int(negentropy.FingerprintMode)) + nextMsg.Write(fingerprint[:]) + } + default: + return "", fmt.Errorf("unknown mode %v", mode) + } + } + + return nostr.HexEncodeToString(nextMsg.Bytes()), nil +}