dashboard / pubsub / refactor: create remote client lib #28 rss

accepted · opened on 2024-11-12T14:50:03Z by erock
Help
# add changes to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add 28
# add review to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add --review 28
# remove patchset
ssh pr.pico.sh ps rm ps-x
# checkout all patches
ssh pr.pico.sh pr print 28 | git am -3
# print a diff between the last two patches in a patch request
ssh pr.pico.sh pr diff 28
# accept PR
ssh pr.pico.sh pr accept 28
# close PR
ssh pr.pico.sh pr close 28

Logs

erock created pr with ps-59 on 2024-11-12T14:50:03Z
erock added ps-60 on 2024-11-12T15:09:30Z
erock added ps-61 on 2024-11-12T15:13:28Z
erock added ps-62 on 2024-11-12T15:14:29Z
erock changed status on 2024-11-12T16:25:08Z {"status":"accepted"}

Patchsets

ps-59 by erock on 2024-11-12T14:50:03Z
Range Diff ↕
1: 57a5727 = 1: 57a5727 refactor: create remote client lib
-: ------- > 2: 67d38ff refactor(log): rm ConnectToLogs
ps-60 by erock on 2024-11-12T15:09:30Z
Range Diff ↕
1: 57a5727 = 1: 57a5727 refactor: create remote client lib
2: 67d38ff = 2: 67d38ff refactor(log): rm ConnectToLogs
-: ------- > 3: 97084b4 refactor(log): preserve `ConnectToLogs` but make it a convenient proxy
ps-61 by erock on 2024-11-12T15:13:28Z
Range Diff ↕
3: 97084b4 < -: ------- refactor(log): preserve `ConnectToLogs` but make it a convenient proxy
1: 57a5727 = 1: 57a5727 refactor: create remote client lib
2: 67d38ff ! 2: 866d44c refactor(log): `ConnectToLogs`

@@ log/log.go
 
 var _ io.Writer = (*PubSubLogWriter)(nil)
 var _ slog.Handler = (*MultiHandler)(nil)
-
-func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
 
 func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
-	sshClient, err := pubsub.CreateRemoteClient(connectionInfo)
-	if err != nil {
-		return nil, err
-	}
-
-	session, err := sshClient.NewSession()
-	if err != nil {
-		return nil, err
-	}
-
-	stdoutPipe, err := session.StdoutPipe()
-	if err != nil {
-		return nil, err
-	}
-
-	err = session.Start("sub log-drain -k")
-	if err != nil {
-		return nil, err
-	}
-
-	go func() {
-		<-ctx.Done()
-		session.Close()
-		sshClient.Close()
-	}()
-
-	return stdoutPipe, nil
-}
+	return pubsub.RemoteSub("sub log-drain -k", ctx, connectionInfo)
 }
ps-62 by erock on 2024-11-12T15:14:29Z

Patchset ps-62

refactor: create remote client lib

Eric Bower
2024-11-11T17:52:22Z
go.mod
+1 -0
go.sum
+2 -0
log/log.go
+6 -74

refactor(log): `ConnectToLogs`

Eric Bower
2024-11-12T15:08:41Z
log/log.go
+1 -27
Back to top

refactor: create remote client lib

go.mod link
+1 -0
 1diff --git a/go.mod b/go.mod
 2index 073f332..30b4335 100644
 3--- a/go.mod
 4+++ b/go.mod
 5@@ -4,6 +4,7 @@ go 1.23.1
 6 
 7 require (
 8 	github.com/antoniomika/syncmap v1.0.0
 9+	github.com/google/uuid v1.6.0
10 	golang.org/x/crypto v0.28.0
11 )
12 
go.sum link
+2 -0
 1diff --git a/go.sum b/go.sum
 2index 3c31bf5..b4364cb 100644
 3--- a/go.sum
 4+++ b/go.sum
 5@@ -1,5 +1,7 @@
 6 github.com/antoniomika/syncmap v1.0.0 h1:iFSfbQFQOvHZILFZF+hqWosO0no+W9+uF4y2VEyMKWU=
 7 github.com/antoniomika/syncmap v1.0.0/go.mod h1:fK2829foEYnO4riNfyUn0SHQZt4ue3DStYjGU+sJj38=
 8+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 9+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
10 golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
11 golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
12 golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
log/log.go link
+6 -74
  1diff --git a/log/log.go b/log/log.go
  2index 9dc431c..361f6bd 100644
  3--- a/log/log.go
  4+++ b/log/log.go
  5@@ -6,14 +6,11 @@ import (
  6 	"fmt"
  7 	"io"
  8 	"log/slog"
  9-	"net"
 10-	"os"
 11-	"path/filepath"
 12 	"slices"
 13-	"strings"
 14 	"sync"
 15 	"time"
 16 
 17+	"github.com/picosh/pubsub"
 18 	"golang.org/x/crypto/ssh"
 19 )
 20 
 21@@ -83,14 +80,6 @@ func (m *MultiHandler) WithGroup(name string) slog.Handler {
 22 	}
 23 }
 24 
 25-type PubSubConnectionInfo struct {
 26-	RemoteHost     string
 27-	KeyLocation    string
 28-	KeyPassphrase  string
 29-	RemoteHostname string
 30-	RemoteUser     string
 31-}
 32-
 33 type PubSubLogWriter struct {
 34 	SSHClient        *ssh.Client
 35 	Session          *ssh.Session
 36@@ -103,7 +92,7 @@ type PubSubLogWriter struct {
 37 	closeMessageOnce sync.Once
 38 	startOnce        sync.Once
 39 	connecMu         sync.Mutex
 40-	ConnectionInfo   *PubSubConnectionInfo
 41+	ConnectionInfo   *pubsub.RemoteClientInfo
 42 }
 43 
 44 func (c *PubSubLogWriter) Close() error {
 45@@ -147,7 +136,7 @@ func (c *PubSubLogWriter) Open() error {
 46 	c.Done = make(chan struct{})
 47 	c.Messages = make(chan []byte, c.BufferSize)
 48 
 49-	sshClient, err := CreateSSHClient(c.ConnectionInfo)
 50+	sshClient, err := pubsub.CreateRemoteClient(c.ConnectionInfo)
 51 	if err != nil {
 52 		c.connecMu.Unlock()
 53 		return err
 54@@ -251,64 +240,7 @@ func (c *PubSubLogWriter) Reconnect() {
 55 	}()
 56 }
 57 
 58-func CreateSSHClient(connectionInfo *PubSubConnectionInfo) (*ssh.Client, error) {
 59-	if connectionInfo == nil {
 60-		return nil, fmt.Errorf("connection info is invalid")
 61-	}
 62-
 63-	if !strings.Contains(connectionInfo.RemoteHost, ":") {
 64-		connectionInfo.RemoteHost += ":22"
 65-	}
 66-
 67-	rawConn, err := net.Dial("tcp", connectionInfo.RemoteHost)
 68-	if err != nil {
 69-		return nil, err
 70-	}
 71-
 72-	keyPath, err := filepath.Abs(connectionInfo.KeyLocation)
 73-	if err != nil {
 74-		return nil, err
 75-	}
 76-
 77-	f, err := os.Open(keyPath)
 78-	if err != nil {
 79-		return nil, err
 80-	}
 81-	defer f.Close()
 82-
 83-	data, err := io.ReadAll(f)
 84-	if err != nil {
 85-		return nil, err
 86-	}
 87-
 88-	var signer ssh.Signer
 89-
 90-	if connectionInfo.KeyPassphrase != "" {
 91-		signer, err = ssh.ParsePrivateKeyWithPassphrase(data, []byte(connectionInfo.KeyPassphrase))
 92-	} else {
 93-		signer, err = ssh.ParsePrivateKey(data)
 94-	}
 95-
 96-	if err != nil {
 97-		return nil, err
 98-	}
 99-
100-	sshConn, chans, reqs, err := ssh.NewClientConn(rawConn, connectionInfo.RemoteHostname, &ssh.ClientConfig{
101-		Auth:            []ssh.AuthMethod{ssh.PublicKeys(signer)},
102-		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
103-		User:            connectionInfo.RemoteUser,
104-	})
105-
106-	if err != nil {
107-		return nil, err
108-	}
109-
110-	sshClient := ssh.NewClient(sshConn, chans, reqs)
111-
112-	return sshClient, nil
113-}
114-
115-func SendLogRegister(logger *slog.Logger, connectionInfo *PubSubConnectionInfo, buffer int) (*slog.Logger, error) {
116+func SendLogRegister(logger *slog.Logger, connectionInfo *pubsub.RemoteClientInfo, buffer int) (*slog.Logger, error) {
117 	if buffer < 0 {
118 		buffer = 0
119 	}
120@@ -339,8 +271,8 @@ func SendLogRegister(logger *slog.Logger, connectionInfo *PubSubConnectionInfo,
121 var _ io.Writer = (*PubSubLogWriter)(nil)
122 var _ slog.Handler = (*MultiHandler)(nil)
123 
124-func ConnectToLogs(ctx context.Context, connectionInfo *PubSubConnectionInfo) (io.Reader, error) {
125-	sshClient, err := CreateSSHClient(connectionInfo)
126+func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
127+	sshClient, err := pubsub.CreateRemoteClient(connectionInfo)
128 	if err != nil {
129 		return nil, err
130 	}
remote_client.go link
+138 -0
  1diff --git a/remote_client.go b/remote_client.go
  2new file mode 100644
  3index 0000000..84c48f4
  4--- /dev/null
  5+++ b/remote_client.go
  6@@ -0,0 +1,138 @@
  7+package pubsub
  8+
  9+import (
 10+	"context"
 11+	"fmt"
 12+	"io"
 13+	"net"
 14+	"os"
 15+	"path/filepath"
 16+	"strings"
 17+
 18+	"golang.org/x/crypto/ssh"
 19+)
 20+
 21+type RemoteClientInfo struct {
 22+	RemoteHost     string
 23+	KeyLocation    string
 24+	KeyPassphrase  string
 25+	RemoteHostname string
 26+	RemoteUser     string
 27+}
 28+
 29+func CreateRemoteClient(info *RemoteClientInfo) (*ssh.Client, error) {
 30+	if info == nil {
 31+		return nil, fmt.Errorf("connection info is invalid")
 32+	}
 33+
 34+	if !strings.Contains(info.RemoteHost, ":") {
 35+		info.RemoteHost += ":22"
 36+	}
 37+
 38+	rawConn, err := net.Dial("tcp", info.RemoteHost)
 39+	if err != nil {
 40+		return nil, err
 41+	}
 42+
 43+	keyPath, err := filepath.Abs(info.KeyLocation)
 44+	if err != nil {
 45+		return nil, err
 46+	}
 47+
 48+	f, err := os.Open(keyPath)
 49+	if err != nil {
 50+		return nil, err
 51+	}
 52+	defer f.Close()
 53+
 54+	data, err := io.ReadAll(f)
 55+	if err != nil {
 56+		return nil, err
 57+	}
 58+
 59+	var signer ssh.Signer
 60+
 61+	if info.KeyPassphrase != "" {
 62+		signer, err = ssh.ParsePrivateKeyWithPassphrase(data, []byte(info.KeyPassphrase))
 63+	} else {
 64+		signer, err = ssh.ParsePrivateKey(data)
 65+	}
 66+
 67+	if err != nil {
 68+		return nil, err
 69+	}
 70+
 71+	sshConn, chans, reqs, err := ssh.NewClientConn(rawConn, info.RemoteHostname, &ssh.ClientConfig{
 72+		Auth:            []ssh.AuthMethod{ssh.PublicKeys(signer)},
 73+		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
 74+		User:            info.RemoteUser,
 75+	})
 76+
 77+	if err != nil {
 78+		return nil, err
 79+	}
 80+
 81+	sshClient := ssh.NewClient(sshConn, chans, reqs)
 82+
 83+	return sshClient, nil
 84+}
 85+
 86+func RemoteSub(cmd string, ctx context.Context, info *RemoteClientInfo) (io.Reader, error) {
 87+	sshClient, err := CreateRemoteClient(info)
 88+	if err != nil {
 89+		return nil, err
 90+	}
 91+
 92+	session, err := sshClient.NewSession()
 93+	if err != nil {
 94+		return nil, err
 95+	}
 96+
 97+	stdoutPipe, err := session.StdoutPipe()
 98+	if err != nil {
 99+		return nil, err
100+	}
101+
102+	err = session.Start(cmd)
103+	if err != nil {
104+		return nil, err
105+	}
106+
107+	go func() {
108+		<-ctx.Done()
109+		session.Close()
110+		sshClient.Close()
111+	}()
112+
113+	return stdoutPipe, nil
114+}
115+
116+func RemotePub(cmd string, ctx context.Context, info *RemoteClientInfo) (io.WriteCloser, error) {
117+	sshClient, err := CreateRemoteClient(info)
118+	if err != nil {
119+		return nil, err
120+	}
121+
122+	session, err := sshClient.NewSession()
123+	if err != nil {
124+		return nil, err
125+	}
126+
127+	stdinPipe, err := session.StdinPipe()
128+	if err != nil {
129+		return nil, err
130+	}
131+
132+	err = session.Start(cmd)
133+	if err != nil {
134+		return nil, err
135+	}
136+
137+	go func() {
138+		<-ctx.Done()
139+		session.Close()
140+		sshClient.Close()
141+	}()
142+
143+	return stdinPipe, err
144+}

refactor(log): `ConnectToLogs`

`ConnectToLogs` is now a thin proxy for `pubsub.RemoteSub("sub log-drain -k", ctx, connectionInfo)`.
log/log.go link
+1 -27
 1diff --git a/log/log.go b/log/log.go
 2index 361f6bd..98bfddc 100644
 3--- a/log/log.go
 4+++ b/log/log.go
 5@@ -272,31 +272,5 @@ var _ io.Writer = (*PubSubLogWriter)(nil)
 6 var _ slog.Handler = (*MultiHandler)(nil)
 7 
 8 func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
 9-	sshClient, err := pubsub.CreateRemoteClient(connectionInfo)
10-	if err != nil {
11-		return nil, err
12-	}
13-
14-	session, err := sshClient.NewSession()
15-	if err != nil {
16-		return nil, err
17-	}
18-
19-	stdoutPipe, err := session.StdoutPipe()
20-	if err != nil {
21-		return nil, err
22-	}
23-
24-	err = session.Start("sub log-drain -k")
25-	if err != nil {
26-		return nil, err
27-	}
28-
29-	go func() {
30-		<-ctx.Done()
31-		session.Close()
32-		sshClient.Close()
33-	}()
34-
35-	return stdoutPipe, nil
36+	return pubsub.RemoteSub("sub log-drain -k", ctx, connectionInfo)
37 }