Logs
erockmKua
created pr with ps-42
on erockmKua
changed status
on {"status":"accepted"}
Patchsets
Diff ↕
refactor: unsub method to interface
Eric Bower <me@erock.io>
fix(cmd): proper cleanup with ssh session is closed (e.g. ctr+c)
cmd/authorized_keys/main.go | 19 ++++++++++++++++++- multicast.go | 20 ++++++++++++++++++++ pubsub.go | 11 +++++++---- 3 files changed, 45 insertions(+), 5 deletions(-)
1From 92777a8b167b31fb77310626499d425e6b869f26 Mon Sep 17 00:00:00 2001
2From: Eric Bower <me@erock.io>
3Date: Mon, 9 Sep 2024 00:24:45 -0400
4Subject: [PATCH] refactor: unsub method to interface
5
6fix(cmd): proper cleanup with ssh session is closed (e.g. ctr+c)
7---
8 cmd/authorized_keys/main.go | 19 ++++++++++++++++++-
9 multicast.go | 20 ++++++++++++++++++++
10 pubsub.go | 11 +++++++----
11 3 files changed, 45 insertions(+), 5 deletions(-)
12
13diff --git a/cmd/authorized_keys/main.go b/cmd/authorized_keys/main.go
14index c36f6c9..a936e6b 100644
15--- a/cmd/authorized_keys/main.go
16+++ b/cmd/authorized_keys/main.go
17@@ -32,6 +32,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
18 return
19 }
20
21+ ctx := sesh.Context()
22 cmd := strings.TrimSpace(args[0])
23 channel := args[1]
24 logger := cfg.Logger.With(
25@@ -49,6 +50,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
26 Writer: sesh,
27 Chan: make(chan error),
28 }
29+ go func() {
30+ <-ctx.Done()
31+ err := cfg.PubSub.UnSub(sub)
32+ if err != nil {
33+ wish.Errorln(sesh, err)
34+ }
35+ }()
36 err := cfg.PubSub.Sub(sub)
37 if err != nil {
38 wish.Errorln(sesh, err)
39@@ -58,6 +66,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
40 Name: channel,
41 Reader: sesh,
42 }
43+ go func() {
44+ <-ctx.Done()
45+ err := cfg.PubSub.UnPub(msg)
46+ if err != nil {
47+ wish.Errorln(sesh, err)
48+ }
49+ }()
50 err := cfg.PubSub.Pub(msg)
51 if err != nil {
52 wish.Errorln(sesh, err)
53@@ -88,7 +103,9 @@ func main() {
54 wish.WithAddress(fmt.Sprintf("%s:%s", host, port)),
55 wish.WithHostKeyPath("ssh_data/term_info_ed25519"),
56 wish.WithAuthorizedKeys(keyPath),
57- wish.WithMiddleware(PubSubMiddleware(cfg)),
58+ wish.WithMiddleware(
59+ PubSubMiddleware(cfg),
60+ ),
61 )
62 if err != nil {
63 logger.Error(err.Error())
64diff --git a/multicast.go b/multicast.go
65index c9c7f52..b2a9a84 100644
66--- a/multicast.go
67+++ b/multicast.go
68@@ -1,8 +1,10 @@
69 package pubsub
70
71 import (
72+ "fmt"
73 "io"
74 "log/slog"
75+ "time"
76
77 "github.com/google/uuid"
78 )
79@@ -67,6 +69,7 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
80 writers := []io.Writer{}
81 for _, sub := range b.subs {
82 if b.PubMatcher(msg, sub) {
83+ log.Info("found match", "sub", sub.ID)
84 matches = append(matches, sub)
85 writers = append(writers, sub.Writer)
86 }
87@@ -78,6 +81,11 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
88 log.Info("no subs found, waiting for sub")
89 sub = <-b.Chan
90 if b.PubMatcher(msg, sub) {
91+ // empty subscriber is a signal to force a pub to stop
92+ // waiting for a sub
93+ if sub.Writer == nil {
94+ return fmt.Errorf("pub closed")
95+ }
96 return b.Pub(msg)
97 }
98 }
99@@ -97,6 +105,18 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
100 log.Error("unsub err", "err", err)
101 }
102 }
103+ del := time.Now()
104+ msg.Delivered = &del
105
106 return err
107 }
108+
109+func (b *PubSubMulticast) UnPub(msg *Msg) error {
110+ b.Logger.Info("unpub", "channel", msg.Name)
111+ // if the message hasn't been delivered then send a cancel sub to
112+ // the multicast channel
113+ if msg.Delivered == nil {
114+ b.Chan <- &Subscriber{Name: msg.Name}
115+ }
116+ return nil
117+}
118diff --git a/pubsub.go b/pubsub.go
119index bf0754a..26922dd 100644
120--- a/pubsub.go
121+++ b/pubsub.go
122@@ -3,6 +3,7 @@ package pubsub
123 import (
124 "io"
125 "log/slog"
126+ "time"
127 )
128
129 type Subscriber struct {
130@@ -18,15 +19,17 @@ func (s *Subscriber) Wait() error {
131 }
132
133 type Msg struct {
134- Name string
135- Reader io.Reader
136+ Name string
137+ Reader io.Reader
138+ Delivered *time.Time
139 }
140
141 type PubSub interface {
142 GetSubs() []*Subscriber
143- Sub(l *Subscriber) error
144- UnSub(l *Subscriber) error
145+ Sub(sub *Subscriber) error
146+ UnSub(sub *Subscriber) error
147 Pub(msg *Msg) error
148+ UnPub(msg *Msg) error
149 // return true if message should be sent to this subscriber
150 PubMatcher(msg *Msg, sub *Subscriber) bool
151 }
152
153base-commit: d8e78982a3da8879534f062d910dbf3519b55081
154--
1552.45.2
156
ps-42
by
erockmKua
on refactor: unsub method to interface
Eric Bower <me@erock.io>
fix(cmd): proper cleanup with ssh session is closed (e.g. ctr+c)
cmd/authorized_keys/main.go | 19 ++++++++++++++++++- multicast.go | 20 ++++++++++++++++++++ pubsub.go | 11 +++++++---- 3 files changed, 45 insertions(+), 5 deletions(-)
1From 92777a8b167b31fb77310626499d425e6b869f26 Mon Sep 17 00:00:00 2001
2From: Eric Bower <me@erock.io>
3Date: Mon, 9 Sep 2024 00:24:45 -0400
4Subject: [PATCH] refactor: unsub method to interface
5
6fix(cmd): proper cleanup with ssh session is closed (e.g. ctr+c)
7---
8 cmd/authorized_keys/main.go | 19 ++++++++++++++++++-
9 multicast.go | 20 ++++++++++++++++++++
10 pubsub.go | 11 +++++++----
11 3 files changed, 45 insertions(+), 5 deletions(-)
12
13diff --git a/cmd/authorized_keys/main.go b/cmd/authorized_keys/main.go
14index c36f6c9..a936e6b 100644
15--- a/cmd/authorized_keys/main.go
16+++ b/cmd/authorized_keys/main.go
17@@ -32,6 +32,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
18 return
19 }
20
21+ ctx := sesh.Context()
22 cmd := strings.TrimSpace(args[0])
23 channel := args[1]
24 logger := cfg.Logger.With(
25@@ -49,6 +50,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
26 Writer: sesh,
27 Chan: make(chan error),
28 }
29+ go func() {
30+ <-ctx.Done()
31+ err := cfg.PubSub.UnSub(sub)
32+ if err != nil {
33+ wish.Errorln(sesh, err)
34+ }
35+ }()
36 err := cfg.PubSub.Sub(sub)
37 if err != nil {
38 wish.Errorln(sesh, err)
39@@ -58,6 +66,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
40 Name: channel,
41 Reader: sesh,
42 }
43+ go func() {
44+ <-ctx.Done()
45+ err := cfg.PubSub.UnPub(msg)
46+ if err != nil {
47+ wish.Errorln(sesh, err)
48+ }
49+ }()
50 err := cfg.PubSub.Pub(msg)
51 if err != nil {
52 wish.Errorln(sesh, err)
53@@ -88,7 +103,9 @@ func main() {
54 wish.WithAddress(fmt.Sprintf("%s:%s", host, port)),
55 wish.WithHostKeyPath("ssh_data/term_info_ed25519"),
56 wish.WithAuthorizedKeys(keyPath),
57- wish.WithMiddleware(PubSubMiddleware(cfg)),
58+ wish.WithMiddleware(
59+ PubSubMiddleware(cfg),
60+ ),
61 )
62 if err != nil {
63 logger.Error(err.Error())
64diff --git a/multicast.go b/multicast.go
65index c9c7f52..b2a9a84 100644
66--- a/multicast.go
67+++ b/multicast.go
68@@ -1,8 +1,10 @@
69 package pubsub
70
71 import (
72+ "fmt"
73 "io"
74 "log/slog"
75+ "time"
76
77 "github.com/google/uuid"
78 )
79@@ -67,6 +69,7 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
80 writers := []io.Writer{}
81 for _, sub := range b.subs {
82 if b.PubMatcher(msg, sub) {
83+ log.Info("found match", "sub", sub.ID)
84 matches = append(matches, sub)
85 writers = append(writers, sub.Writer)
86 }
87@@ -78,6 +81,11 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
88 log.Info("no subs found, waiting for sub")
89 sub = <-b.Chan
90 if b.PubMatcher(msg, sub) {
91+ // empty subscriber is a signal to force a pub to stop
92+ // waiting for a sub
93+ if sub.Writer == nil {
94+ return fmt.Errorf("pub closed")
95+ }
96 return b.Pub(msg)
97 }
98 }
99@@ -97,6 +105,18 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
100 log.Error("unsub err", "err", err)
101 }
102 }
103+ del := time.Now()
104+ msg.Delivered = &del
105
106 return err
107 }
108+
109+func (b *PubSubMulticast) UnPub(msg *Msg) error {
110+ b.Logger.Info("unpub", "channel", msg.Name)
111+ // if the message hasn't been delivered then send a cancel sub to
112+ // the multicast channel
113+ if msg.Delivered == nil {
114+ b.Chan <- &Subscriber{Name: msg.Name}
115+ }
116+ return nil
117+}
118diff --git a/pubsub.go b/pubsub.go
119index bf0754a..26922dd 100644
120--- a/pubsub.go
121+++ b/pubsub.go
122@@ -3,6 +3,7 @@ package pubsub
123 import (
124 "io"
125 "log/slog"
126+ "time"
127 )
128
129 type Subscriber struct {
130@@ -18,15 +19,17 @@ func (s *Subscriber) Wait() error {
131 }
132
133 type Msg struct {
134- Name string
135- Reader io.Reader
136+ Name string
137+ Reader io.Reader
138+ Delivered *time.Time
139 }
140
141 type PubSub interface {
142 GetSubs() []*Subscriber
143- Sub(l *Subscriber) error
144- UnSub(l *Subscriber) error
145+ Sub(sub *Subscriber) error
146+ UnSub(sub *Subscriber) error
147 Pub(msg *Msg) error
148+ UnPub(msg *Msg) error
149 // return true if message should be sent to this subscriber
150 PubMatcher(msg *Msg, sub *Subscriber) bool
151 }
152
153base-commit: d8e78982a3da8879534f062d910dbf3519b55081
154--
1552.45.2
156