dashboard / erock/git-pr / feat: add multitenancy #25 rss

accepted · opened on 2024-11-06T04:10:46Z by erock
Help
checkout latest patchset:
ssh pr.pico.sh print pr-25 | git am -3
checkout any patchset in a patch request:
ssh pr.pico.sh print ps-X | git am -3
add changes to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add 25
add review to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add --review 25
accept PR:
ssh pr.pico.sh pr accept 25
close PR:
ssh pr.pico.sh pr close 25

Logs

erock created pr with ps-42 on 2024-09-09T04:26:26Z
erock changed status on 2024-09-09T04:32:01Z {"status":"accepted"}
erock changed status on 2024-11-06T15:04:09Z {"status":"accepted"}

Patchsets

ps-42 by erock on 2024-09-09T04:26:26Z

Patchset ps-42

refactor: unsub method to interface

Eric Bower
2024-09-09T04:24:45Z
multicast.go
+20 -0
pubsub.go
+7 -4
Back to top

refactor: unsub method to interface

fix(cmd): proper cleanup when ssh session is closed (e.g. ctrl+c)
cmd/authorized_keys/main.go link
+18 -1
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
diff --git a/cmd/authorized_keys/main.go b/cmd/authorized_keys/main.go
index c36f6c9..a936e6b 100644
--- a/cmd/authorized_keys/main.go
+++ b/cmd/authorized_keys/main.go
@@ -32,6 +32,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
 				return
 			}
 
+			ctx := sesh.Context()
 			cmd := strings.TrimSpace(args[0])
 			channel := args[1]
 			logger := cfg.Logger.With(
@@ -49,6 +50,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
 					Writer: sesh,
 					Chan:   make(chan error),
 				}
+				go func() {
+					<-ctx.Done()
+					err := cfg.PubSub.UnSub(sub)
+					if err != nil {
+						wish.Errorln(sesh, err)
+					}
+				}()
 				err := cfg.PubSub.Sub(sub)
 				if err != nil {
 					wish.Errorln(sesh, err)
@@ -58,6 +66,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
 					Name:   channel,
 					Reader: sesh,
 				}
+				go func() {
+					<-ctx.Done()
+					err := cfg.PubSub.UnPub(msg)
+					if err != nil {
+						wish.Errorln(sesh, err)
+					}
+				}()
 				err := cfg.PubSub.Pub(msg)
 				if err != nil {
 					wish.Errorln(sesh, err)
@@ -88,7 +103,9 @@ func main() {
 		wish.WithAddress(fmt.Sprintf("%s:%s", host, port)),
 		wish.WithHostKeyPath("ssh_data/term_info_ed25519"),
 		wish.WithAuthorizedKeys(keyPath),
-		wish.WithMiddleware(PubSubMiddleware(cfg)),
+		wish.WithMiddleware(
+			PubSubMiddleware(cfg),
+		),
 	)
 	if err != nil {
 		logger.Error(err.Error())
multicast.go link
+20 -0
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
diff --git a/multicast.go b/multicast.go
index c9c7f52..b2a9a84 100644
--- a/multicast.go
+++ b/multicast.go
@@ -1,8 +1,10 @@
 package pubsub
 
 import (
+	"fmt"
 	"io"
 	"log/slog"
+	"time"
 
 	"github.com/google/uuid"
 )
@@ -67,6 +69,7 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
 	writers := []io.Writer{}
 	for _, sub := range b.subs {
 		if b.PubMatcher(msg, sub) {
+			log.Info("found match", "sub", sub.ID)
 			matches = append(matches, sub)
 			writers = append(writers, sub.Writer)
 		}
@@ -78,6 +81,11 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
 			log.Info("no subs found, waiting for sub")
 			sub = <-b.Chan
 			if b.PubMatcher(msg, sub) {
+				// empty subscriber is a signal to force a pub to stop
+				// waiting for a sub
+				if sub.Writer == nil {
+					return fmt.Errorf("pub closed")
+				}
 				return b.Pub(msg)
 			}
 		}
@@ -97,6 +105,18 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
 			log.Error("unsub err", "err", err)
 		}
 	}
+	del := time.Now()
+	msg.Delivered = &del
 
 	return err
 }
+
+func (b *PubSubMulticast) UnPub(msg *Msg) error {
+	b.Logger.Info("unpub", "channel", msg.Name)
+	// if the message hasn't been delivered then send a cancel sub to
+	// the multicast channel
+	if msg.Delivered == nil {
+		b.Chan <- &Subscriber{Name: msg.Name}
+	}
+	return nil
+}
pubsub.go link
+7 -4
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
diff --git a/pubsub.go b/pubsub.go
index bf0754a..26922dd 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -3,6 +3,7 @@ package pubsub
 import (
 	"io"
 	"log/slog"
+	"time"
 )
 
 type Subscriber struct {
@@ -18,15 +19,17 @@ func (s *Subscriber) Wait() error {
 }
 
 type Msg struct {
-	Name   string
-	Reader io.Reader
+	Name      string
+	Reader    io.Reader
+	Delivered *time.Time
 }
 
 type PubSub interface {
 	GetSubs() []*Subscriber
-	Sub(l *Subscriber) error
-	UnSub(l *Subscriber) error
+	Sub(sub *Subscriber) error
+	UnSub(sub *Subscriber) error
 	Pub(msg *Msg) error
+	UnPub(msg *Msg) error
 	// return true if message should be sent to this subscriber
 	PubMatcher(msg *Msg, sub *Subscriber) bool
 }