dashboard / erock/pico / feat(pipe): ?prefix= query param for pub and pipe #104 rss

open · opened on 2026-01-22T23:52:32Z by erock
Help
checkout latest patchset:
ssh pr.pico.sh print pr-104 | git am -3
checkout any patchset in a patch request:
ssh pr.pico.sh print ps-X | git am -3
add changes to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add 104
add review to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add --review 104
accept PR:
ssh pr.pico.sh pr accept 104
close PR:
ssh pr.pico.sh pr close 104
Timeline Patchsets

Patchset ps-190

Back to top
+53 -7 pkg/apps/pipe/api.go link
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
diff --git a/pkg/apps/pipe/api.go b/pkg/apps/pipe/api.go
index 865e6ca..e8f1685 100644
--- a/pkg/apps/pipe/api.go
+++ b/pkg/apps/pipe/api.go
@@ -2,6 +2,7 @@ package pipe
 
 import (
 	"bufio"
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -158,6 +159,8 @@ func handlePub(pubsub bool) http.HandlerFunc {
 			params += fmt.Sprintf(" -a=%s", cleanList)
 		}
 
+		prefix := r.URL.Query().Get("prefix")
+
 		var wg sync.WaitGroup
 
 		reader := bufio.NewReaderSize(r.Body, 1)
@@ -245,7 +248,12 @@ func handlePub(pubsub bool) http.HandlerFunc {
 			case <-r.Context().Done():
 				break outer
 			default:
-				n, err := p.Write(first)
+				messageToWrite := first
+				if prefix != "" {
+					messageToWrite = append([]byte(prefix), messageToWrite...)
+				}
+
+				n, err := p.Write(messageToWrite)
 				if err != nil {
 					logger.Error("pub write error", "topic", topic, "info", clientInfo, "err", err.Error())
 					http.Error(w, "server error", http.StatusInternalServerError)
@@ -314,6 +322,8 @@ func handlePipe() http.HandlerFunc {
 			params += fmt.Sprintf(" -a=%s", cleanList)
 		}
 
+		prefix := r.URL.Query().Get("prefix")
+
 		id := uuid.NewString()
 
 		p, err := sshClient.AddSession(id, fmt.Sprintf("pipe %s %s", params, topic), 0, -1, -1)
@@ -364,6 +374,8 @@ func handlePipe() http.HandlerFunc {
 				wg.Done()
 			}()
 
+			var messageBuffer []byte
+
 			for {
 				buf := make([]byte, 32*1024)
 
@@ -373,12 +385,46 @@ func handlePipe() http.HandlerFunc {
 					break
 				}
 
-				buf = buf[:n]
-
-				err = c.WriteMessage(messageType, buf)
-				if err != nil {
-					logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
-					break
+				messageBuffer = append(messageBuffer, buf[:n]...)
+
+				if prefix != "" {
+					// Buffer and split on prefix boundaries
+					for {
+						firstIdx := bytes.Index(messageBuffer, []byte(prefix))
+						if firstIdx == -1 {
+							// No prefix found, clear buffer (shouldn't happen in normal use)
+							messageBuffer = nil
+							break
+						}
+
+						// Look for next prefix after the first one
+						secondIdx := bytes.Index(messageBuffer[firstIdx+len(prefix):], []byte(prefix))
+						if secondIdx == -1 {
+							// No complete message yet, keep buffer as is
+							break
+						}
+
+						// We have a complete message, extract and send it
+						messageToSend := messageBuffer[firstIdx : firstIdx+len(prefix)+secondIdx]
+						err = c.WriteMessage(messageType, messageToSend)
+						if err != nil {
+							logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
+							break
+						}
+
+						// Update buffer to remove sent message
+						messageBuffer = messageBuffer[firstIdx+len(prefix)+secondIdx:]
+					}
+				} else {
+					// No prefix set, send all data as-is
+					if len(messageBuffer) > 0 {
+						err = c.WriteMessage(messageType, messageBuffer)
+						if err != nil {
+							logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
+							break
+						}
+						messageBuffer = nil
+					}
 				}
 			}
 		}()