pico / feat(pubsub): if no channel is provided, randomly gen one #28

open · opened on 2024-09-24T03:35:18Z by erockmKua
Help
# add changes to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add 28
# add review to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add --review 28
# remove patchset
ssh pr.pico.sh ps rm ps-x
# check out all patches
ssh pr.pico.sh pr print 28 | git am -3
# print a diff between the last two patches in a patch request
ssh pr.pico.sh pr diff 28
# accept PR
ssh pr.pico.sh pr accept 28
# close PR
ssh pr.pico.sh pr close 28

Logs

erockmKua created pr with ps-47 on 2024-09-24T03:35:18Z

Patchsets

Diff ↕
 pubsub/cli.go | 161 ++++++++++++++++++++++++++++----------------------
 pubsub/ssh.go |   3 +
 2 files changed, 95 insertions(+), 69 deletions(-)
  1From 9e5b2563edc2c3e5b071d325c5ba280127f04251 Mon Sep 17 00:00:00 2001
  2From: Eric Bower <me@erock.io>
  3Date: Mon, 23 Sep 2024 23:34:48 -0400
  4Subject: [PATCH] feat(pubsub): if no channel is provided, randomly gen one
  5
  6---
  7 pubsub/cli.go | 161 ++++++++++++++++++++++++++++----------------------
  8 pubsub/ssh.go |   3 +
  9 2 files changed, 95 insertions(+), 69 deletions(-)
 10
 11diff --git a/pubsub/cli.go b/pubsub/cli.go
 12index 9664803..c04d1bd 100644
 13--- a/pubsub/cli.go
 14+++ b/pubsub/cli.go
 15@@ -15,7 +15,6 @@ import (
 16 	"github.com/google/uuid"
 17 	"github.com/picosh/pico/db"
 18 	"github.com/picosh/pico/shared"
 19-	"github.com/picosh/pico/shared/storage"
 20 	psub "github.com/picosh/pubsub"
 21 	"github.com/picosh/send/send/utils"
 22 )
 23@@ -80,11 +79,18 @@ for bidirectional messages to be sent between any clients connected
 24 to a pipe.`
 25 
 26 type CliHandler struct {
 27-	DBPool      db.DB
 28-	Logger      *slog.Logger
 29-	Storage     storage.StorageServe
 30-	RegistryUrl string
 31-	PubSub      *psub.Cfg
 32+	DBPool db.DB
 33+	Logger *slog.Logger
 34+	PubSub *psub.Cfg
 35+	Cfg    *shared.ConfigSite
 36+}
 37+
 38+func toSshCmd(cfg *shared.ConfigSite) string {
 39+	port := "22"
 40+	if cfg.Port != "" {
 41+		port = fmt.Sprintf("-p %s", cfg.Port)
 42+	}
 43+	return fmt.Sprintf("%s %s", port, cfg.Domain)
 44 }
 45 
 46 func WishMiddleware(handler *CliHandler) wish.Middleware {
 47@@ -112,69 +118,69 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
 48 			}
 49 
 50 			cmd := strings.TrimSpace(args[0])
 51-			if len(args) == 1 {
 52-				if cmd == "help" {
 53-					wish.Println(sesh, helpStr)
 54-				} else if cmd == "ls" {
 55-					channelFilter := fmt.Sprintf("%s/", user.Name)
 56-					if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
 57-						channelFilter = ""
 58-					}
 59+			if cmd == "help" {
 60+				wish.Println(sesh, helpStr)
 61+			} else if cmd == "ls" {
 62+				channelFilter := fmt.Sprintf("%s/", user.Name)
 63+				if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
 64+					channelFilter = ""
 65+				}
 66 
 67-					channels := pubsub.PubSub.GetChannels(channelFilter)
 68-					pipes := pubsub.PubSub.GetPipes(channelFilter)
 69-
 70-					if len(channels) == 0 && len(pipes) == 0 {
 71-						wish.Println(sesh, "no pubsub channels or pipes found")
 72-					} else {
 73-						var outputData string
 74-						if len(channels) > 0 {
 75-							outputData += "Channel Information\r\n"
 76-							for _, channel := range channels {
 77-								outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
 78-								outputData += "\tPubs:\r\n"
 79-
 80-								channel.Pubs.Range(func(I string, J *psub.Pub) bool {
 81-									outputData += fmt.Sprintf("\t- %s:\r\n", I)
 82-									return true
 83-								})
 84-
 85-								outputData += "\tSubs:\r\n"
 86-
 87-								channel.Subs.Range(func(I string, J *psub.Sub) bool {
 88-									outputData += fmt.Sprintf("\t- %s:\r\n", I)
 89-									return true
 90-								})
 91-							}
 92-						}
 93+				channels := pubsub.PubSub.GetChannels(channelFilter)
 94+				pipes := pubsub.PubSub.GetPipes(channelFilter)
 95 
 96-						if len(pipes) > 0 {
 97-							outputData += "Pipe Information\r\n"
 98-							for _, pipe := range pipes {
 99-								outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
100-								outputData += "\tClients:\r\n"
101-
102-								pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
103-									outputData += fmt.Sprintf("\t- %s:\r\n", I)
104-									return true
105-								})
106-							}
107+				if len(channels) == 0 && len(pipes) == 0 {
108+					wish.Println(sesh, "no pubsub channels or pipes found")
109+				} else {
110+					var outputData string
111+					if len(channels) > 0 {
112+						outputData += "Channel Information\r\n"
113+						for _, channel := range channels {
114+							outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
115+							outputData += "\tPubs:\r\n"
116+
117+							channel.Pubs.Range(func(I string, J *psub.Pub) bool {
118+								outputData += fmt.Sprintf("\t- %s:\r\n", I)
119+								return true
120+							})
121+
122+							outputData += "\tSubs:\r\n"
123+
124+							channel.Subs.Range(func(I string, J *psub.Sub) bool {
125+								outputData += fmt.Sprintf("\t- %s:\r\n", I)
126+								return true
127+							})
128 						}
129+					}
130 
131-						_, _ = sesh.Write([]byte(outputData))
132+					if len(pipes) > 0 {
133+						outputData += "Pipe Information\r\n"
134+						for _, pipe := range pipes {
135+							outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
136+							outputData += "\tClients:\r\n"
137+
138+							pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
139+								outputData += fmt.Sprintf("\t- %s:\r\n", I)
140+								return true
141+							})
142+						}
143 					}
144+
145+					_, _ = sesh.Write([]byte(outputData))
146 				}
147-				next(sesh)
148-				return
149 			}
150 
151-			repoName := strings.TrimSpace(args[1])
152-			cmdArgs := args[2:]
153+			channelName := ""
154+			cmdArgs := args[1:]
155+			if len(args) > 1 {
156+				channelName = strings.TrimSpace(args[1])
157+				cmdArgs = args[2:]
158+			}
159 			logger.Info(
160 				"imgs middleware detected command",
161 				"args", args,
162 				"cmd", cmd,
163-				"repoName", repoName,
164+				"channelName", channelName,
165 				"cmdArgs", cmdArgs,
166 			)
167 
168@@ -183,10 +189,9 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
169 				empty := pubCmd.Bool("e", false, "Send an empty message to subs")
170 				public := pubCmd.Bool("p", false, "Anyone can sub to this channel")
171 				timeout := pubCmd.Duration("t", -1, "Timeout as a Go duration before cancelling the pub event. Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Default is no timeout.")
172-				if !flagCheck(pubCmd, repoName, cmdArgs) {
173+				if !flagCheck(pubCmd, channelName, cmdArgs) {
174 					return
175 				}
176-				channelName := repoName
177 
178 				var reader io.Reader
179 				if *empty {
180@@ -195,10 +200,19 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
181 					reader = sesh
182 				}
183 
184+				if channelName == "" {
185+					channelName = uuid.NewString()
186+				}
187 				name := toChannel(user.Name, channelName)
188 				if *public {
189 					name = toPublicChannel(channelName)
190 				}
191+				wish.Printf(
192+					sesh,
193+					"subscribe to this channel:\n\tssh %s sub %s\n",
194+					toSshCmd(handler.Cfg),
195+					channelName,
196+				)
197 
198 				wish.Println(sesh, "sending msg ...")
199 				pub := &psub.Pub{
200@@ -218,12 +232,11 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
201 				}
202 
203 				tt := *timeout
204-				str := "no subs found ... waiting"
205-				if tt > 0 {
206-					str += " " + tt.String()
207-				}
208-
209 				if count == 0 {
210+					str := "no subs found ... waiting"
211+					if tt > 0 {
212+						str += " " + tt.String()
213+					}
214 					wish.Println(sesh, str)
215 				}
216 
217@@ -251,10 +264,10 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
218 			} else if cmd == "sub" {
219 				pubCmd := flagSet("pub", sesh)
220 				public := pubCmd.Bool("p", false, "Subscribe to a public channel")
221-				if !flagCheck(pubCmd, repoName, cmdArgs) {
222+				if !flagCheck(pubCmd, channelName, cmdArgs) {
223 					return
224 				}
225-				channelName := repoName
226+				channelName := channelName
227 
228 				name := toChannel(user.Name, channelName)
229 				if *public {
230@@ -280,15 +293,25 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
231 				pipeCmd := flagSet("pipe", sesh)
232 				public := pipeCmd.Bool("p", false, "Pipe to a public channel")
233 				replay := pipeCmd.Bool("r", false, "Replay messages to the client that sent it")
234-				if !flagCheck(pipeCmd, repoName, cmdArgs) {
235+				if !flagCheck(pipeCmd, channelName, cmdArgs) {
236 					return
237 				}
238-				channelName := repoName
239-
240+				isCreator := channelName == ""
241+				if isCreator {
242+					channelName = uuid.NewString()
243+				}
244 				name := toChannel(user.Name, channelName)
245 				if *public {
246 					name = toPublicChannel(channelName)
247 				}
248+				if isCreator {
249+					wish.Printf(
250+						sesh,
251+						"subscribe to this channel:\n\tssh %s sub %s\n",
252+						toSshCmd(handler.Cfg),
253+						channelName,
254+					)
255+				}
256 
257 				pipe := &psub.PipeClient{
258 					ID:         fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
259diff --git a/pubsub/ssh.go b/pubsub/ssh.go
260index e2abbda..ece9bbf 100644
261--- a/pubsub/ssh.go
262+++ b/pubsub/ssh.go
263@@ -27,6 +27,8 @@ func StartSshServer() {
264 	dbh := postgres.NewDB(cfg.DbURL, cfg.Logger)
265 	defer dbh.Close()
266 
267+	cfg.Port = port
268+
269 	pubsub := &psub.Cfg{
270 		Logger: logger,
271 		PubSub: &psub.PubSubMulticast{
272@@ -40,6 +42,7 @@ func StartSshServer() {
273 		Logger: logger,
274 		DBPool: dbh,
275 		PubSub: pubsub,
276+		Cfg:    cfg,
277 	}
278 
279 	sshAuth := util.NewSshAuthHandler(dbh, logger, cfg)
280
281base-commit: bfcbe7d0f5299a459b0f67185424254a1357029b
282-- 
2832.45.2
284
ps-47 by erockmKua on 2024-09-24T03:35:18Z

feat(pubsub): if no channel is provided, randomly gen one

Eric Bower <me@erock.io> 2024-09-24T03:34:48Z
 pubsub/cli.go | 161 ++++++++++++++++++++++++++++----------------------
 pubsub/ssh.go |   3 +
 2 files changed, 95 insertions(+), 69 deletions(-)
  1From 9e5b2563edc2c3e5b071d325c5ba280127f04251 Mon Sep 17 00:00:00 2001
  2From: Eric Bower <me@erock.io>
  3Date: Mon, 23 Sep 2024 23:34:48 -0400
  4Subject: [PATCH] feat(pubsub): if no channel is provided, randomly gen one
  5
  6---
  7 pubsub/cli.go | 161 ++++++++++++++++++++++++++++----------------------
  8 pubsub/ssh.go |   3 +
  9 2 files changed, 95 insertions(+), 69 deletions(-)
 10
 11diff --git a/pubsub/cli.go b/pubsub/cli.go
 12index 9664803..c04d1bd 100644
 13--- a/pubsub/cli.go
 14+++ b/pubsub/cli.go
 15@@ -15,7 +15,6 @@ import (
 16 	"github.com/google/uuid"
 17 	"github.com/picosh/pico/db"
 18 	"github.com/picosh/pico/shared"
 19-	"github.com/picosh/pico/shared/storage"
 20 	psub "github.com/picosh/pubsub"
 21 	"github.com/picosh/send/send/utils"
 22 )
 23@@ -80,11 +79,18 @@ for bidirectional messages to be sent between any clients connected
 24 to a pipe.`
 25 
 26 type CliHandler struct {
 27-	DBPool      db.DB
 28-	Logger      *slog.Logger
 29-	Storage     storage.StorageServe
 30-	RegistryUrl string
 31-	PubSub      *psub.Cfg
 32+	DBPool db.DB
 33+	Logger *slog.Logger
 34+	PubSub *psub.Cfg
 35+	Cfg    *shared.ConfigSite
 36+}
 37+
 38+func toSshCmd(cfg *shared.ConfigSite) string {
 39+	port := "22"
 40+	if cfg.Port != "" {
 41+		port = fmt.Sprintf("-p %s", cfg.Port)
 42+	}
 43+	return fmt.Sprintf("%s %s", port, cfg.Domain)
 44 }
 45 
 46 func WishMiddleware(handler *CliHandler) wish.Middleware {
 47@@ -112,69 +118,69 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
 48 			}
 49 
 50 			cmd := strings.TrimSpace(args[0])
 51-			if len(args) == 1 {
 52-				if cmd == "help" {
 53-					wish.Println(sesh, helpStr)
 54-				} else if cmd == "ls" {
 55-					channelFilter := fmt.Sprintf("%s/", user.Name)
 56-					if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
 57-						channelFilter = ""
 58-					}
 59+			if cmd == "help" {
 60+				wish.Println(sesh, helpStr)
 61+			} else if cmd == "ls" {
 62+				channelFilter := fmt.Sprintf("%s/", user.Name)
 63+				if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
 64+					channelFilter = ""
 65+				}
 66 
 67-					channels := pubsub.PubSub.GetChannels(channelFilter)
 68-					pipes := pubsub.PubSub.GetPipes(channelFilter)
 69-
 70-					if len(channels) == 0 && len(pipes) == 0 {
 71-						wish.Println(sesh, "no pubsub channels or pipes found")
 72-					} else {
 73-						var outputData string
 74-						if len(channels) > 0 {
 75-							outputData += "Channel Information\r\n"
 76-							for _, channel := range channels {
 77-								outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
 78-								outputData += "\tPubs:\r\n"
 79-
 80-								channel.Pubs.Range(func(I string, J *psub.Pub) bool {
 81-									outputData += fmt.Sprintf("\t- %s:\r\n", I)
 82-									return true
 83-								})
 84-
 85-								outputData += "\tSubs:\r\n"
 86-
 87-								channel.Subs.Range(func(I string, J *psub.Sub) bool {
 88-									outputData += fmt.Sprintf("\t- %s:\r\n", I)
 89-									return true
 90-								})
 91-							}
 92-						}
 93+				channels := pubsub.PubSub.GetChannels(channelFilter)
 94+				pipes := pubsub.PubSub.GetPipes(channelFilter)
 95 
 96-						if len(pipes) > 0 {
 97-							outputData += "Pipe Information\r\n"
 98-							for _, pipe := range pipes {
 99-								outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
100-								outputData += "\tClients:\r\n"
101-
102-								pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
103-									outputData += fmt.Sprintf("\t- %s:\r\n", I)
104-									return true
105-								})
106-							}
107+				if len(channels) == 0 && len(pipes) == 0 {
108+					wish.Println(sesh, "no pubsub channels or pipes found")
109+				} else {
110+					var outputData string
111+					if len(channels) > 0 {
112+						outputData += "Channel Information\r\n"
113+						for _, channel := range channels {
114+							outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
115+							outputData += "\tPubs:\r\n"
116+
117+							channel.Pubs.Range(func(I string, J *psub.Pub) bool {
118+								outputData += fmt.Sprintf("\t- %s:\r\n", I)
119+								return true
120+							})
121+
122+							outputData += "\tSubs:\r\n"
123+
124+							channel.Subs.Range(func(I string, J *psub.Sub) bool {
125+								outputData += fmt.Sprintf("\t- %s:\r\n", I)
126+								return true
127+							})
128 						}
129+					}
130 
131-						_, _ = sesh.Write([]byte(outputData))
132+					if len(pipes) > 0 {
133+						outputData += "Pipe Information\r\n"
134+						for _, pipe := range pipes {
135+							outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
136+							outputData += "\tClients:\r\n"
137+
138+							pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
139+								outputData += fmt.Sprintf("\t- %s:\r\n", I)
140+								return true
141+							})
142+						}
143 					}
144+
145+					_, _ = sesh.Write([]byte(outputData))
146 				}
147-				next(sesh)
148-				return
149 			}
150 
151-			repoName := strings.TrimSpace(args[1])
152-			cmdArgs := args[2:]
153+			channelName := ""
154+			cmdArgs := args[1:]
155+			if len(args) > 1 {
156+				channelName = strings.TrimSpace(args[1])
157+				cmdArgs = args[2:]
158+			}
159 			logger.Info(
160 				"imgs middleware detected command",
161 				"args", args,
162 				"cmd", cmd,
163-				"repoName", repoName,
164+				"channelName", channelName,
165 				"cmdArgs", cmdArgs,
166 			)
167 
168@@ -183,10 +189,9 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
169 				empty := pubCmd.Bool("e", false, "Send an empty message to subs")
170 				public := pubCmd.Bool("p", false, "Anyone can sub to this channel")
171 				timeout := pubCmd.Duration("t", -1, "Timeout as a Go duration before cancelling the pub event. Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Default is no timeout.")
172-				if !flagCheck(pubCmd, repoName, cmdArgs) {
173+				if !flagCheck(pubCmd, channelName, cmdArgs) {
174 					return
175 				}
176-				channelName := repoName
177 
178 				var reader io.Reader
179 				if *empty {
180@@ -195,10 +200,19 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
181 					reader = sesh
182 				}
183 
184+				if channelName == "" {
185+					channelName = uuid.NewString()
186+				}
187 				name := toChannel(user.Name, channelName)
188 				if *public {
189 					name = toPublicChannel(channelName)
190 				}
191+				wish.Printf(
192+					sesh,
193+					"subscribe to this channel:\n\tssh %s sub %s\n",
194+					toSshCmd(handler.Cfg),
195+					channelName,
196+				)
197 
198 				wish.Println(sesh, "sending msg ...")
199 				pub := &psub.Pub{
200@@ -218,12 +232,11 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
201 				}
202 
203 				tt := *timeout
204-				str := "no subs found ... waiting"
205-				if tt > 0 {
206-					str += " " + tt.String()
207-				}
208-
209 				if count == 0 {
210+					str := "no subs found ... waiting"
211+					if tt > 0 {
212+						str += " " + tt.String()
213+					}
214 					wish.Println(sesh, str)
215 				}
216 
217@@ -251,10 +264,10 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
218 			} else if cmd == "sub" {
219 				pubCmd := flagSet("pub", sesh)
220 				public := pubCmd.Bool("p", false, "Subscribe to a public channel")
221-				if !flagCheck(pubCmd, repoName, cmdArgs) {
222+				if !flagCheck(pubCmd, channelName, cmdArgs) {
223 					return
224 				}
225-				channelName := repoName
226+				channelName := channelName
227 
228 				name := toChannel(user.Name, channelName)
229 				if *public {
230@@ -280,15 +293,25 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
231 				pipeCmd := flagSet("pipe", sesh)
232 				public := pipeCmd.Bool("p", false, "Pipe to a public channel")
233 				replay := pipeCmd.Bool("r", false, "Replay messages to the client that sent it")
234-				if !flagCheck(pipeCmd, repoName, cmdArgs) {
235+				if !flagCheck(pipeCmd, channelName, cmdArgs) {
236 					return
237 				}
238-				channelName := repoName
239-
240+				isCreator := channelName == ""
241+				if isCreator {
242+					channelName = uuid.NewString()
243+				}
244 				name := toChannel(user.Name, channelName)
245 				if *public {
246 					name = toPublicChannel(channelName)
247 				}
248+				if isCreator {
249+					wish.Printf(
250+						sesh,
251+						"subscribe to this channel:\n\tssh %s sub %s\n",
252+						toSshCmd(handler.Cfg),
253+						channelName,
254+					)
255+				}
256 
257 				pipe := &psub.PipeClient{
258 					ID:         fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
259diff --git a/pubsub/ssh.go b/pubsub/ssh.go
260index e2abbda..ece9bbf 100644
261--- a/pubsub/ssh.go
262+++ b/pubsub/ssh.go
263@@ -27,6 +27,8 @@ func StartSshServer() {
264 	dbh := postgres.NewDB(cfg.DbURL, cfg.Logger)
265 	defer dbh.Close()
266 
267+	cfg.Port = port
268+
269 	pubsub := &psub.Cfg{
270 		Logger: logger,
271 		PubSub: &psub.PubSubMulticast{
272@@ -40,6 +42,7 @@ func StartSshServer() {
273 		Logger: logger,
274 		DBPool: dbh,
275 		PubSub: pubsub,
276+		Cfg:    cfg,
277 	}
278 
279 	sshAuth := util.NewSshAuthHandler(dbh, logger, cfg)
280
281base-commit: bfcbe7d0f5299a459b0f67185424254a1357029b
282-- 
2832.45.2
284