Logs
Patchsets
Patchset ps-42
refactor: unsub method to interface
Eric Bower
cmd/authorized_keys/main.go
+18
-1
multicast.go
+20
-0
pubsub.go
+7
-4
refactor: unsub method to interface
fix(cmd): proper cleanup when ssh session is closed (e.g. ctrl+c)
cmd/authorized_keys/main.go
link
+18
-1
+18
-1
1diff --git a/cmd/authorized_keys/main.go b/cmd/authorized_keys/main.go
2index c36f6c9..a936e6b 100644
3--- a/cmd/authorized_keys/main.go
4+++ b/cmd/authorized_keys/main.go
5@@ -32,6 +32,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
6 return
7 }
8
9+ ctx := sesh.Context()
10 cmd := strings.TrimSpace(args[0])
11 channel := args[1]
12 logger := cfg.Logger.With(
13@@ -49,6 +50,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
14 Writer: sesh,
15 Chan: make(chan error),
16 }
17+ go func() {
18+ <-ctx.Done()
19+ err := cfg.PubSub.UnSub(sub)
20+ if err != nil {
21+ wish.Errorln(sesh, err)
22+ }
23+ }()
24 err := cfg.PubSub.Sub(sub)
25 if err != nil {
26 wish.Errorln(sesh, err)
27@@ -58,6 +66,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
28 Name: channel,
29 Reader: sesh,
30 }
31+ go func() {
32+ <-ctx.Done()
33+ err := cfg.PubSub.UnPub(msg)
34+ if err != nil {
35+ wish.Errorln(sesh, err)
36+ }
37+ }()
38 err := cfg.PubSub.Pub(msg)
39 if err != nil {
40 wish.Errorln(sesh, err)
41@@ -88,7 +103,9 @@ func main() {
42 wish.WithAddress(fmt.Sprintf("%s:%s", host, port)),
43 wish.WithHostKeyPath("ssh_data/term_info_ed25519"),
44 wish.WithAuthorizedKeys(keyPath),
45- wish.WithMiddleware(PubSubMiddleware(cfg)),
46+ wish.WithMiddleware(
47+ PubSubMiddleware(cfg),
48+ ),
49 )
50 if err != nil {
51 logger.Error(err.Error())
multicast.go
link
+20
-0
+20
-0
1diff --git a/multicast.go b/multicast.go
2index c9c7f52..b2a9a84 100644
3--- a/multicast.go
4+++ b/multicast.go
5@@ -1,8 +1,10 @@
6 package pubsub
7
8 import (
9+ "fmt"
10 "io"
11 "log/slog"
12+ "time"
13
14 "github.com/google/uuid"
15 )
16@@ -67,6 +69,7 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
17 writers := []io.Writer{}
18 for _, sub := range b.subs {
19 if b.PubMatcher(msg, sub) {
20+ log.Info("found match", "sub", sub.ID)
21 matches = append(matches, sub)
22 writers = append(writers, sub.Writer)
23 }
24@@ -78,6 +81,11 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
25 log.Info("no subs found, waiting for sub")
26 sub = <-b.Chan
27 if b.PubMatcher(msg, sub) {
28+ // empty subscriber is a signal to force a pub to stop
29+ // waiting for a sub
30+ if sub.Writer == nil {
31+ return fmt.Errorf("pub closed")
32+ }
33 return b.Pub(msg)
34 }
35 }
36@@ -97,6 +105,18 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
37 log.Error("unsub err", "err", err)
38 }
39 }
40+ del := time.Now()
41+ msg.Delivered = &del
42
43 return err
44 }
45+
46+func (b *PubSubMulticast) UnPub(msg *Msg) error {
47+ b.Logger.Info("unpub", "channel", msg.Name)
48+ // if the message hasn't been delivered then send a cancel sub to
49+ // the multicast channel
50+ if msg.Delivered == nil {
51+ b.Chan <- &Subscriber{Name: msg.Name}
52+ }
53+ return nil
54+}
pubsub.go
link
+7
-4
+7
-4
1diff --git a/pubsub.go b/pubsub.go
2index bf0754a..26922dd 100644
3--- a/pubsub.go
4+++ b/pubsub.go
5@@ -3,6 +3,7 @@ package pubsub
6 import (
7 "io"
8 "log/slog"
9+ "time"
10 )
11
12 type Subscriber struct {
13@@ -18,15 +19,17 @@ func (s *Subscriber) Wait() error {
14 }
15
16 type Msg struct {
17- Name string
18- Reader io.Reader
19+ Name string
20+ Reader io.Reader
21+ Delivered *time.Time
22 }
23
24 type PubSub interface {
25 GetSubs() []*Subscriber
26- Sub(l *Subscriber) error
27- UnSub(l *Subscriber) error
28+ Sub(sub *Subscriber) error
29+ UnSub(sub *Subscriber) error
30 Pub(msg *Msg) error
31+ UnPub(msg *Msg) error
32 // return true if message should be sent to this subscriber
33 PubMatcher(msg *Msg, sub *Subscriber) bool
34 }