dashboard / erock/pubsub / refactor: create remote client lib #28 rss

accepted · opened on 2024-11-12T14:50:03Z by erock
Help
checkout latest patchset:
ssh pr.pico.sh print pr-28 | git am -3
checkout any patchset in a patch request:
ssh pr.pico.sh print ps-X | git am -3
add changes to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add 28
add review to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add --review 28
accept PR:
ssh pr.pico.sh pr accept 28
close PR:
ssh pr.pico.sh pr close 28

Logs

erock created pr with ps-59 on 2024-11-12T14:50:03Z
erock added ps-60 on 2024-11-12T15:09:30Z
erock added ps-61 on 2024-11-12T15:13:28Z
erock added ps-62 on 2024-11-12T15:14:29Z
erock changed status on 2024-11-12T16:25:08Z {"status":"accepted"}

Patchsets

ps-59 by erock on 2024-11-12T14:50:03Z
Range Diff ↕ rd-60
1: 57a5727 = 1: 57a5727 refactor: create remote client lib
-: ------- > 2: 67d38ff refactor(log): rm ConnectToLogs
ps-60 by erock on 2024-11-12T15:09:30Z
Range Diff ↕ rd-61
1: 57a5727 = 1: 57a5727 refactor: create remote client lib
2: 67d38ff = 2: 67d38ff refactor(log): rm ConnectToLogs
-: ------- > 3: 97084b4 refactor(log): preserve `ConnectToLogs` but make it a convenient proxy
ps-61 by erock on 2024-11-12T15:13:28Z
Range Diff ↕ rd-62
1: 57a5727 = 1: 57a5727 refactor: create remote client lib
2: 67d38ff ! 2: 866d44c refactor(log): `ConnectToLogs`
3: 97084b4 < -: ------- refactor(log): preserve `ConnectToLogs` but make it a convenient proxy
ps-62 by erock on 2024-11-12T15:14:29Z

Patchset ps-62

refactor: create remote client lib

Eric Bower
2024-11-11T17:52:22Z
go.mod
+1 -0
go.sum
+2 -0
log/log.go
+6 -74

refactor(log): `ConnectToLogs`

Eric Bower
2024-11-12T15:08:41Z
log/log.go
+1 -27
Back to top

refactor: create remote client lib

go.mod link
+1 -0
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
diff --git a/go.mod b/go.mod
index 073f332..30b4335 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.23.1
 
 require (
 	github.com/antoniomika/syncmap v1.0.0
+	github.com/google/uuid v1.6.0
 	golang.org/x/crypto v0.28.0
 )
 
go.sum link
+2 -0
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
diff --git a/go.sum b/go.sum
index 3c31bf5..b4364cb 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,7 @@
 github.com/antoniomika/syncmap v1.0.0 h1:iFSfbQFQOvHZILFZF+hqWosO0no+W9+uF4y2VEyMKWU=
 github.com/antoniomika/syncmap v1.0.0/go.mod h1:fK2829foEYnO4riNfyUn0SHQZt4ue3DStYjGU+sJj38=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
 golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
 golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
log/log.go link
+6 -74
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
diff --git a/log/log.go b/log/log.go
index 9dc431c..361f6bd 100644
--- a/log/log.go
+++ b/log/log.go
@@ -6,14 +6,11 @@ import (
 	"fmt"
 	"io"
 	"log/slog"
-	"net"
-	"os"
-	"path/filepath"
 	"slices"
-	"strings"
 	"sync"
 	"time"
 
+	"github.com/picosh/pubsub"
 	"golang.org/x/crypto/ssh"
 )
 
@@ -83,14 +80,6 @@ func (m *MultiHandler) WithGroup(name string) slog.Handler {
 	}
 }
 
-type PubSubConnectionInfo struct {
-	RemoteHost     string
-	KeyLocation    string
-	KeyPassphrase  string
-	RemoteHostname string
-	RemoteUser     string
-}
-
 type PubSubLogWriter struct {
 	SSHClient        *ssh.Client
 	Session          *ssh.Session
@@ -103,7 +92,7 @@ type PubSubLogWriter struct {
 	closeMessageOnce sync.Once
 	startOnce        sync.Once
 	connecMu         sync.Mutex
-	ConnectionInfo   *PubSubConnectionInfo
+	ConnectionInfo   *pubsub.RemoteClientInfo
 }
 
 func (c *PubSubLogWriter) Close() error {
@@ -147,7 +136,7 @@ func (c *PubSubLogWriter) Open() error {
 	c.Done = make(chan struct{})
 	c.Messages = make(chan []byte, c.BufferSize)
 
-	sshClient, err := CreateSSHClient(c.ConnectionInfo)
+	sshClient, err := pubsub.CreateRemoteClient(c.ConnectionInfo)
 	if err != nil {
 		c.connecMu.Unlock()
 		return err
@@ -251,64 +240,7 @@ func (c *PubSubLogWriter) Reconnect() {
 	}()
 }
 
-func CreateSSHClient(connectionInfo *PubSubConnectionInfo) (*ssh.Client, error) {
-	if connectionInfo == nil {
-		return nil, fmt.Errorf("connection info is invalid")
-	}
-
-	if !strings.Contains(connectionInfo.RemoteHost, ":") {
-		connectionInfo.RemoteHost += ":22"
-	}
-
-	rawConn, err := net.Dial("tcp", connectionInfo.RemoteHost)
-	if err != nil {
-		return nil, err
-	}
-
-	keyPath, err := filepath.Abs(connectionInfo.KeyLocation)
-	if err != nil {
-		return nil, err
-	}
-
-	f, err := os.Open(keyPath)
-	if err != nil {
-		return nil, err
-	}
-	defer f.Close()
-
-	data, err := io.ReadAll(f)
-	if err != nil {
-		return nil, err
-	}
-
-	var signer ssh.Signer
-
-	if connectionInfo.KeyPassphrase != "" {
-		signer, err = ssh.ParsePrivateKeyWithPassphrase(data, []byte(connectionInfo.KeyPassphrase))
-	} else {
-		signer, err = ssh.ParsePrivateKey(data)
-	}
-
-	if err != nil {
-		return nil, err
-	}
-
-	sshConn, chans, reqs, err := ssh.NewClientConn(rawConn, connectionInfo.RemoteHostname, &ssh.ClientConfig{
-		Auth:            []ssh.AuthMethod{ssh.PublicKeys(signer)},
-		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
-		User:            connectionInfo.RemoteUser,
-	})
-
-	if err != nil {
-		return nil, err
-	}
-
-	sshClient := ssh.NewClient(sshConn, chans, reqs)
-
-	return sshClient, nil
-}
-
-func SendLogRegister(logger *slog.Logger, connectionInfo *PubSubConnectionInfo, buffer int) (*slog.Logger, error) {
+func SendLogRegister(logger *slog.Logger, connectionInfo *pubsub.RemoteClientInfo, buffer int) (*slog.Logger, error) {
 	if buffer < 0 {
 		buffer = 0
 	}
@@ -339,8 +271,8 @@ func SendLogRegister(logger *slog.Logger, connectionInfo *PubSubConnectionInfo,
 var _ io.Writer = (*PubSubLogWriter)(nil)
 var _ slog.Handler = (*MultiHandler)(nil)
 
-func ConnectToLogs(ctx context.Context, connectionInfo *PubSubConnectionInfo) (io.Reader, error) {
-	sshClient, err := CreateSSHClient(connectionInfo)
+func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
+	sshClient, err := pubsub.CreateRemoteClient(connectionInfo)
 	if err != nil {
 		return nil, err
 	}
remote_client.go link
+138 -0
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
diff --git a/remote_client.go b/remote_client.go
new file mode 100644
index 0000000..84c48f4
--- /dev/null
+++ b/remote_client.go
@@ -0,0 +1,138 @@
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"golang.org/x/crypto/ssh"
+)
+
+type RemoteClientInfo struct {
+	RemoteHost     string
+	KeyLocation    string
+	KeyPassphrase  string
+	RemoteHostname string
+	RemoteUser     string
+}
+
+func CreateRemoteClient(info *RemoteClientInfo) (*ssh.Client, error) {
+	if info == nil {
+		return nil, fmt.Errorf("connection info is invalid")
+	}
+
+	if !strings.Contains(info.RemoteHost, ":") {
+		info.RemoteHost += ":22"
+	}
+
+	rawConn, err := net.Dial("tcp", info.RemoteHost)
+	if err != nil {
+		return nil, err
+	}
+
+	keyPath, err := filepath.Abs(info.KeyLocation)
+	if err != nil {
+		return nil, err
+	}
+
+	f, err := os.Open(keyPath)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	data, err := io.ReadAll(f)
+	if err != nil {
+		return nil, err
+	}
+
+	var signer ssh.Signer
+
+	if info.KeyPassphrase != "" {
+		signer, err = ssh.ParsePrivateKeyWithPassphrase(data, []byte(info.KeyPassphrase))
+	} else {
+		signer, err = ssh.ParsePrivateKey(data)
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	sshConn, chans, reqs, err := ssh.NewClientConn(rawConn, info.RemoteHostname, &ssh.ClientConfig{
+		Auth:            []ssh.AuthMethod{ssh.PublicKeys(signer)},
+		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+		User:            info.RemoteUser,
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	sshClient := ssh.NewClient(sshConn, chans, reqs)
+
+	return sshClient, nil
+}
+
+func RemoteSub(cmd string, ctx context.Context, info *RemoteClientInfo) (io.Reader, error) {
+	sshClient, err := CreateRemoteClient(info)
+	if err != nil {
+		return nil, err
+	}
+
+	session, err := sshClient.NewSession()
+	if err != nil {
+		return nil, err
+	}
+
+	stdoutPipe, err := session.StdoutPipe()
+	if err != nil {
+		return nil, err
+	}
+
+	err = session.Start(cmd)
+	if err != nil {
+		return nil, err
+	}
+
+	go func() {
+		<-ctx.Done()
+		session.Close()
+		sshClient.Close()
+	}()
+
+	return stdoutPipe, nil
+}
+
+func RemotePub(cmd string, ctx context.Context, info *RemoteClientInfo) (io.WriteCloser, error) {
+	sshClient, err := CreateRemoteClient(info)
+	if err != nil {
+		return nil, err
+	}
+
+	session, err := sshClient.NewSession()
+	if err != nil {
+		return nil, err
+	}
+
+	stdinPipe, err := session.StdinPipe()
+	if err != nil {
+		return nil, err
+	}
+
+	err = session.Start(cmd)
+	if err != nil {
+		return nil, err
+	}
+
+	go func() {
+		<-ctx.Done()
+		session.Close()
+		sshClient.Close()
+	}()
+
+	return stdinPipe, err
+}

refactor(log): `ConnectToLogs`

`ConnectToLogs` is now a thin proxy for `pubsub.RemoteSub("sub log-drain -k", ctx, connectionInfo)`.
log/log.go link
+1 -27
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
diff --git a/log/log.go b/log/log.go
index 361f6bd..98bfddc 100644
--- a/log/log.go
+++ b/log/log.go
@@ -272,31 +272,5 @@ var _ io.Writer = (*PubSubLogWriter)(nil)
 var _ slog.Handler = (*MultiHandler)(nil)
 
 func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
-	sshClient, err := pubsub.CreateRemoteClient(connectionInfo)
-	if err != nil {
-		return nil, err
-	}
-
-	session, err := sshClient.NewSession()
-	if err != nil {
-		return nil, err
-	}
-
-	stdoutPipe, err := session.StdoutPipe()
-	if err != nil {
-		return nil, err
-	}
-
-	err = session.Start("sub log-drain -k")
-	if err != nil {
-		return nil, err
-	}
-
-	go func() {
-		<-ctx.Done()
-		session.Close()
-		sshClient.Close()
-	}()
-
-	return stdoutPipe, nil
+	return pubsub.RemoteSub("sub log-drain -k", ctx, connectionInfo)
 }