Logs
Patchsets
Range Diff ↕
1: 57a5727 = 1: 57a5727 refactor: create remote client lib
-: ------- > 2: 67d38ff refactor(log): rm ConnectToLogs
Range Diff ↕
1: 57a5727 = 1: 57a5727 refactor: create remote client lib
2: 67d38ff = 2: 67d38ff refactor(log): rm ConnectToLogs
-: ------- > 3: 97084b4 refactor(log): preserve `ConnectToLogs` but make it a convenient proxy
Range Diff ↕
3: 97084b4 < -: ------- refactor(log): preserve `ConnectToLogs` but make it a convenient proxy
1: 57a5727 = 1: 57a5727 refactor: create remote client lib
2: 67d38ff ! 2: 866d44c refactor(log): `ConnectToLogs`
@@ log/log.go var _ io.Writer = (*PubSubLogWriter)(nil) var _ slog.Handler = (*MultiHandler)(nil) - -func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) { func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) { - sshClient, err := pubsub.CreateRemoteClient(connectionInfo) - if err != nil { - return nil, err - } - - session, err := sshClient.NewSession() - if err != nil { - return nil, err - } - - stdoutPipe, err := session.StdoutPipe() - if err != nil { - return nil, err - } - - err = session.Start("sub log-drain -k") - if err != nil { - return nil, err - } - - go func() { - <-ctx.Done() - session.Close() - sshClient.Close() - }() - - return stdoutPipe, nil -} + return pubsub.RemoteSub("sub log-drain -k", ctx, connectionInfo) }
Patchset ps-61
refactor: create remote client lib
Eric Bower
go.mod
+1
-0
go.sum
+2
-0
log/log.go
+6
-74
remote_client.go
+138
-0
refactor(log): rm ConnectToLogs
Eric Bower
log/log.go
+0
-30
refactor(log): preserve `ConnectToLogs` but make it a convenient proxy
Eric Bower
log/log.go
+4
-0
refactor: create remote client lib
go.mod
link
+1
-0
+1
-0
1diff --git a/go.mod b/go.mod
2index 073f332..30b4335 100644
3--- a/go.mod
4+++ b/go.mod
5@@ -4,6 +4,7 @@ go 1.23.1
6
7 require (
8 github.com/antoniomika/syncmap v1.0.0
9+ github.com/google/uuid v1.6.0
10 golang.org/x/crypto v0.28.0
11 )
12
go.sum
link
+2
-0
+2
-0
1diff --git a/go.sum b/go.sum
2index 3c31bf5..b4364cb 100644
3--- a/go.sum
4+++ b/go.sum
5@@ -1,5 +1,7 @@
6 github.com/antoniomika/syncmap v1.0.0 h1:iFSfbQFQOvHZILFZF+hqWosO0no+W9+uF4y2VEyMKWU=
7 github.com/antoniomika/syncmap v1.0.0/go.mod h1:fK2829foEYnO4riNfyUn0SHQZt4ue3DStYjGU+sJj38=
8+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
9+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
10 golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
11 golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
12 golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
log/log.go
link
+6
-74
+6
-74
1diff --git a/log/log.go b/log/log.go
2index 9dc431c..361f6bd 100644
3--- a/log/log.go
4+++ b/log/log.go
5@@ -6,14 +6,11 @@ import (
6 "fmt"
7 "io"
8 "log/slog"
9- "net"
10- "os"
11- "path/filepath"
12 "slices"
13- "strings"
14 "sync"
15 "time"
16
17+ "github.com/picosh/pubsub"
18 "golang.org/x/crypto/ssh"
19 )
20
21@@ -83,14 +80,6 @@ func (m *MultiHandler) WithGroup(name string) slog.Handler {
22 }
23 }
24
25-type PubSubConnectionInfo struct {
26- RemoteHost string
27- KeyLocation string
28- KeyPassphrase string
29- RemoteHostname string
30- RemoteUser string
31-}
32-
33 type PubSubLogWriter struct {
34 SSHClient *ssh.Client
35 Session *ssh.Session
36@@ -103,7 +92,7 @@ type PubSubLogWriter struct {
37 closeMessageOnce sync.Once
38 startOnce sync.Once
39 connecMu sync.Mutex
40- ConnectionInfo *PubSubConnectionInfo
41+ ConnectionInfo *pubsub.RemoteClientInfo
42 }
43
44 func (c *PubSubLogWriter) Close() error {
45@@ -147,7 +136,7 @@ func (c *PubSubLogWriter) Open() error {
46 c.Done = make(chan struct{})
47 c.Messages = make(chan []byte, c.BufferSize)
48
49- sshClient, err := CreateSSHClient(c.ConnectionInfo)
50+ sshClient, err := pubsub.CreateRemoteClient(c.ConnectionInfo)
51 if err != nil {
52 c.connecMu.Unlock()
53 return err
54@@ -251,64 +240,7 @@ func (c *PubSubLogWriter) Reconnect() {
55 }()
56 }
57
58-func CreateSSHClient(connectionInfo *PubSubConnectionInfo) (*ssh.Client, error) {
59- if connectionInfo == nil {
60- return nil, fmt.Errorf("connection info is invalid")
61- }
62-
63- if !strings.Contains(connectionInfo.RemoteHost, ":") {
64- connectionInfo.RemoteHost += ":22"
65- }
66-
67- rawConn, err := net.Dial("tcp", connectionInfo.RemoteHost)
68- if err != nil {
69- return nil, err
70- }
71-
72- keyPath, err := filepath.Abs(connectionInfo.KeyLocation)
73- if err != nil {
74- return nil, err
75- }
76-
77- f, err := os.Open(keyPath)
78- if err != nil {
79- return nil, err
80- }
81- defer f.Close()
82-
83- data, err := io.ReadAll(f)
84- if err != nil {
85- return nil, err
86- }
87-
88- var signer ssh.Signer
89-
90- if connectionInfo.KeyPassphrase != "" {
91- signer, err = ssh.ParsePrivateKeyWithPassphrase(data, []byte(connectionInfo.KeyPassphrase))
92- } else {
93- signer, err = ssh.ParsePrivateKey(data)
94- }
95-
96- if err != nil {
97- return nil, err
98- }
99-
100- sshConn, chans, reqs, err := ssh.NewClientConn(rawConn, connectionInfo.RemoteHostname, &ssh.ClientConfig{
101- Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
102- HostKeyCallback: ssh.InsecureIgnoreHostKey(),
103- User: connectionInfo.RemoteUser,
104- })
105-
106- if err != nil {
107- return nil, err
108- }
109-
110- sshClient := ssh.NewClient(sshConn, chans, reqs)
111-
112- return sshClient, nil
113-}
114-
115-func SendLogRegister(logger *slog.Logger, connectionInfo *PubSubConnectionInfo, buffer int) (*slog.Logger, error) {
116+func SendLogRegister(logger *slog.Logger, connectionInfo *pubsub.RemoteClientInfo, buffer int) (*slog.Logger, error) {
117 if buffer < 0 {
118 buffer = 0
119 }
120@@ -339,8 +271,8 @@ func SendLogRegister(logger *slog.Logger, connectionInfo *PubSubConnectionInfo,
121 var _ io.Writer = (*PubSubLogWriter)(nil)
122 var _ slog.Handler = (*MultiHandler)(nil)
123
124-func ConnectToLogs(ctx context.Context, connectionInfo *PubSubConnectionInfo) (io.Reader, error) {
125- sshClient, err := CreateSSHClient(connectionInfo)
126+func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
127+ sshClient, err := pubsub.CreateRemoteClient(connectionInfo)
128 if err != nil {
129 return nil, err
130 }
remote_client.go
link
+138
-0
+138
-0
1diff --git a/remote_client.go b/remote_client.go
2new file mode 100644
3index 0000000..84c48f4
4--- /dev/null
5+++ b/remote_client.go
6@@ -0,0 +1,138 @@
7+package pubsub
8+
9+import (
10+ "context"
11+ "fmt"
12+ "io"
13+ "net"
14+ "os"
15+ "path/filepath"
16+ "strings"
17+
18+ "golang.org/x/crypto/ssh"
19+)
20+
21+type RemoteClientInfo struct {
22+ RemoteHost string
23+ KeyLocation string
24+ KeyPassphrase string
25+ RemoteHostname string
26+ RemoteUser string
27+}
28+
29+func CreateRemoteClient(info *RemoteClientInfo) (*ssh.Client, error) {
30+ if info == nil {
31+ return nil, fmt.Errorf("connection info is invalid")
32+ }
33+
34+ if !strings.Contains(info.RemoteHost, ":") {
35+ info.RemoteHost += ":22"
36+ }
37+
38+ rawConn, err := net.Dial("tcp", info.RemoteHost)
39+ if err != nil {
40+ return nil, err
41+ }
42+
43+ keyPath, err := filepath.Abs(info.KeyLocation)
44+ if err != nil {
45+ return nil, err
46+ }
47+
48+ f, err := os.Open(keyPath)
49+ if err != nil {
50+ return nil, err
51+ }
52+ defer f.Close()
53+
54+ data, err := io.ReadAll(f)
55+ if err != nil {
56+ return nil, err
57+ }
58+
59+ var signer ssh.Signer
60+
61+ if info.KeyPassphrase != "" {
62+ signer, err = ssh.ParsePrivateKeyWithPassphrase(data, []byte(info.KeyPassphrase))
63+ } else {
64+ signer, err = ssh.ParsePrivateKey(data)
65+ }
66+
67+ if err != nil {
68+ return nil, err
69+ }
70+
71+ sshConn, chans, reqs, err := ssh.NewClientConn(rawConn, info.RemoteHostname, &ssh.ClientConfig{
72+ Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
73+ HostKeyCallback: ssh.InsecureIgnoreHostKey(),
74+ User: info.RemoteUser,
75+ })
76+
77+ if err != nil {
78+ return nil, err
79+ }
80+
81+ sshClient := ssh.NewClient(sshConn, chans, reqs)
82+
83+ return sshClient, nil
84+}
85+
86+func RemoteSub(cmd string, ctx context.Context, info *RemoteClientInfo) (io.Reader, error) {
87+ sshClient, err := CreateRemoteClient(info)
88+ if err != nil {
89+ return nil, err
90+ }
91+
92+ session, err := sshClient.NewSession()
93+ if err != nil {
94+ return nil, err
95+ }
96+
97+ stdoutPipe, err := session.StdoutPipe()
98+ if err != nil {
99+ return nil, err
100+ }
101+
102+ err = session.Start(cmd)
103+ if err != nil {
104+ return nil, err
105+ }
106+
107+ go func() {
108+ <-ctx.Done()
109+ session.Close()
110+ sshClient.Close()
111+ }()
112+
113+ return stdoutPipe, nil
114+}
115+
116+func RemotePub(cmd string, ctx context.Context, info *RemoteClientInfo) (io.WriteCloser, error) {
117+ sshClient, err := CreateRemoteClient(info)
118+ if err != nil {
119+ return nil, err
120+ }
121+
122+ session, err := sshClient.NewSession()
123+ if err != nil {
124+ return nil, err
125+ }
126+
127+ stdinPipe, err := session.StdinPipe()
128+ if err != nil {
129+ return nil, err
130+ }
131+
132+ err = session.Start(cmd)
133+ if err != nil {
134+ return nil, err
135+ }
136+
137+ go func() {
138+ <-ctx.Done()
139+ session.Close()
140+ sshClient.Close()
141+ }()
142+
143+ return stdinPipe, err
144+}
refactor(log): rm ConnectToLogs
We can just use `RemoteSub("sub log-drain -k")`
log/log.go
link
+0
-30
+0
-30
1diff --git a/log/log.go b/log/log.go
2index 361f6bd..d959656 100644
3--- a/log/log.go
4+++ b/log/log.go
5@@ -270,33 +270,3 @@ func SendLogRegister(logger *slog.Logger, connectionInfo *pubsub.RemoteClientInf
6
7 var _ io.Writer = (*PubSubLogWriter)(nil)
8 var _ slog.Handler = (*MultiHandler)(nil)
9-
10-func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
11- sshClient, err := pubsub.CreateRemoteClient(connectionInfo)
12- if err != nil {
13- return nil, err
14- }
15-
16- session, err := sshClient.NewSession()
17- if err != nil {
18- return nil, err
19- }
20-
21- stdoutPipe, err := session.StdoutPipe()
22- if err != nil {
23- return nil, err
24- }
25-
26- err = session.Start("sub log-drain -k")
27- if err != nil {
28- return nil, err
29- }
30-
31- go func() {
32- <-ctx.Done()
33- session.Close()
34- sshClient.Close()
35- }()
36-
37- return stdoutPipe, nil
38-}
refactor(log): preserve `ConnectToLogs` but make it a convenient proxy
log/log.go
link
+4
-0
+4
-0
1diff --git a/log/log.go b/log/log.go
2index d959656..98bfddc 100644
3--- a/log/log.go
4+++ b/log/log.go
5@@ -270,3 +270,7 @@ func SendLogRegister(logger *slog.Logger, connectionInfo *pubsub.RemoteClientInf
6
7 var _ io.Writer = (*PubSubLogWriter)(nil)
8 var _ slog.Handler = (*MultiHandler)(nil)
9+
10+func ConnectToLogs(ctx context.Context, connectionInfo *pubsub.RemoteClientInfo) (io.Reader, error) {
11+ return pubsub.RemoteSub("sub log-drain -k", ctx, connectionInfo)
12+}