Logs
erockmKua
created pr with ps-47
on Patchsets
Diff ↕
feat(pubsub): if no channel is provided, randomly gen one
Eric Bower <me@erock.io>
pubsub/cli.go | 161 ++++++++++++++++++++++++++++----------------------
pubsub/ssh.go | 3 +
2 files changed, 95 insertions(+), 69 deletions(-)
1From 9e5b2563edc2c3e5b071d325c5ba280127f04251 Mon Sep 17 00:00:00 2001
2From: Eric Bower <me@erock.io>
3Date: Mon, 23 Sep 2024 23:34:48 -0400
4Subject: [PATCH] feat(pubsub): if no channel is provided, randomly gen one
5
6---
7 pubsub/cli.go | 161 ++++++++++++++++++++++++++++----------------------
8 pubsub/ssh.go | 3 +
9 2 files changed, 95 insertions(+), 69 deletions(-)
10
11diff --git a/pubsub/cli.go b/pubsub/cli.go
12index 9664803..c04d1bd 100644
13--- a/pubsub/cli.go
14+++ b/pubsub/cli.go
15@@ -15,7 +15,6 @@ import (
16 "github.com/google/uuid"
17 "github.com/picosh/pico/db"
18 "github.com/picosh/pico/shared"
19- "github.com/picosh/pico/shared/storage"
20 psub "github.com/picosh/pubsub"
21 "github.com/picosh/send/send/utils"
22 )
23@@ -80,11 +79,18 @@ for bidirectional messages to be sent between any clients connected
24 to a pipe.`
25
26 type CliHandler struct {
27- DBPool db.DB
28- Logger *slog.Logger
29- Storage storage.StorageServe
30- RegistryUrl string
31- PubSub *psub.Cfg
32+ DBPool db.DB
33+ Logger *slog.Logger
34+ PubSub *psub.Cfg
35+ Cfg *shared.ConfigSite
36+}
37+
38+func toSshCmd(cfg *shared.ConfigSite) string {
39+ port := "22"
40+ if cfg.Port != "" {
41+ port = fmt.Sprintf("-p %s", cfg.Port)
42+ }
43+ return fmt.Sprintf("%s %s", port, cfg.Domain)
44 }
45
46 func WishMiddleware(handler *CliHandler) wish.Middleware {
47@@ -112,69 +118,69 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
48 }
49
50 cmd := strings.TrimSpace(args[0])
51- if len(args) == 1 {
52- if cmd == "help" {
53- wish.Println(sesh, helpStr)
54- } else if cmd == "ls" {
55- channelFilter := fmt.Sprintf("%s/", user.Name)
56- if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
57- channelFilter = ""
58- }
59+ if cmd == "help" {
60+ wish.Println(sesh, helpStr)
61+ } else if cmd == "ls" {
62+ channelFilter := fmt.Sprintf("%s/", user.Name)
63+ if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
64+ channelFilter = ""
65+ }
66
67- channels := pubsub.PubSub.GetChannels(channelFilter)
68- pipes := pubsub.PubSub.GetPipes(channelFilter)
69-
70- if len(channels) == 0 && len(pipes) == 0 {
71- wish.Println(sesh, "no pubsub channels or pipes found")
72- } else {
73- var outputData string
74- if len(channels) > 0 {
75- outputData += "Channel Information\r\n"
76- for _, channel := range channels {
77- outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
78- outputData += "\tPubs:\r\n"
79-
80- channel.Pubs.Range(func(I string, J *psub.Pub) bool {
81- outputData += fmt.Sprintf("\t- %s:\r\n", I)
82- return true
83- })
84-
85- outputData += "\tSubs:\r\n"
86-
87- channel.Subs.Range(func(I string, J *psub.Sub) bool {
88- outputData += fmt.Sprintf("\t- %s:\r\n", I)
89- return true
90- })
91- }
92- }
93+ channels := pubsub.PubSub.GetChannels(channelFilter)
94+ pipes := pubsub.PubSub.GetPipes(channelFilter)
95
96- if len(pipes) > 0 {
97- outputData += "Pipe Information\r\n"
98- for _, pipe := range pipes {
99- outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
100- outputData += "\tClients:\r\n"
101-
102- pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
103- outputData += fmt.Sprintf("\t- %s:\r\n", I)
104- return true
105- })
106- }
107+ if len(channels) == 0 && len(pipes) == 0 {
108+ wish.Println(sesh, "no pubsub channels or pipes found")
109+ } else {
110+ var outputData string
111+ if len(channels) > 0 {
112+ outputData += "Channel Information\r\n"
113+ for _, channel := range channels {
114+ outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
115+ outputData += "\tPubs:\r\n"
116+
117+ channel.Pubs.Range(func(I string, J *psub.Pub) bool {
118+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
119+ return true
120+ })
121+
122+ outputData += "\tSubs:\r\n"
123+
124+ channel.Subs.Range(func(I string, J *psub.Sub) bool {
125+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
126+ return true
127+ })
128 }
129+ }
130
131- _, _ = sesh.Write([]byte(outputData))
132+ if len(pipes) > 0 {
133+ outputData += "Pipe Information\r\n"
134+ for _, pipe := range pipes {
135+ outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
136+ outputData += "\tClients:\r\n"
137+
138+ pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
139+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
140+ return true
141+ })
142+ }
143 }
144+
145+ _, _ = sesh.Write([]byte(outputData))
146 }
147- next(sesh)
148- return
149 }
150
151- repoName := strings.TrimSpace(args[1])
152- cmdArgs := args[2:]
153+ channelName := ""
154+ cmdArgs := args[1:]
155+ if len(args) > 1 {
156+ channelName = strings.TrimSpace(args[1])
157+ cmdArgs = args[2:]
158+ }
159 logger.Info(
160 "imgs middleware detected command",
161 "args", args,
162 "cmd", cmd,
163- "repoName", repoName,
164+ "channelName", channelName,
165 "cmdArgs", cmdArgs,
166 )
167
168@@ -183,10 +189,9 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
169 empty := pubCmd.Bool("e", false, "Send an empty message to subs")
170 public := pubCmd.Bool("p", false, "Anyone can sub to this channel")
171 timeout := pubCmd.Duration("t", -1, "Timeout as a Go duration before cancelling the pub event. Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Default is no timeout.")
172- if !flagCheck(pubCmd, repoName, cmdArgs) {
173+ if !flagCheck(pubCmd, channelName, cmdArgs) {
174 return
175 }
176- channelName := repoName
177
178 var reader io.Reader
179 if *empty {
180@@ -195,10 +200,19 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
181 reader = sesh
182 }
183
184+ if channelName == "" {
185+ channelName = uuid.NewString()
186+ }
187 name := toChannel(user.Name, channelName)
188 if *public {
189 name = toPublicChannel(channelName)
190 }
191+ wish.Printf(
192+ sesh,
193+ "subscribe to this channel:\n\tssh %s sub %s\n",
194+ toSshCmd(handler.Cfg),
195+ channelName,
196+ )
197
198 wish.Println(sesh, "sending msg ...")
199 pub := &psub.Pub{
200@@ -218,12 +232,11 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
201 }
202
203 tt := *timeout
204- str := "no subs found ... waiting"
205- if tt > 0 {
206- str += " " + tt.String()
207- }
208-
209 if count == 0 {
210+ str := "no subs found ... waiting"
211+ if tt > 0 {
212+ str += " " + tt.String()
213+ }
214 wish.Println(sesh, str)
215 }
216
217@@ -251,10 +264,10 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
218 } else if cmd == "sub" {
219 pubCmd := flagSet("pub", sesh)
220 public := pubCmd.Bool("p", false, "Subscribe to a public channel")
221- if !flagCheck(pubCmd, repoName, cmdArgs) {
222+ if !flagCheck(pubCmd, channelName, cmdArgs) {
223 return
224 }
225- channelName := repoName
226+ channelName := channelName
227
228 name := toChannel(user.Name, channelName)
229 if *public {
230@@ -280,15 +293,25 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
231 pipeCmd := flagSet("pipe", sesh)
232 public := pipeCmd.Bool("p", false, "Pipe to a public channel")
233 replay := pipeCmd.Bool("r", false, "Replay messages to the client that sent it")
234- if !flagCheck(pipeCmd, repoName, cmdArgs) {
235+ if !flagCheck(pipeCmd, channelName, cmdArgs) {
236 return
237 }
238- channelName := repoName
239-
240+ isCreator := channelName == ""
241+ if isCreator {
242+ channelName = uuid.NewString()
243+ }
244 name := toChannel(user.Name, channelName)
245 if *public {
246 name = toPublicChannel(channelName)
247 }
248+ if isCreator {
249+ wish.Printf(
250+ sesh,
251+ "subscribe to this channel:\n\tssh %s sub %s\n",
252+ toSshCmd(handler.Cfg),
253+ channelName,
254+ )
255+ }
256
257 pipe := &psub.PipeClient{
258 ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
259diff --git a/pubsub/ssh.go b/pubsub/ssh.go
260index e2abbda..ece9bbf 100644
261--- a/pubsub/ssh.go
262+++ b/pubsub/ssh.go
263@@ -27,6 +27,8 @@ func StartSshServer() {
264 dbh := postgres.NewDB(cfg.DbURL, cfg.Logger)
265 defer dbh.Close()
266
267+ cfg.Port = port
268+
269 pubsub := &psub.Cfg{
270 Logger: logger,
271 PubSub: &psub.PubSubMulticast{
272@@ -40,6 +42,7 @@ func StartSshServer() {
273 Logger: logger,
274 DBPool: dbh,
275 PubSub: pubsub,
276+ Cfg: cfg,
277 }
278
279 sshAuth := util.NewSshAuthHandler(dbh, logger, cfg)
280
281base-commit: bfcbe7d0f5299a459b0f67185424254a1357029b
282--
2832.45.2
284
ps-47
by
erockmKua
on feat(pubsub): if no channel is provided, randomly gen one
Eric Bower <me@erock.io>
pubsub/cli.go | 161 ++++++++++++++++++++++++++++----------------------
pubsub/ssh.go | 3 +
2 files changed, 95 insertions(+), 69 deletions(-)
1From 9e5b2563edc2c3e5b071d325c5ba280127f04251 Mon Sep 17 00:00:00 2001
2From: Eric Bower <me@erock.io>
3Date: Mon, 23 Sep 2024 23:34:48 -0400
4Subject: [PATCH] feat(pubsub): if no channel is provided, randomly gen one
5
6---
7 pubsub/cli.go | 161 ++++++++++++++++++++++++++++----------------------
8 pubsub/ssh.go | 3 +
9 2 files changed, 95 insertions(+), 69 deletions(-)
10
11diff --git a/pubsub/cli.go b/pubsub/cli.go
12index 9664803..c04d1bd 100644
13--- a/pubsub/cli.go
14+++ b/pubsub/cli.go
15@@ -15,7 +15,6 @@ import (
16 "github.com/google/uuid"
17 "github.com/picosh/pico/db"
18 "github.com/picosh/pico/shared"
19- "github.com/picosh/pico/shared/storage"
20 psub "github.com/picosh/pubsub"
21 "github.com/picosh/send/send/utils"
22 )
23@@ -80,11 +79,18 @@ for bidirectional messages to be sent between any clients connected
24 to a pipe.`
25
26 type CliHandler struct {
27- DBPool db.DB
28- Logger *slog.Logger
29- Storage storage.StorageServe
30- RegistryUrl string
31- PubSub *psub.Cfg
32+ DBPool db.DB
33+ Logger *slog.Logger
34+ PubSub *psub.Cfg
35+ Cfg *shared.ConfigSite
36+}
37+
38+func toSshCmd(cfg *shared.ConfigSite) string {
39+ port := "22"
40+ if cfg.Port != "" {
41+ port = fmt.Sprintf("-p %s", cfg.Port)
42+ }
43+ return fmt.Sprintf("%s %s", port, cfg.Domain)
44 }
45
46 func WishMiddleware(handler *CliHandler) wish.Middleware {
47@@ -112,69 +118,69 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
48 }
49
50 cmd := strings.TrimSpace(args[0])
51- if len(args) == 1 {
52- if cmd == "help" {
53- wish.Println(sesh, helpStr)
54- } else if cmd == "ls" {
55- channelFilter := fmt.Sprintf("%s/", user.Name)
56- if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
57- channelFilter = ""
58- }
59+ if cmd == "help" {
60+ wish.Println(sesh, helpStr)
61+ } else if cmd == "ls" {
62+ channelFilter := fmt.Sprintf("%s/", user.Name)
63+ if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
64+ channelFilter = ""
65+ }
66
67- channels := pubsub.PubSub.GetChannels(channelFilter)
68- pipes := pubsub.PubSub.GetPipes(channelFilter)
69-
70- if len(channels) == 0 && len(pipes) == 0 {
71- wish.Println(sesh, "no pubsub channels or pipes found")
72- } else {
73- var outputData string
74- if len(channels) > 0 {
75- outputData += "Channel Information\r\n"
76- for _, channel := range channels {
77- outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
78- outputData += "\tPubs:\r\n"
79-
80- channel.Pubs.Range(func(I string, J *psub.Pub) bool {
81- outputData += fmt.Sprintf("\t- %s:\r\n", I)
82- return true
83- })
84-
85- outputData += "\tSubs:\r\n"
86-
87- channel.Subs.Range(func(I string, J *psub.Sub) bool {
88- outputData += fmt.Sprintf("\t- %s:\r\n", I)
89- return true
90- })
91- }
92- }
93+ channels := pubsub.PubSub.GetChannels(channelFilter)
94+ pipes := pubsub.PubSub.GetPipes(channelFilter)
95
96- if len(pipes) > 0 {
97- outputData += "Pipe Information\r\n"
98- for _, pipe := range pipes {
99- outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
100- outputData += "\tClients:\r\n"
101-
102- pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
103- outputData += fmt.Sprintf("\t- %s:\r\n", I)
104- return true
105- })
106- }
107+ if len(channels) == 0 && len(pipes) == 0 {
108+ wish.Println(sesh, "no pubsub channels or pipes found")
109+ } else {
110+ var outputData string
111+ if len(channels) > 0 {
112+ outputData += "Channel Information\r\n"
113+ for _, channel := range channels {
114+ outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
115+ outputData += "\tPubs:\r\n"
116+
117+ channel.Pubs.Range(func(I string, J *psub.Pub) bool {
118+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
119+ return true
120+ })
121+
122+ outputData += "\tSubs:\r\n"
123+
124+ channel.Subs.Range(func(I string, J *psub.Sub) bool {
125+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
126+ return true
127+ })
128 }
129+ }
130
131- _, _ = sesh.Write([]byte(outputData))
132+ if len(pipes) > 0 {
133+ outputData += "Pipe Information\r\n"
134+ for _, pipe := range pipes {
135+ outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
136+ outputData += "\tClients:\r\n"
137+
138+ pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
139+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
140+ return true
141+ })
142+ }
143 }
144+
145+ _, _ = sesh.Write([]byte(outputData))
146 }
147- next(sesh)
148- return
149 }
150
151- repoName := strings.TrimSpace(args[1])
152- cmdArgs := args[2:]
153+ channelName := ""
154+ cmdArgs := args[1:]
155+ if len(args) > 1 {
156+ channelName = strings.TrimSpace(args[1])
157+ cmdArgs = args[2:]
158+ }
159 logger.Info(
160 "imgs middleware detected command",
161 "args", args,
162 "cmd", cmd,
163- "repoName", repoName,
164+ "channelName", channelName,
165 "cmdArgs", cmdArgs,
166 )
167
168@@ -183,10 +189,9 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
169 empty := pubCmd.Bool("e", false, "Send an empty message to subs")
170 public := pubCmd.Bool("p", false, "Anyone can sub to this channel")
171 timeout := pubCmd.Duration("t", -1, "Timeout as a Go duration before cancelling the pub event. Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Default is no timeout.")
172- if !flagCheck(pubCmd, repoName, cmdArgs) {
173+ if !flagCheck(pubCmd, channelName, cmdArgs) {
174 return
175 }
176- channelName := repoName
177
178 var reader io.Reader
179 if *empty {
180@@ -195,10 +200,19 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
181 reader = sesh
182 }
183
184+ if channelName == "" {
185+ channelName = uuid.NewString()
186+ }
187 name := toChannel(user.Name, channelName)
188 if *public {
189 name = toPublicChannel(channelName)
190 }
191+ wish.Printf(
192+ sesh,
193+ "subscribe to this channel:\n\tssh %s sub %s\n",
194+ toSshCmd(handler.Cfg),
195+ channelName,
196+ )
197
198 wish.Println(sesh, "sending msg ...")
199 pub := &psub.Pub{
200@@ -218,12 +232,11 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
201 }
202
203 tt := *timeout
204- str := "no subs found ... waiting"
205- if tt > 0 {
206- str += " " + tt.String()
207- }
208-
209 if count == 0 {
210+ str := "no subs found ... waiting"
211+ if tt > 0 {
212+ str += " " + tt.String()
213+ }
214 wish.Println(sesh, str)
215 }
216
217@@ -251,10 +264,10 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
218 } else if cmd == "sub" {
219 pubCmd := flagSet("pub", sesh)
220 public := pubCmd.Bool("p", false, "Subscribe to a public channel")
221- if !flagCheck(pubCmd, repoName, cmdArgs) {
222+ if !flagCheck(pubCmd, channelName, cmdArgs) {
223 return
224 }
225- channelName := repoName
226+ channelName := channelName
227
228 name := toChannel(user.Name, channelName)
229 if *public {
230@@ -280,15 +293,25 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
231 pipeCmd := flagSet("pipe", sesh)
232 public := pipeCmd.Bool("p", false, "Pipe to a public channel")
233 replay := pipeCmd.Bool("r", false, "Replay messages to the client that sent it")
234- if !flagCheck(pipeCmd, repoName, cmdArgs) {
235+ if !flagCheck(pipeCmd, channelName, cmdArgs) {
236 return
237 }
238- channelName := repoName
239-
240+ isCreator := channelName == ""
241+ if isCreator {
242+ channelName = uuid.NewString()
243+ }
244 name := toChannel(user.Name, channelName)
245 if *public {
246 name = toPublicChannel(channelName)
247 }
248+ if isCreator {
249+ wish.Printf(
250+ sesh,
251+ "subscribe to this channel:\n\tssh %s sub %s\n",
252+ toSshCmd(handler.Cfg),
253+ channelName,
254+ )
255+ }
256
257 pipe := &psub.PipeClient{
258 ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
259diff --git a/pubsub/ssh.go b/pubsub/ssh.go
260index e2abbda..ece9bbf 100644
261--- a/pubsub/ssh.go
262+++ b/pubsub/ssh.go
263@@ -27,6 +27,8 @@ func StartSshServer() {
264 dbh := postgres.NewDB(cfg.DbURL, cfg.Logger)
265 defer dbh.Close()
266
267+ cfg.Port = port
268+
269 pubsub := &psub.Cfg{
270 Logger: logger,
271 PubSub: &psub.PubSubMulticast{
272@@ -40,6 +42,7 @@ func StartSshServer() {
273 Logger: logger,
274 DBPool: dbh,
275 PubSub: pubsub,
276+ Cfg: cfg,
277 }
278
279 sshAuth := util.NewSshAuthHandler(dbh, logger, cfg)
280
281base-commit: bfcbe7d0f5299a459b0f67185424254a1357029b
282--
2832.45.2
284