Logs
Patchsets
Patchset ps-65
refactor: remote client writer that implements io.Writer
Eric Bower
log/log.go
+6
-176
remote_client.go
+174
-1
refactor: remote client writer that implements io.Writer
It also acts as a supervisor, keeping the SSH connection alive.
log/log.go
link
+6
-176
+6
-176
1diff --git a/log/log.go b/log/log.go
2index 98bfddc..cf87c7e 100644
3--- a/log/log.go
4+++ b/log/log.go
5@@ -3,15 +3,12 @@ package log
6 import (
7 "context"
8 "errors"
9- "fmt"
10 "io"
11 "log/slog"
12 "slices"
13 "sync"
14- "time"
15
16 "github.com/picosh/pubsub"
17- "golang.org/x/crypto/ssh"
18 )
19
20 type MultiHandler struct {
21@@ -19,6 +16,8 @@ type MultiHandler struct {
22 mu sync.Mutex
23 }
24
25+var _ slog.Handler = (*MultiHandler)(nil)
26+
27 func (m *MultiHandler) Enabled(ctx context.Context, l slog.Level) bool {
28 m.mu.Lock()
29 defer m.mu.Unlock()
30@@ -80,181 +79,15 @@ func (m *MultiHandler) WithGroup(name string) slog.Handler {
31 }
32 }
33
34-type PubSubLogWriter struct {
35- SSHClient *ssh.Client
36- Session *ssh.Session
37- StdinPipe io.WriteCloser
38- Done chan struct{}
39- Messages chan []byte
40- Timeout time.Duration
41- BufferSize int
42- closeOnce sync.Once
43- closeMessageOnce sync.Once
44- startOnce sync.Once
45- connecMu sync.Mutex
46- ConnectionInfo *pubsub.RemoteClientInfo
47-}
48-
49-func (c *PubSubLogWriter) Close() error {
50- c.connecMu.Lock()
51- defer c.connecMu.Unlock()
52-
53- if c.Done != nil {
54- c.closeOnce.Do(func() {
55- close(c.Done)
56- })
57- }
58-
59- if c.Messages != nil {
60- c.closeMessageOnce.Do(func() {
61- close(c.Messages)
62- })
63- }
64-
65- var errs []error
66-
67- if c.StdinPipe != nil {
68- errs = append(errs, c.StdinPipe.Close())
69- }
70-
71- if c.Session != nil {
72- errs = append(errs, c.Session.Close())
73- }
74-
75- if c.SSHClient != nil {
76- errs = append(errs, c.SSHClient.Close())
77- }
78-
79- return errors.Join(errs...)
80-}
81-
82-func (c *PubSubLogWriter) Open() error {
83- c.Close()
84-
85- c.connecMu.Lock()
86-
87- c.Done = make(chan struct{})
88- c.Messages = make(chan []byte, c.BufferSize)
89-
90- sshClient, err := pubsub.CreateRemoteClient(c.ConnectionInfo)
91- if err != nil {
92- c.connecMu.Unlock()
93- return err
94- }
95-
96- session, err := sshClient.NewSession()
97- if err != nil {
98- c.connecMu.Unlock()
99- return err
100- }
101-
102- stdinPipe, err := session.StdinPipe()
103- if err != nil {
104- c.connecMu.Unlock()
105- return err
106- }
107-
108- err = session.Start("pub log-drain -b=false")
109- if err != nil {
110- c.connecMu.Unlock()
111- return err
112- }
113-
114- c.SSHClient = sshClient
115- c.Session = session
116- c.StdinPipe = stdinPipe
117-
118- c.closeOnce = sync.Once{}
119- c.startOnce = sync.Once{}
120-
121- c.connecMu.Unlock()
122-
123- c.Start()
124-
125- return nil
126-}
127-
128-func (c *PubSubLogWriter) Start() {
129- c.startOnce.Do(func() {
130- go func() {
131- defer c.Reconnect()
132-
133- for {
134- select {
135- case data, ok := <-c.Messages:
136- _, err := c.StdinPipe.Write(data)
137- if !ok || err != nil {
138- slog.Error("received error on write, reopening logger", "error", err)
139- return
140- }
141- case <-c.Done:
142- return
143- }
144- }
145- }()
146- })
147-}
148-
149-func (c *PubSubLogWriter) Write(data []byte) (int, error) {
150- var (
151- n int
152- err error
153- )
154-
155- ok := c.connecMu.TryLock()
156-
157- if !ok {
158- return n, fmt.Errorf("unable to acquire lock to write")
159- }
160-
161- defer c.connecMu.Unlock()
162-
163- if c.Messages == nil || c.Done == nil {
164- return n, fmt.Errorf("logger not viable")
165- }
166-
167- select {
168- case c.Messages <- slices.Clone(data):
169- n = len(data)
170- case <-time.After(c.Timeout):
171- err = fmt.Errorf("unable to send data within timeout")
172- case <-c.Done:
173- break
174- }
175-
176- return n, err
177-}
178-
179-func (c *PubSubLogWriter) Reconnect() {
180- go func() {
181- for {
182- err := c.Open()
183- if err != nil {
184- slog.Error("unable to open send logger. retrying in 10 seconds", "error", err)
185- } else {
186- return
187- }
188-
189- <-time.After(10 * time.Second)
190- }
191- }()
192-}
193-
194-func SendLogRegister(logger *slog.Logger, connectionInfo *pubsub.RemoteClientInfo, buffer int) (*slog.Logger, error) {
195+func SendLogRegister(logger *slog.Logger, info *pubsub.RemoteClientInfo, buffer int) (*slog.Logger, error) {
196 if buffer < 0 {
197 buffer = 0
198 }
199
200- currentHandler := logger.Handler()
201-
202- logWriter := &PubSubLogWriter{
203- Timeout: 10 * time.Millisecond,
204- BufferSize: buffer,
205- ConnectionInfo: connectionInfo,
206- }
207-
208- logWriter.Reconnect()
209+ logWriter := pubsub.NewRemoteClientWriter(info, logger, buffer)
210+ go logWriter.KeepAlive("pub log-drain -b=false")
211
212+ currentHandler := logger.Handler()
213 return slog.New(
214 &MultiHandler{
215 Handlers: []slog.Handler{
216@@ -268,9 +101,6 @@ func SendLogRegister(logger *slog.Logger, connectionInfo *pubsub.RemoteClientInf
217 ), nil
218 }
219
220-var _ io.Writer = (*PubSubLogWriter)(nil)
221-var _ slog.Handler = (*MultiHandler)(nil)
222-
223 func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
224 return pubsub.RemoteSub("sub log-drain -k", ctx, connectionInfo)
225 }
remote_client.go
link
+174
-1
+174
-1
1diff --git a/remote_client.go b/remote_client.go
2index 84c48f4..57345f1 100644
3--- a/remote_client.go
4+++ b/remote_client.go
5@@ -2,16 +2,189 @@ package pubsub
6
7 import (
8 "context"
9+ "errors"
10 "fmt"
11 "io"
12+ "log/slog"
13 "net"
14 "os"
15 "path/filepath"
16+ "slices"
17 "strings"
18+ "sync"
19+ "time"
20
21 "golang.org/x/crypto/ssh"
22 )
23
24+type RemoteClientWriter struct {
25+ SSHClient *ssh.Client
26+ Session *ssh.Session
27+ StdinPipe io.WriteCloser
28+ Done chan struct{}
29+ Messages chan []byte
30+ Timeout time.Duration
31+ BufferSize int
32+ closeOnce sync.Once
33+ closeMessageOnce sync.Once
34+ startOnce sync.Once
35+ connecMu sync.Mutex
36+ Info *RemoteClientInfo
37+ Logger *slog.Logger
38+}
39+
40+var _ io.Writer = (*RemoteClientWriter)(nil)
41+
42+func NewRemoteClientWriter(info *RemoteClientInfo, logger *slog.Logger, buffer int) *RemoteClientWriter {
43+ return &RemoteClientWriter{
44+ Timeout: 10 * time.Millisecond,
45+ Info: info,
46+ BufferSize: buffer,
47+ Logger: logger,
48+ }
49+}
50+
51+func (c *RemoteClientWriter) Close() error {
52+ c.connecMu.Lock()
53+ defer c.connecMu.Unlock()
54+
55+ if c.Done != nil {
56+ c.closeOnce.Do(func() {
57+ close(c.Done)
58+ })
59+ }
60+
61+ if c.Messages != nil {
62+ c.closeMessageOnce.Do(func() {
63+ close(c.Messages)
64+ })
65+ }
66+
67+ var errs []error
68+
69+ if c.StdinPipe != nil {
70+ errs = append(errs, c.StdinPipe.Close())
71+ }
72+
73+ if c.Session != nil {
74+ errs = append(errs, c.Session.Close())
75+ }
76+
77+ if c.SSHClient != nil {
78+ errs = append(errs, c.SSHClient.Close())
79+ }
80+
81+ return errors.Join(errs...)
82+}
83+
84+func (c *RemoteClientWriter) Open(cmd string) error {
85+ c.Close()
86+
87+ c.connecMu.Lock()
88+
89+ c.Done = make(chan struct{})
90+ c.Messages = make(chan []byte, c.BufferSize)
91+
92+ sshClient, err := CreateRemoteClient(c.Info)
93+ if err != nil {
94+ c.connecMu.Unlock()
95+ return err
96+ }
97+
98+ session, err := sshClient.NewSession()
99+ if err != nil {
100+ c.connecMu.Unlock()
101+ return err
102+ }
103+
104+ stdinPipe, err := session.StdinPipe()
105+ if err != nil {
106+ c.connecMu.Unlock()
107+ return err
108+ }
109+
110+ err = session.Start(cmd)
111+ if err != nil {
112+ c.connecMu.Unlock()
113+ return err
114+ }
115+
116+ c.SSHClient = sshClient
117+ c.Session = session
118+ c.StdinPipe = stdinPipe
119+
120+ c.closeOnce = sync.Once{}
121+ c.startOnce = sync.Once{}
122+
123+ c.connecMu.Unlock()
124+
125+ c.start()
126+
127+ return nil
128+}
129+
130+func (c *RemoteClientWriter) start() {
131+ c.startOnce.Do(func() {
132+ go func() {
133+ for {
134+ select {
135+ case data, ok := <-c.Messages:
136+ _, err := c.StdinPipe.Write(data)
137+ if !ok || err != nil {
138+ c.Logger.Error("received error on write, reopening conn", "error", err)
139+ return
140+ }
141+ case <-c.Done:
142+ return
143+ }
144+ }
145+ }()
146+ })
147+}
148+
149+func (c *RemoteClientWriter) Write(data []byte) (int, error) {
150+ var (
151+ n int
152+ err error
153+ )
154+
155+ ok := c.connecMu.TryLock()
156+
157+ if !ok {
158+ return n, fmt.Errorf("unable to acquire lock to write")
159+ }
160+
161+ defer c.connecMu.Unlock()
162+
163+ if c.Messages == nil || c.Done == nil {
164+ return n, fmt.Errorf("conn not viable")
165+ }
166+
167+ select {
168+ case c.Messages <- slices.Clone(data):
169+ n = len(data)
170+ case <-time.After(c.Timeout):
171+ err = fmt.Errorf("unable to send data within timeout")
172+ case <-c.Done:
173+ break
174+ }
175+
176+ return n, err
177+}
178+
179+func (c *RemoteClientWriter) KeepAlive(cmd string) {
180+ for {
181+ err := c.Open(cmd)
182+ if err != nil {
183+ c.Logger.Error("unable to open send to ssh conn. retrying in 10 seconds", "error", err)
184+ } else {
185+ return
186+ }
187+
188+ <-time.After(10 * time.Second)
189+ }
190+}
191+
192 type RemoteClientInfo struct {
193 RemoteHost string
194 KeyLocation string
195@@ -22,7 +195,7 @@ type RemoteClientInfo struct {
196
197 func CreateRemoteClient(info *RemoteClientInfo) (*ssh.Client, error) {
198 if info == nil {
199- return nil, fmt.Errorf("connection info is invalid")
200+ return nil, fmt.Errorf("conn info is invalid")
201 }
202
203 if !strings.Contains(info.RemoteHost, ":") {