dashboard / erock/pico / feat(auth): subscribe to pico's metric-drain pipe #32 rss

accepted · opened on 2024-11-12T21:13:35Z by erock
Help
checkout latest patchset:
ssh pr.pico.sh print pr-32 | git am -3
checkout any patchset in a patch request:
ssh pr.pico.sh print ps-X | git am -3
add changes to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add 32
add review to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add --review 32
accept PR:
ssh pr.pico.sh pr accept 32
close PR:
ssh pr.pico.sh pr close 32

Logs

erock created pr with ps-66 on 2024-11-12T21:13:35Z
erock added ps-73 on 2024-11-14T15:42:32Z
erock changed status on 2024-11-23T03:34:16Z {"status":"accepted"}

Patchsets

ps-66 by erock on 2024-11-12T21:13:35Z
Range Diff ↕ rd-73
1: 7ec3569 = 1: 7ec3569 feat(auth): subscribe to pico's metric-drain pipe
2: 8a197f0 = 2: 8a197f0 chore: update pubsub
3: d4bda15 = 3: d4bda15 refactor: use pipe for analytics
-: ------- > 4: 2b7c358 chore: prep for release
ps-73 by erock on 2024-11-14T15:42:32Z

Patchset ps-66

chore: update pubsub

Eric Bower
2024-11-12T16:29:38Z
go.mod
+2 -2
go.sum
+2 -2

refactor: use pipe for analytics

Eric Bower
2024-11-12T21:12:05Z
Back to top

feat(auth): subscribe to pico's metric-drain pipe

We have a lot of services that need to record site usage analytics so we
need a distributed way to receive these events.

This overloads the auth service since it's now serving as a destination
for our metric-drain.
auth/api.go link
+33 -1
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
diff --git a/auth/api.go b/auth/api.go
index 8e279c4..8fdcaa2 100644
--- a/auth/api.go
+++ b/auth/api.go
@@ -1,6 +1,7 @@
 package auth
 
 import (
+	"bufio"
 	"context"
 	"crypto/hmac"
 	"encoding/json"
@@ -18,6 +19,7 @@ import (
 	"github.com/picosh/pico/db"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
+	"github.com/picosh/pubsub"
 	"github.com/picosh/utils"
 )
 
@@ -639,6 +641,31 @@ func handler(routes []shared.Route, client *Client) http.HandlerFunc {
 	}
 }
 
+// metricDrainSub subscribes to the "metric-drain" pipe and stores every visit
+// it receives in the database.
+//
+// Each line read from the subscription is decoded as one JSON-encoded
+// db.AnalyticsVisits record and handed to dbpool.InsertVisit. A line that
+// fails to decode or insert is logged and skipped so one bad record does not
+// stop the drain. The function returns when the subscription cannot be
+// established or when the stream stops yielding lines.
+func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+	conn := shared.NewPicoPipeClient()
+	stdoutPipe, err := pubsub.RemoteSub("sub metric-drain -k", ctx, conn)
+	if err != nil {
+		logger.Error("could not sub to metric-drain", "err", err)
+		return
+	}
+
+	scanner := bufio.NewScanner(stdoutPipe)
+	for scanner.Scan() {
+		view := db.AnalyticsVisits{}
+		// Decode directly from the scanner's buffer; json.Unmarshal does not
+		// retain the input, so no string/[]byte round trip is needed.
+		if err := json.Unmarshal(scanner.Bytes(), &view); err != nil {
+			logger.Error("json unmarshal", "err", err)
+			continue
+		}
+		if err := dbpool.InsertVisit(&view); err != nil {
+			logger.Error("could not insert view record", "err", err)
+		}
+	}
+
+	// Scan reports false for both a clean end of stream and a failure (read
+	// error, or a line exceeding the scanner's buffer); surface the latter so
+	// the drain does not stop silently.
+	if err := scanner.Err(); err != nil {
+		logger.Error("metric-drain scanner stopped", "err", err)
+	}
+}
+
 type AuthCfg struct {
 	Debug  bool
 	Port   string
@@ -667,6 +694,11 @@ func StartApiServer() {
 		Logger: logger,
 	}
 
+	ctx := context.Background()
+	// gather metrics in the auth service
+	go metricDrainSub(ctx, db, logger)
+	defer ctx.Done()
+
 	routes := createMainRoutes()
 
 	if cfg.Debug {
@@ -679,6 +711,6 @@ func StartApiServer() {
 	client.Logger.Info("starting server on port", "port", cfg.Port)
 	err := http.ListenAndServe(portStr, router)
 	if err != nil {
-		client.Logger.Info(err.Error())
+		client.Logger.Info("http-serve", "err", err.Error())
 	}
 }
db/db.go link
+10 -10
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
diff --git a/db/db.go b/db/db.go
index 4806b8d..fbcc715 100644
--- a/db/db.go
+++ b/db/db.go
@@ -161,16 +161,16 @@ type PostAnalytics struct {
 }
 
+// AnalyticsVisits is one recorded visit. The JSON tags fix the field names
+// used when a record is marshaled or unmarshaled with encoding/json.
 type AnalyticsVisits struct {
-	ID        string
-	UserID    string
-	ProjectID string
-	PostID    string
-	Host      string
-	Path      string
-	IpAddress string
-	UserAgent string
-	Referer   string
-	Status    int
+	ID        string `json:"id"`
+	UserID    string `json:"user_id"`
+	ProjectID string `json:"project_id"`
+	PostID    string `json:"post_id"`
+	Host      string `json:"host"`
+	Path      string `json:"path"`
+	IpAddress string `json:"ip_adress"` // NOTE(review): key is spelled "ip_adress" (single d); confirm this is intended before any producer/consumer depends on it
+	UserAgent string `json:"user_agent"`
+	Referer   string `json:"referer"`
+	Status    int    `json:"status"`
 }
 
 type VisitInterval struct {
pico/cli.go link
+2 -7
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
diff --git a/pico/cli.go b/pico/cli.go
index e64f0e4..ab65548 100644
--- a/pico/cli.go
+++ b/pico/cli.go
@@ -70,13 +70,8 @@ func (c *Cmd) notifications() error {
 }
 
 func (c *Cmd) logs(ctx context.Context) error {
-	stdoutPipe, err := pipeLogger.ConnectToLogs(ctx, &pipeLogger.PubSubConnectionInfo{
-		RemoteHost:     utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
-		KeyLocation:    utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
-		KeyPassphrase:  utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
-		RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
-		RemoteUser:     utils.GetEnv("PICO_PIPE_USER", "pico"),
-	})
+	conn := shared.NewPicoPipeClient()
+	stdoutPipe, err := pipeLogger.ConnectToLogs(ctx, conn)
 
 	if err != nil {
 		return err
shared/config.go link
+2 -7
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
diff --git a/shared/config.go b/shared/config.go
index 0413e1e..f1ad895 100644
--- a/shared/config.go
+++ b/shared/config.go
@@ -279,13 +279,8 @@ func CreateLogger(space string) *slog.Logger {
 	newLogger := log
 
 	if strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true" {
-		newLog, err := pipeLogger.SendLogRegister(log, &pipeLogger.PubSubConnectionInfo{
-			RemoteHost:     utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
-			KeyLocation:    utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
-			KeyPassphrase:  utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
-			RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
-			RemoteUser:     utils.GetEnv("PICO_PIPE_USER", "pico"),
-		}, 100)
+		conn := NewPicoPipeClient()
+		newLog, err := pipeLogger.SendLogRegister(log, conn, 100)
 
 		if err == nil {
 			newLogger = newLog
shared/pubsub.go link
+16 -0
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
diff --git a/shared/pubsub.go b/shared/pubsub.go
new file mode 100644
index 0000000..e0d9e73
--- /dev/null
+++ b/shared/pubsub.go
@@ -0,0 +1,16 @@
+package shared
+
+import (
+	"github.com/picosh/pubsub"
+	"github.com/picosh/utils"
+)
+
+// NewPicoPipeClient returns the connection settings used to reach pico's pipe
+// service.
+//
+// Each field is looked up through utils.GetEnv from a PICO_PIPE_* environment
+// variable; the second argument is presumably the fallback used when the
+// variable is unset (verify against utils.GetEnv).
+func NewPicoPipeClient() *pubsub.RemoteClientInfo {
+	info := new(pubsub.RemoteClientInfo)
+	info.RemoteHost = utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22")
+	info.KeyLocation = utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519")
+	info.KeyPassphrase = utils.GetEnv("PICO_PIPE_PASSPHRASE", "")
+	info.RemoteHostname = utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh")
+	info.RemoteUser = utils.GetEnv("PICO_PIPE_USER", "pico")
+	return info
+}
tui/logs/logs.go link
+3 -8
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
diff --git a/tui/logs/logs.go b/tui/logs/logs.go
index d2f2891..6e01c7c 100644
--- a/tui/logs/logs.go
+++ b/tui/logs/logs.go
@@ -11,6 +11,7 @@ import (
 	"github.com/charmbracelet/bubbles/viewport"
 	tea "github.com/charmbracelet/bubbletea"
 	"github.com/charmbracelet/lipgloss"
+	"github.com/picosh/pico/shared"
 	"github.com/picosh/pico/tui/common"
 	"github.com/picosh/pico/tui/pages"
 	"github.com/picosh/utils"
@@ -170,14 +171,8 @@ func (m Model) waitForActivity(sub chan map[string]any) tea.Cmd {
 
 func (m Model) connectLogs(sub chan map[string]any) tea.Cmd {
 	return func() tea.Msg {
-		stdoutPipe, err := pipeLogger.ConnectToLogs(m.ctx, &pipeLogger.PubSubConnectionInfo{
-			RemoteHost:     utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
-			KeyLocation:    utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
-			KeyPassphrase:  utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
-			RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
-			RemoteUser:     utils.GetEnv("PICO_PIPE_USER", "pico"),
-		})
-
+		conn := shared.NewPicoPipeClient()
+		stdoutPipe, err := pipeLogger.ConnectToLogs(m.ctx, conn)
 		if err != nil {
 			return errMsg(err)
 		}

chore: update pubsub

go.mod link
+2 -2
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
diff --git a/go.mod b/go.mod
index 90c2f02..b588d80 100644
--- a/go.mod
+++ b/go.mod
@@ -31,13 +31,12 @@ require (
 	github.com/gorilla/feeds v1.2.0
 	github.com/lib/pq v1.10.9
 	github.com/microcosm-cc/bluemonday v1.0.27
-	github.com/minio/minio-go/v7 v7.0.80
 	github.com/mmcdole/gofeed v1.3.0
 	github.com/muesli/reflow v0.3.0
 	github.com/muesli/termenv v0.15.3-0.20240912151726-82936c5ea257
 	github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
 	github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23
-	github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8
+	github.com/picosh/pubsub v0.0.0-20241112151357-866d44c53659
 	github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f
 	github.com/picosh/tunkit v0.0.0-20240905223921-532404cef9d9
 	github.com/picosh/utils v0.0.0-20241018143404-b351d5d765f3
@@ -133,6 +132,7 @@ require (
 	github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
 	github.com/minio/madmin-go/v3 v3.0.75 // indirect
 	github.com/minio/md5-simd v1.1.2 // indirect
+	github.com/minio/minio-go/v7 v7.0.80 // indirect
 	github.com/mmcdole/goxpp v1.1.1 // indirect
 	github.com/mmcloughlin/md4 v0.1.2 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
go.sum link
+2 -2
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
diff --git a/go.sum b/go.sum
index 4fc24c3..c70d06b 100644
--- a/go.sum
+++ b/go.sum
@@ -269,8 +269,8 @@ github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc h1:bvcsoO
 github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc/go.mod h1:i0iR3W4GSm1PuvVxB9OH32E5jP+CYkVb2NQSe0JCtlo=
 github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23 h1:NEJ5a4UXeF0/X7xmYNzXcwLQID9DwgazlqkMMC5zZ3M=
 github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23/go.mod h1:cF+eAl4G1vU+WOD8cYCKaxokHo6MWmbR8J4/SJnvESg=
-github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8 h1:E/eQsxdHBctPArAzjSHUAVZtDXjsD1AduGD94mbUJQg=
-github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8/go.mod h1:ajolgob5MxlHdp5HllF7u3rTlCgER4InqfP7M/xl6HQ=
+github.com/picosh/pubsub v0.0.0-20241112151357-866d44c53659 h1:HmRi+QkAcKkOcLD90xbf7qZy95muQEd/DqttK9xtpHk=
+github.com/picosh/pubsub v0.0.0-20241112151357-866d44c53659/go.mod h1:m6ZZpg+lZB3XTIKlbSqQgi4NrBPtARv23b8vGYDoCo4=
 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f h1:pdEh1Z7zH5Og9nS7jRuqwup3bcPsC6faDNQ6mgrV9ws=
 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
 github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELijb//dYhyCo8A3fvxW5dijrJVtjs=

refactor: use pipe for analytics

shared/analytics.go link
+13 -3
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
diff --git a/shared/analytics.go b/shared/analytics.go
index cfe69bc..136e347 100644
--- a/shared/analytics.go
+++ b/shared/analytics.go
@@ -4,6 +4,7 @@ import (
 	"crypto/hmac"
 	"crypto/sha256"
 	"encoding/hex"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"log/slog"
@@ -12,6 +13,7 @@ import (
 	"net/url"
 
 	"github.com/picosh/pico/db"
+	"github.com/picosh/pubsub"
 	"github.com/simplesurance/go-ip-anonymizer/ipanonymizer"
 	"github.com/x-way/crawlerdetect"
 )
@@ -124,10 +126,18 @@ func AnalyticsVisitFromRequest(r *http.Request, dbpool db.DB, userID string, sec
 }
 
 func AnalyticsCollect(ch chan *db.AnalyticsVisits, dbpool db.DB, logger *slog.Logger) {
-	for view := range ch {
-		err := dbpool.InsertVisit(view)
+	info := NewPicoPipeClient()
+	metricDrain := pubsub.NewRemoteClientWriter(info, logger, 0)
+	go metricDrain.KeepAlive("pub metric-drain -b=false")
+
+	for visit := range ch {
+		data, err := json.Marshal(visit)
+		if err != nil {
+			logger.Error("could not json marshall visit record", "err", err)
+		}
+		_, err = metricDrain.Write(data)
 		if err != nil {
-			logger.Error("could not insert view record", "err", err)
+			logger.Error("could not write to metric-drain", "err", err)
 		}
 	}
 }