1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
diff --git a/pkg/apps/pipe/api.go b/pkg/apps/pipe/api.go
index 865e6ca..e8f1685 100644
--- a/pkg/apps/pipe/api.go
+++ b/pkg/apps/pipe/api.go
@@ -2,6 +2,7 @@ package pipe
import (
"bufio"
+ "bytes"
"context"
"errors"
"fmt"
@@ -158,6 +159,8 @@ func handlePub(pubsub bool) http.HandlerFunc {
params += fmt.Sprintf(" -a=%s", cleanList)
}
+ prefix := r.URL.Query().Get("prefix")
+
var wg sync.WaitGroup
reader := bufio.NewReaderSize(r.Body, 1)
@@ -245,7 +248,12 @@ func handlePub(pubsub bool) http.HandlerFunc {
case <-r.Context().Done():
break outer
default:
- n, err := p.Write(first)
+ messageToWrite := first
+ if prefix != "" {
+ messageToWrite = append([]byte(prefix), messageToWrite...)
+ }
+
+ n, err := p.Write(messageToWrite)
if err != nil {
logger.Error("pub write error", "topic", topic, "info", clientInfo, "err", err.Error())
http.Error(w, "server error", http.StatusInternalServerError)
@@ -314,6 +322,8 @@ func handlePipe() http.HandlerFunc {
params += fmt.Sprintf(" -a=%s", cleanList)
}
+ prefix := r.URL.Query().Get("prefix")
+
id := uuid.NewString()
p, err := sshClient.AddSession(id, fmt.Sprintf("pipe %s %s", params, topic), 0, -1, -1)
@@ -364,6 +374,8 @@ func handlePipe() http.HandlerFunc {
wg.Done()
}()
+ var messageBuffer []byte
+
for {
buf := make([]byte, 32*1024)
@@ -373,12 +385,46 @@ func handlePipe() http.HandlerFunc {
break
}
- buf = buf[:n]
-
- err = c.WriteMessage(messageType, buf)
- if err != nil {
- logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
- break
+ messageBuffer = append(messageBuffer, buf[:n]...)
+
+ if prefix != "" {
+ // Buffer and split on prefix boundaries
+ for {
+ firstIdx := bytes.Index(messageBuffer, []byte(prefix))
+ if firstIdx == -1 {
+ // No prefix found, clear buffer (shouldn't happen in normal use)
+ messageBuffer = nil
+ break
+ }
+
+ // Look for next prefix after the first one
+ secondIdx := bytes.Index(messageBuffer[firstIdx+len(prefix):], []byte(prefix))
+ if secondIdx == -1 {
+ // No complete message yet, keep buffer as is
+ break
+ }
+
+ // We have a complete message, extract and send it
+ messageToSend := messageBuffer[firstIdx : firstIdx+len(prefix)+secondIdx]
+ err = c.WriteMessage(messageType, messageToSend)
+ if err != nil {
+ logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
+ break
+ }
+
+ // Update buffer to remove sent message
+ messageBuffer = messageBuffer[firstIdx+len(prefix)+secondIdx:]
+ }
+ } else {
+ // No prefix set, send all data as-is
+ if len(messageBuffer) > 0 {
+ err = c.WriteMessage(messageType, messageBuffer)
+ if err != nil {
+ logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
+ break
+ }
+ messageBuffer = nil
+ }
}
}
}()
|