dashboard / pubsub / refactor: remote client writer that implement io.Writer #31 rss

accepted · opened on 2024-11-12T20:57:56Z by erock
Help
# add changes to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add 31
# add review to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add --review 31
# remove patchset
ssh pr.pico.sh ps rm ps-x
# checkout all patches
ssh pr.pico.sh pr print 31 | git am -3
# print a diff between the last two patches in a patch request
ssh pr.pico.sh pr diff 31
# accept PR
ssh pr.pico.sh pr accept 31
# close PR
ssh pr.pico.sh pr close 31

Logs

erock created pr with ps-65 on 2024-11-12T20:57:56Z
erock changed status on 2024-11-14T15:44:45Z {"status":"accepted"}

Patchsets

ps-65 by erock on 2024-11-12T20:57:56Z

Patchset ps-65

Back to top

refactor: remote client writer that implement io.Writer

It also acts as a supervisor, keeping the ssh connection alive
log/log.go link
+6 -176
  1diff --git a/log/log.go b/log/log.go
  2index 98bfddc..cf87c7e 100644
  3--- a/log/log.go
  4+++ b/log/log.go
  5@@ -3,15 +3,12 @@ package log
  6 import (
  7 	"context"
  8 	"errors"
  9-	"fmt"
 10 	"io"
 11 	"log/slog"
 12 	"slices"
 13 	"sync"
 14-	"time"
 15 
 16 	"github.com/picosh/pubsub"
 17-	"golang.org/x/crypto/ssh"
 18 )
 19 
 20 type MultiHandler struct {
 21@@ -19,6 +16,8 @@ type MultiHandler struct {
 22 	mu       sync.Mutex
 23 }
 24 
 25+var _ slog.Handler = (*MultiHandler)(nil)
 26+
 27 func (m *MultiHandler) Enabled(ctx context.Context, l slog.Level) bool {
 28 	m.mu.Lock()
 29 	defer m.mu.Unlock()
 30@@ -80,181 +79,15 @@ func (m *MultiHandler) WithGroup(name string) slog.Handler {
 31 	}
 32 }
 33 
 34-type PubSubLogWriter struct {
 35-	SSHClient        *ssh.Client
 36-	Session          *ssh.Session
 37-	StdinPipe        io.WriteCloser
 38-	Done             chan struct{}
 39-	Messages         chan []byte
 40-	Timeout          time.Duration
 41-	BufferSize       int
 42-	closeOnce        sync.Once
 43-	closeMessageOnce sync.Once
 44-	startOnce        sync.Once
 45-	connecMu         sync.Mutex
 46-	ConnectionInfo   *pubsub.RemoteClientInfo
 47-}
 48-
 49-func (c *PubSubLogWriter) Close() error {
 50-	c.connecMu.Lock()
 51-	defer c.connecMu.Unlock()
 52-
 53-	if c.Done != nil {
 54-		c.closeOnce.Do(func() {
 55-			close(c.Done)
 56-		})
 57-	}
 58-
 59-	if c.Messages != nil {
 60-		c.closeMessageOnce.Do(func() {
 61-			close(c.Messages)
 62-		})
 63-	}
 64-
 65-	var errs []error
 66-
 67-	if c.StdinPipe != nil {
 68-		errs = append(errs, c.StdinPipe.Close())
 69-	}
 70-
 71-	if c.Session != nil {
 72-		errs = append(errs, c.Session.Close())
 73-	}
 74-
 75-	if c.SSHClient != nil {
 76-		errs = append(errs, c.SSHClient.Close())
 77-	}
 78-
 79-	return errors.Join(errs...)
 80-}
 81-
 82-func (c *PubSubLogWriter) Open() error {
 83-	c.Close()
 84-
 85-	c.connecMu.Lock()
 86-
 87-	c.Done = make(chan struct{})
 88-	c.Messages = make(chan []byte, c.BufferSize)
 89-
 90-	sshClient, err := pubsub.CreateRemoteClient(c.ConnectionInfo)
 91-	if err != nil {
 92-		c.connecMu.Unlock()
 93-		return err
 94-	}
 95-
 96-	session, err := sshClient.NewSession()
 97-	if err != nil {
 98-		c.connecMu.Unlock()
 99-		return err
100-	}
101-
102-	stdinPipe, err := session.StdinPipe()
103-	if err != nil {
104-		c.connecMu.Unlock()
105-		return err
106-	}
107-
108-	err = session.Start("pub log-drain -b=false")
109-	if err != nil {
110-		c.connecMu.Unlock()
111-		return err
112-	}
113-
114-	c.SSHClient = sshClient
115-	c.Session = session
116-	c.StdinPipe = stdinPipe
117-
118-	c.closeOnce = sync.Once{}
119-	c.startOnce = sync.Once{}
120-
121-	c.connecMu.Unlock()
122-
123-	c.Start()
124-
125-	return nil
126-}
127-
128-func (c *PubSubLogWriter) Start() {
129-	c.startOnce.Do(func() {
130-		go func() {
131-			defer c.Reconnect()
132-
133-			for {
134-				select {
135-				case data, ok := <-c.Messages:
136-					_, err := c.StdinPipe.Write(data)
137-					if !ok || err != nil {
138-						slog.Error("received error on write, reopening logger", "error", err)
139-						return
140-					}
141-				case <-c.Done:
142-					return
143-				}
144-			}
145-		}()
146-	})
147-}
148-
149-func (c *PubSubLogWriter) Write(data []byte) (int, error) {
150-	var (
151-		n   int
152-		err error
153-	)
154-
155-	ok := c.connecMu.TryLock()
156-
157-	if !ok {
158-		return n, fmt.Errorf("unable to acquire lock to write")
159-	}
160-
161-	defer c.connecMu.Unlock()
162-
163-	if c.Messages == nil || c.Done == nil {
164-		return n, fmt.Errorf("logger not viable")
165-	}
166-
167-	select {
168-	case c.Messages <- slices.Clone(data):
169-		n = len(data)
170-	case <-time.After(c.Timeout):
171-		err = fmt.Errorf("unable to send data within timeout")
172-	case <-c.Done:
173-		break
174-	}
175-
176-	return n, err
177-}
178-
179-func (c *PubSubLogWriter) Reconnect() {
180-	go func() {
181-		for {
182-			err := c.Open()
183-			if err != nil {
184-				slog.Error("unable to open send logger. retrying in 10 seconds", "error", err)
185-			} else {
186-				return
187-			}
188-
189-			<-time.After(10 * time.Second)
190-		}
191-	}()
192-}
193-
194-func SendLogRegister(logger *slog.Logger, connectionInfo *pubsub.RemoteClientInfo, buffer int) (*slog.Logger, error) {
195+func SendLogRegister(logger *slog.Logger, info *pubsub.RemoteClientInfo, buffer int) (*slog.Logger, error) {
196 	if buffer < 0 {
197 		buffer = 0
198 	}
199 
200-	currentHandler := logger.Handler()
201-
202-	logWriter := &PubSubLogWriter{
203-		Timeout:        10 * time.Millisecond,
204-		BufferSize:     buffer,
205-		ConnectionInfo: connectionInfo,
206-	}
207-
208-	logWriter.Reconnect()
209+	logWriter := pubsub.NewRemoteClientWriter(info, logger, buffer)
210+	go logWriter.KeepAlive("pub log-drain -b=false")
211 
212+	currentHandler := logger.Handler()
213 	return slog.New(
214 		&MultiHandler{
215 			Handlers: []slog.Handler{
216@@ -268,9 +101,6 @@ func SendLogRegister(logger *slog.Logger, connectionInfo *pubsub.RemoteClientInf
217 	), nil
218 }
219 
220-var _ io.Writer = (*PubSubLogWriter)(nil)
221-var _ slog.Handler = (*MultiHandler)(nil)
222-
223 func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
224 	return pubsub.RemoteSub("sub log-drain -k", ctx, connectionInfo)
225 }
remote_client.go link
+174 -1
  1diff --git a/remote_client.go b/remote_client.go
  2index 84c48f4..57345f1 100644
  3--- a/remote_client.go
  4+++ b/remote_client.go
  5@@ -2,16 +2,189 @@ package pubsub
  6 
  7 import (
  8 	"context"
  9+	"errors"
 10 	"fmt"
 11 	"io"
 12+	"log/slog"
 13 	"net"
 14 	"os"
 15 	"path/filepath"
 16+	"slices"
 17 	"strings"
 18+	"sync"
 19+	"time"
 20 
 21 	"golang.org/x/crypto/ssh"
 22 )
 23 
 24+type RemoteClientWriter struct {
 25+	SSHClient        *ssh.Client
 26+	Session          *ssh.Session
 27+	StdinPipe        io.WriteCloser
 28+	Done             chan struct{}
 29+	Messages         chan []byte
 30+	Timeout          time.Duration
 31+	BufferSize       int
 32+	closeOnce        sync.Once
 33+	closeMessageOnce sync.Once
 34+	startOnce        sync.Once
 35+	connecMu         sync.Mutex
 36+	Info             *RemoteClientInfo
 37+	Logger           *slog.Logger
 38+}
 39+
 40+var _ io.Writer = (*RemoteClientWriter)(nil)
 41+
 42+func NewRemoteClientWriter(info *RemoteClientInfo, logger *slog.Logger, buffer int) *RemoteClientWriter {
 43+	return &RemoteClientWriter{
 44+		Timeout:    10 * time.Millisecond,
 45+		Info:       info,
 46+		BufferSize: buffer,
 47+		Logger:     logger,
 48+	}
 49+}
 50+
 51+func (c *RemoteClientWriter) Close() error {
 52+	c.connecMu.Lock()
 53+	defer c.connecMu.Unlock()
 54+
 55+	if c.Done != nil {
 56+		c.closeOnce.Do(func() {
 57+			close(c.Done)
 58+		})
 59+	}
 60+
 61+	if c.Messages != nil {
 62+		c.closeMessageOnce.Do(func() {
 63+			close(c.Messages)
 64+		})
 65+	}
 66+
 67+	var errs []error
 68+
 69+	if c.StdinPipe != nil {
 70+		errs = append(errs, c.StdinPipe.Close())
 71+	}
 72+
 73+	if c.Session != nil {
 74+		errs = append(errs, c.Session.Close())
 75+	}
 76+
 77+	if c.SSHClient != nil {
 78+		errs = append(errs, c.SSHClient.Close())
 79+	}
 80+
 81+	return errors.Join(errs...)
 82+}
 83+
 84+func (c *RemoteClientWriter) Open(cmd string) error {
 85+	c.Close()
 86+
 87+	c.connecMu.Lock()
 88+
 89+	c.Done = make(chan struct{})
 90+	c.Messages = make(chan []byte, c.BufferSize)
 91+
 92+	sshClient, err := CreateRemoteClient(c.Info)
 93+	if err != nil {
 94+		c.connecMu.Unlock()
 95+		return err
 96+	}
 97+
 98+	session, err := sshClient.NewSession()
 99+	if err != nil {
100+		c.connecMu.Unlock()
101+		return err
102+	}
103+
104+	stdinPipe, err := session.StdinPipe()
105+	if err != nil {
106+		c.connecMu.Unlock()
107+		return err
108+	}
109+
110+	err = session.Start(cmd)
111+	if err != nil {
112+		c.connecMu.Unlock()
113+		return err
114+	}
115+
116+	c.SSHClient = sshClient
117+	c.Session = session
118+	c.StdinPipe = stdinPipe
119+
120+	c.closeOnce = sync.Once{}
121+	c.startOnce = sync.Once{}
122+
123+	c.connecMu.Unlock()
124+
125+	c.start()
126+
127+	return nil
128+}
129+
130+func (c *RemoteClientWriter) start() {
131+	c.startOnce.Do(func() {
132+		go func() {
133+			for {
134+				select {
135+				case data, ok := <-c.Messages:
136+					_, err := c.StdinPipe.Write(data)
137+					if !ok || err != nil {
138+						c.Logger.Error("received error on write, reopening conn", "error", err)
139+						return
140+					}
141+				case <-c.Done:
142+					return
143+				}
144+			}
145+		}()
146+	})
147+}
148+
149+func (c *RemoteClientWriter) Write(data []byte) (int, error) {
150+	var (
151+		n   int
152+		err error
153+	)
154+
155+	ok := c.connecMu.TryLock()
156+
157+	if !ok {
158+		return n, fmt.Errorf("unable to acquire lock to write")
159+	}
160+
161+	defer c.connecMu.Unlock()
162+
163+	if c.Messages == nil || c.Done == nil {
164+		return n, fmt.Errorf("conn not viable")
165+	}
166+
167+	select {
168+	case c.Messages <- slices.Clone(data):
169+		n = len(data)
170+	case <-time.After(c.Timeout):
171+		err = fmt.Errorf("unable to send data within timeout")
172+	case <-c.Done:
173+		break
174+	}
175+
176+	return n, err
177+}
178+
179+func (c *RemoteClientWriter) KeepAlive(cmd string) {
180+	for {
181+		err := c.Open(cmd)
182+		if err != nil {
183+			c.Logger.Error("unable to open send to ssh conn. retrying in 10 seconds", "error", err)
184+		} else {
185+			return
186+		}
187+
188+		<-time.After(10 * time.Second)
189+	}
190+}
191+
192 type RemoteClientInfo struct {
193 	RemoteHost     string
194 	KeyLocation    string
195@@ -22,7 +195,7 @@ type RemoteClientInfo struct {
196 
197 func CreateRemoteClient(info *RemoteClientInfo) (*ssh.Client, error) {
198 	if info == nil {
199-		return nil, fmt.Errorf("connection info is invalid")
200+		return nil, fmt.Errorf("conn info is invalid")
201 	}
202 
203 	if !strings.Contains(info.RemoteHost, ":") {