pubsub / refactor: unsub method to interface #25

accepted · opened on 2024-09-09T04:26:26Z by erockmKua
Help
# add changes to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add 25
# add review to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add --review 25
# remove patchset
ssh pr.pico.sh ps rm ps-x
# checkout all patches
ssh pr.pico.sh pr print 25 | git am -3
# print a diff between the last two patches in a patch request
ssh pr.pico.sh pr diff 25
# accept PR
ssh pr.pico.sh pr accept 25
# close PR
ssh pr.pico.sh pr close 25

Logs

erockmKua created pr with ps-42 on 2024-09-09T04:26:26Z
erockmKua changed status on 2024-09-09T04:32:01Z {"status":"accepted"}

Patchsets

Diff ↕

refactor: unsub method to interface

Eric Bower <me@erock.io>
fix(cmd): proper cleanup with ssh session is closed (e.g. ctr+c)
 cmd/authorized_keys/main.go | 19 ++++++++++++++++++-
 multicast.go                | 20 ++++++++++++++++++++
 pubsub.go                   | 11 +++++++----
 3 files changed, 45 insertions(+), 5 deletions(-)
  1From 92777a8b167b31fb77310626499d425e6b869f26 Mon Sep 17 00:00:00 2001
  2From: Eric Bower <me@erock.io>
  3Date: Mon, 9 Sep 2024 00:24:45 -0400
  4Subject: [PATCH] refactor: unsub method to interface
  5
  6fix(cmd): proper cleanup with ssh session is closed (e.g. ctr+c)
  7---
  8 cmd/authorized_keys/main.go | 19 ++++++++++++++++++-
  9 multicast.go                | 20 ++++++++++++++++++++
 10 pubsub.go                   | 11 +++++++----
 11 3 files changed, 45 insertions(+), 5 deletions(-)
 12
 13diff --git a/cmd/authorized_keys/main.go b/cmd/authorized_keys/main.go
 14index c36f6c9..a936e6b 100644
 15--- a/cmd/authorized_keys/main.go
 16+++ b/cmd/authorized_keys/main.go
 17@@ -32,6 +32,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
 18 				return
 19 			}
 20 
 21+			ctx := sesh.Context()
 22 			cmd := strings.TrimSpace(args[0])
 23 			channel := args[1]
 24 			logger := cfg.Logger.With(
 25@@ -49,6 +50,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
 26 					Writer: sesh,
 27 					Chan:   make(chan error),
 28 				}
 29+				go func() {
 30+					<-ctx.Done()
 31+					err := cfg.PubSub.UnSub(sub)
 32+					if err != nil {
 33+						wish.Errorln(sesh, err)
 34+					}
 35+				}()
 36 				err := cfg.PubSub.Sub(sub)
 37 				if err != nil {
 38 					wish.Errorln(sesh, err)
 39@@ -58,6 +66,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
 40 					Name:   channel,
 41 					Reader: sesh,
 42 				}
 43+				go func() {
 44+					<-ctx.Done()
 45+					err := cfg.PubSub.UnPub(msg)
 46+					if err != nil {
 47+						wish.Errorln(sesh, err)
 48+					}
 49+				}()
 50 				err := cfg.PubSub.Pub(msg)
 51 				if err != nil {
 52 					wish.Errorln(sesh, err)
 53@@ -88,7 +103,9 @@ func main() {
 54 		wish.WithAddress(fmt.Sprintf("%s:%s", host, port)),
 55 		wish.WithHostKeyPath("ssh_data/term_info_ed25519"),
 56 		wish.WithAuthorizedKeys(keyPath),
 57-		wish.WithMiddleware(PubSubMiddleware(cfg)),
 58+		wish.WithMiddleware(
 59+			PubSubMiddleware(cfg),
 60+		),
 61 	)
 62 	if err != nil {
 63 		logger.Error(err.Error())
 64diff --git a/multicast.go b/multicast.go
 65index c9c7f52..b2a9a84 100644
 66--- a/multicast.go
 67+++ b/multicast.go
 68@@ -1,8 +1,10 @@
 69 package pubsub
 70 
 71 import (
 72+	"fmt"
 73 	"io"
 74 	"log/slog"
 75+	"time"
 76 
 77 	"github.com/google/uuid"
 78 )
 79@@ -67,6 +69,7 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
 80 	writers := []io.Writer{}
 81 	for _, sub := range b.subs {
 82 		if b.PubMatcher(msg, sub) {
 83+			log.Info("found match", "sub", sub.ID)
 84 			matches = append(matches, sub)
 85 			writers = append(writers, sub.Writer)
 86 		}
 87@@ -78,6 +81,11 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
 88 			log.Info("no subs found, waiting for sub")
 89 			sub = <-b.Chan
 90 			if b.PubMatcher(msg, sub) {
 91+				// empty subscriber is a signal to force a pub to stop
 92+				// waiting for a sub
 93+				if sub.Writer == nil {
 94+					return fmt.Errorf("pub closed")
 95+				}
 96 				return b.Pub(msg)
 97 			}
 98 		}
 99@@ -97,6 +105,18 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
100 			log.Error("unsub err", "err", err)
101 		}
102 	}
103+	del := time.Now()
104+	msg.Delivered = &del
105 
106 	return err
107 }
108+
109+func (b *PubSubMulticast) UnPub(msg *Msg) error {
110+	b.Logger.Info("unpub", "channel", msg.Name)
111+	// if the message hasn't been delivered then send a cancel sub to
112+	// the multicast channel
113+	if msg.Delivered == nil {
114+		b.Chan <- &Subscriber{Name: msg.Name}
115+	}
116+	return nil
117+}
118diff --git a/pubsub.go b/pubsub.go
119index bf0754a..26922dd 100644
120--- a/pubsub.go
121+++ b/pubsub.go
122@@ -3,6 +3,7 @@ package pubsub
123 import (
124 	"io"
125 	"log/slog"
126+	"time"
127 )
128 
129 type Subscriber struct {
130@@ -18,15 +19,17 @@ func (s *Subscriber) Wait() error {
131 }
132 
133 type Msg struct {
134-	Name   string
135-	Reader io.Reader
136+	Name      string
137+	Reader    io.Reader
138+	Delivered *time.Time
139 }
140 
141 type PubSub interface {
142 	GetSubs() []*Subscriber
143-	Sub(l *Subscriber) error
144-	UnSub(l *Subscriber) error
145+	Sub(sub *Subscriber) error
146+	UnSub(sub *Subscriber) error
147 	Pub(msg *Msg) error
148+	UnPub(msg *Msg) error
149 	// return true if message should be sent to this subscriber
150 	PubMatcher(msg *Msg, sub *Subscriber) bool
151 }
152
153base-commit: d8e78982a3da8879534f062d910dbf3519b55081
154-- 
1552.45.2
156
ps-42 by erockmKua on 2024-09-09T04:26:26Z

refactor: unsub method to interface

Eric Bower <me@erock.io> 2024-09-09T04:24:45Z
fix(cmd): proper cleanup with ssh session is closed (e.g. ctr+c)
 cmd/authorized_keys/main.go | 19 ++++++++++++++++++-
 multicast.go                | 20 ++++++++++++++++++++
 pubsub.go                   | 11 +++++++----
 3 files changed, 45 insertions(+), 5 deletions(-)
  1From 92777a8b167b31fb77310626499d425e6b869f26 Mon Sep 17 00:00:00 2001
  2From: Eric Bower <me@erock.io>
  3Date: Mon, 9 Sep 2024 00:24:45 -0400
  4Subject: [PATCH] refactor: unsub method to interface
  5
  6fix(cmd): proper cleanup with ssh session is closed (e.g. ctr+c)
  7---
  8 cmd/authorized_keys/main.go | 19 ++++++++++++++++++-
  9 multicast.go                | 20 ++++++++++++++++++++
 10 pubsub.go                   | 11 +++++++----
 11 3 files changed, 45 insertions(+), 5 deletions(-)
 12
 13diff --git a/cmd/authorized_keys/main.go b/cmd/authorized_keys/main.go
 14index c36f6c9..a936e6b 100644
 15--- a/cmd/authorized_keys/main.go
 16+++ b/cmd/authorized_keys/main.go
 17@@ -32,6 +32,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
 18 				return
 19 			}
 20 
 21+			ctx := sesh.Context()
 22 			cmd := strings.TrimSpace(args[0])
 23 			channel := args[1]
 24 			logger := cfg.Logger.With(
 25@@ -49,6 +50,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
 26 					Writer: sesh,
 27 					Chan:   make(chan error),
 28 				}
 29+				go func() {
 30+					<-ctx.Done()
 31+					err := cfg.PubSub.UnSub(sub)
 32+					if err != nil {
 33+						wish.Errorln(sesh, err)
 34+					}
 35+				}()
 36 				err := cfg.PubSub.Sub(sub)
 37 				if err != nil {
 38 					wish.Errorln(sesh, err)
 39@@ -58,6 +66,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
 40 					Name:   channel,
 41 					Reader: sesh,
 42 				}
 43+				go func() {
 44+					<-ctx.Done()
 45+					err := cfg.PubSub.UnPub(msg)
 46+					if err != nil {
 47+						wish.Errorln(sesh, err)
 48+					}
 49+				}()
 50 				err := cfg.PubSub.Pub(msg)
 51 				if err != nil {
 52 					wish.Errorln(sesh, err)
 53@@ -88,7 +103,9 @@ func main() {
 54 		wish.WithAddress(fmt.Sprintf("%s:%s", host, port)),
 55 		wish.WithHostKeyPath("ssh_data/term_info_ed25519"),
 56 		wish.WithAuthorizedKeys(keyPath),
 57-		wish.WithMiddleware(PubSubMiddleware(cfg)),
 58+		wish.WithMiddleware(
 59+			PubSubMiddleware(cfg),
 60+		),
 61 	)
 62 	if err != nil {
 63 		logger.Error(err.Error())
 64diff --git a/multicast.go b/multicast.go
 65index c9c7f52..b2a9a84 100644
 66--- a/multicast.go
 67+++ b/multicast.go
 68@@ -1,8 +1,10 @@
 69 package pubsub
 70 
 71 import (
 72+	"fmt"
 73 	"io"
 74 	"log/slog"
 75+	"time"
 76 
 77 	"github.com/google/uuid"
 78 )
 79@@ -67,6 +69,7 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
 80 	writers := []io.Writer{}
 81 	for _, sub := range b.subs {
 82 		if b.PubMatcher(msg, sub) {
 83+			log.Info("found match", "sub", sub.ID)
 84 			matches = append(matches, sub)
 85 			writers = append(writers, sub.Writer)
 86 		}
 87@@ -78,6 +81,11 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
 88 			log.Info("no subs found, waiting for sub")
 89 			sub = <-b.Chan
 90 			if b.PubMatcher(msg, sub) {
 91+				// empty subscriber is a signal to force a pub to stop
 92+				// waiting for a sub
 93+				if sub.Writer == nil {
 94+					return fmt.Errorf("pub closed")
 95+				}
 96 				return b.Pub(msg)
 97 			}
 98 		}
 99@@ -97,6 +105,18 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
100 			log.Error("unsub err", "err", err)
101 		}
102 	}
103+	del := time.Now()
104+	msg.Delivered = &del
105 
106 	return err
107 }
108+
109+func (b *PubSubMulticast) UnPub(msg *Msg) error {
110+	b.Logger.Info("unpub", "channel", msg.Name)
111+	// if the message hasn't been delivered then send a cancel sub to
112+	// the multicast channel
113+	if msg.Delivered == nil {
114+		b.Chan <- &Subscriber{Name: msg.Name}
115+	}
116+	return nil
117+}
118diff --git a/pubsub.go b/pubsub.go
119index bf0754a..26922dd 100644
120--- a/pubsub.go
121+++ b/pubsub.go
122@@ -3,6 +3,7 @@ package pubsub
123 import (
124 	"io"
125 	"log/slog"
126+	"time"
127 )
128 
129 type Subscriber struct {
130@@ -18,15 +19,17 @@ func (s *Subscriber) Wait() error {
131 }
132 
133 type Msg struct {
134-	Name   string
135-	Reader io.Reader
136+	Name      string
137+	Reader    io.Reader
138+	Delivered *time.Time
139 }
140 
141 type PubSub interface {
142 	GetSubs() []*Subscriber
143-	Sub(l *Subscriber) error
144-	UnSub(l *Subscriber) error
145+	Sub(sub *Subscriber) error
146+	UnSub(sub *Subscriber) error
147 	Pub(msg *Msg) error
148+	UnPub(msg *Msg) error
149 	// return true if message should be sent to this subscriber
150 	PubMatcher(msg *Msg, sub *Subscriber) bool
151 }
152
153base-commit: d8e78982a3da8879534f062d910dbf3519b55081
154-- 
1552.45.2
156