dashboard / git-pr / feat: add multitenancy #25 rss

accepted · opened on 2024-11-06T04:10:46Z by erock
Help
# add changes to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add 25
# add review to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add --review 25
# remove patchset
ssh pr.pico.sh ps rm ps-x
# checkout all patches
ssh pr.pico.sh pr print 25 | git am -3
# print a diff between the last two patches in a patch request
ssh pr.pico.sh pr diff 25
# accept PR
ssh pr.pico.sh pr accept 25
# close PR
ssh pr.pico.sh pr close 25

Logs

erock created pr with ps-42 on 2024-09-09T04:26:26Z
erock changed status on 2024-09-09T04:32:01Z {"status":"accepted"}
erock changed status on 2024-11-06T15:04:09Z {"status":"accepted"}

Patchsets

ps-42 by erock on 2024-09-09T04:26:26Z

Patchset ps-42

refactor: unsub method to interface

Eric Bower
2024-09-09T04:24:45Z
multicast.go
+20 -0
pubsub.go
+7 -4
Back to top

refactor: unsub method to interface

fix(cmd): proper cleanup when ssh session is closed (e.g. ctrl+c)
cmd/authorized_keys/main.go link
+18 -1
 1diff --git a/cmd/authorized_keys/main.go b/cmd/authorized_keys/main.go
 2index c36f6c9..a936e6b 100644
 3--- a/cmd/authorized_keys/main.go
 4+++ b/cmd/authorized_keys/main.go
 5@@ -32,6 +32,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
 6 				return
 7 			}
 8 
 9+			ctx := sesh.Context()
10 			cmd := strings.TrimSpace(args[0])
11 			channel := args[1]
12 			logger := cfg.Logger.With(
13@@ -49,6 +50,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
14 					Writer: sesh,
15 					Chan:   make(chan error),
16 				}
17+				go func() {
18+					<-ctx.Done()
19+					err := cfg.PubSub.UnSub(sub)
20+					if err != nil {
21+						wish.Errorln(sesh, err)
22+					}
23+				}()
24 				err := cfg.PubSub.Sub(sub)
25 				if err != nil {
26 					wish.Errorln(sesh, err)
27@@ -58,6 +66,13 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
28 					Name:   channel,
29 					Reader: sesh,
30 				}
31+				go func() {
32+					<-ctx.Done()
33+					err := cfg.PubSub.UnPub(msg)
34+					if err != nil {
35+						wish.Errorln(sesh, err)
36+					}
37+				}()
38 				err := cfg.PubSub.Pub(msg)
39 				if err != nil {
40 					wish.Errorln(sesh, err)
41@@ -88,7 +103,9 @@ func main() {
42 		wish.WithAddress(fmt.Sprintf("%s:%s", host, port)),
43 		wish.WithHostKeyPath("ssh_data/term_info_ed25519"),
44 		wish.WithAuthorizedKeys(keyPath),
45-		wish.WithMiddleware(PubSubMiddleware(cfg)),
46+		wish.WithMiddleware(
47+			PubSubMiddleware(cfg),
48+		),
49 	)
50 	if err != nil {
51 		logger.Error(err.Error())
multicast.go link
+20 -0
 1diff --git a/multicast.go b/multicast.go
 2index c9c7f52..b2a9a84 100644
 3--- a/multicast.go
 4+++ b/multicast.go
 5@@ -1,8 +1,10 @@
 6 package pubsub
 7 
 8 import (
 9+	"fmt"
10 	"io"
11 	"log/slog"
12+	"time"
13 
14 	"github.com/google/uuid"
15 )
16@@ -67,6 +69,7 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
17 	writers := []io.Writer{}
18 	for _, sub := range b.subs {
19 		if b.PubMatcher(msg, sub) {
20+			log.Info("found match", "sub", sub.ID)
21 			matches = append(matches, sub)
22 			writers = append(writers, sub.Writer)
23 		}
24@@ -78,6 +81,11 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
25 			log.Info("no subs found, waiting for sub")
26 			sub = <-b.Chan
27 			if b.PubMatcher(msg, sub) {
28+				// empty subscriber is a signal to force a pub to stop
29+				// waiting for a sub
30+				if sub.Writer == nil {
31+					return fmt.Errorf("pub closed")
32+				}
33 				return b.Pub(msg)
34 			}
35 		}
36@@ -97,6 +105,18 @@ func (b *PubSubMulticast) Pub(msg *Msg) error {
37 			log.Error("unsub err", "err", err)
38 		}
39 	}
40+	del := time.Now()
41+	msg.Delivered = &del
42 
43 	return err
44 }
45+
46+func (b *PubSubMulticast) UnPub(msg *Msg) error {
47+	b.Logger.Info("unpub", "channel", msg.Name)
48+	// if the message hasn't been delivered then send a cancel sub to
49+	// the multicast channel
50+	if msg.Delivered == nil {
51+		b.Chan <- &Subscriber{Name: msg.Name}
52+	}
53+	return nil
54+}
pubsub.go link
+7 -4
 1diff --git a/pubsub.go b/pubsub.go
 2index bf0754a..26922dd 100644
 3--- a/pubsub.go
 4+++ b/pubsub.go
 5@@ -3,6 +3,7 @@ package pubsub
 6 import (
 7 	"io"
 8 	"log/slog"
 9+	"time"
10 )
11 
12 type Subscriber struct {
13@@ -18,15 +19,17 @@ func (s *Subscriber) Wait() error {
14 }
15 
16 type Msg struct {
17-	Name   string
18-	Reader io.Reader
19+	Name      string
20+	Reader    io.Reader
21+	Delivered *time.Time
22 }
23 
24 type PubSub interface {
25 	GetSubs() []*Subscriber
26-	Sub(l *Subscriber) error
27-	UnSub(l *Subscriber) error
28+	Sub(sub *Subscriber) error
29+	UnSub(sub *Subscriber) error
30 	Pub(msg *Msg) error
31+	UnPub(msg *Msg) error
32 	// return true if message should be sent to this subscriber
33 	PubMatcher(msg *Msg, sub *Subscriber) bool
34 }