dashboard / pico / feat(auth): subscribe to pico's metric-drain pipe #32 rss

accepted · opened on 2024-11-12T21:13:35Z by erock
Help
# add changes to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add 32
# add review to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add --review 32
# remove patchset
ssh pr.pico.sh ps rm ps-x
# checkout all patches
ssh pr.pico.sh pr print 32 | git am -3
# print a diff between the last two patches in a patch request
ssh pr.pico.sh pr diff 32
# accept PR
ssh pr.pico.sh pr accept 32
# close PR
ssh pr.pico.sh pr close 32

Logs

erock created pr with ps-66 on 2024-11-12T21:13:35Z
erock added ps-73 on 2024-11-14T15:42:32Z
erock changed status on 2024-11-23T03:34:16Z {"status":"accepted"}

Patchsets

ps-66 by erock on 2024-11-12T21:13:35Z
Range Diff ↕ rd-73
1: 7ec3569 = 1: 7ec3569 feat(auth): subscribe to pico's metric-drain pipe
2: 8a197f0 = 2: 8a197f0 chore: update pubsub
3: d4bda15 = 3: d4bda15 refactor: use pipe for analytics
-: ------- > 4: 2b7c358 chore: prep for release
ps-73 by erock on 2024-11-14T15:42:32Z

Patchset ps-66

chore: update pubsub

Eric Bower
2024-11-12T16:29:38Z
go.mod
+2 -2
go.sum
+2 -2

refactor: use pipe for analytics

Eric Bower
2024-11-12T21:12:05Z
Back to top

feat(auth): subscribe to pico's metric-drain pipe

We have a lot of services that need to record site usage analytics so we
need a distributed way to receive these events.

This gives the auth service a second role: besides handling auth, it now also
serves as a destination for our metric-drain.
auth/api.go link
+33 -1
 1diff --git a/auth/api.go b/auth/api.go
 2index 8e279c4..8fdcaa2 100644
 3--- a/auth/api.go
 4+++ b/auth/api.go
 5@@ -1,6 +1,7 @@
 6 package auth
 7 
 8 import (
 9+	"bufio"
10 	"context"
11 	"crypto/hmac"
12 	"encoding/json"
13@@ -18,6 +19,7 @@ import (
14 	"github.com/picosh/pico/db"
15 	"github.com/picosh/pico/db/postgres"
16 	"github.com/picosh/pico/shared"
17+	"github.com/picosh/pubsub"
18 	"github.com/picosh/utils"
19 )
20 
21@@ -639,6 +641,31 @@ func handler(routes []shared.Route, client *Client) http.HandlerFunc {
22 	}
23 }
24 
25+func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
26+	conn := shared.NewPicoPipeClient()
27+	stdoutPipe, err := pubsub.RemoteSub("sub metric-drain -k", ctx, conn)
28+
29+	if err != nil {
30+		logger.Error("could not sub to metric-drain", "err", err)
31+		return
32+	}
33+
34+	scanner := bufio.NewScanner(stdoutPipe)
35+	for scanner.Scan() {
36+		line := scanner.Text()
37+		view := db.AnalyticsVisits{}
38+		err := json.Unmarshal([]byte(line), &view)
39+		if err != nil {
40+			logger.Error("json unmarshal", "err", err)
41+			continue
42+		}
43+		err = dbpool.InsertVisit(&view)
44+		if err != nil {
45+			logger.Error("could not insert view record", "err", err)
46+		}
47+	}
48+}
49+
50 type AuthCfg struct {
51 	Debug  bool
52 	Port   string
53@@ -667,6 +694,11 @@ func StartApiServer() {
54 		Logger: logger,
55 	}
56 
57+	ctx := context.Background()
58+	// gather metrics in the auth service
59+	go metricDrainSub(ctx, db, logger)
60+	defer ctx.Done()
61+
62 	routes := createMainRoutes()
63 
64 	if cfg.Debug {
65@@ -679,6 +711,6 @@ func StartApiServer() {
66 	client.Logger.Info("starting server on port", "port", cfg.Port)
67 	err := http.ListenAndServe(portStr, router)
68 	if err != nil {
69-		client.Logger.Info(err.Error())
70+		client.Logger.Info("http-serve", "err", err.Error())
71 	}
72 }
db/db.go link
+10 -10
 1diff --git a/db/db.go b/db/db.go
 2index 4806b8d..fbcc715 100644
 3--- a/db/db.go
 4+++ b/db/db.go
 5@@ -161,16 +161,16 @@ type PostAnalytics struct {
 6 }
 7 
 8 type AnalyticsVisits struct {
 9-	ID        string
10-	UserID    string
11-	ProjectID string
12-	PostID    string
13-	Host      string
14-	Path      string
15-	IpAddress string
16-	UserAgent string
17-	Referer   string
18-	Status    int
19+	ID        string `json:"id"`
20+	UserID    string `json:"user_id"`
21+	ProjectID string `json:"project_id"`
22+	PostID    string `json:"post_id"`
23+	Host      string `json:"host"`
24+	Path      string `json:"path"`
25+	IpAddress string `json:"ip_adress"`
26+	UserAgent string `json:"user_agent"`
27+	Referer   string `json:"referer"`
28+	Status    int    `json:"status"`
29 }
30 
31 type VisitInterval struct {
pico/cli.go link
+2 -7
 1diff --git a/pico/cli.go b/pico/cli.go
 2index e64f0e4..ab65548 100644
 3--- a/pico/cli.go
 4+++ b/pico/cli.go
 5@@ -70,13 +70,8 @@ func (c *Cmd) notifications() error {
 6 }
 7 
 8 func (c *Cmd) logs(ctx context.Context) error {
 9-	stdoutPipe, err := pipeLogger.ConnectToLogs(ctx, &pipeLogger.PubSubConnectionInfo{
10-		RemoteHost:     utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
11-		KeyLocation:    utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
12-		KeyPassphrase:  utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
13-		RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
14-		RemoteUser:     utils.GetEnv("PICO_PIPE_USER", "pico"),
15-	})
16+	conn := shared.NewPicoPipeClient()
17+	stdoutPipe, err := pipeLogger.ConnectToLogs(ctx, conn)
18 
19 	if err != nil {
20 		return err
shared/config.go link
+2 -7
 1diff --git a/shared/config.go b/shared/config.go
 2index 0413e1e..f1ad895 100644
 3--- a/shared/config.go
 4+++ b/shared/config.go
 5@@ -279,13 +279,8 @@ func CreateLogger(space string) *slog.Logger {
 6 	newLogger := log
 7 
 8 	if strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true" {
 9-		newLog, err := pipeLogger.SendLogRegister(log, &pipeLogger.PubSubConnectionInfo{
10-			RemoteHost:     utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
11-			KeyLocation:    utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
12-			KeyPassphrase:  utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
13-			RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
14-			RemoteUser:     utils.GetEnv("PICO_PIPE_USER", "pico"),
15-		}, 100)
16+		conn := NewPicoPipeClient()
17+		newLog, err := pipeLogger.SendLogRegister(log, conn, 100)
18 
19 		if err == nil {
20 			newLogger = newLog
shared/pubsub.go link
+16 -0
 1diff --git a/shared/pubsub.go b/shared/pubsub.go
 2new file mode 100644
 3index 0000000..e0d9e73
 4--- /dev/null
 5+++ b/shared/pubsub.go
 6@@ -0,0 +1,16 @@
 7+package shared
 8+
 9+import (
10+	"github.com/picosh/pubsub"
11+	"github.com/picosh/utils"
12+)
13+
14+func NewPicoPipeClient() *pubsub.RemoteClientInfo {
15+	return &pubsub.RemoteClientInfo{
16+		RemoteHost:     utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
17+		KeyLocation:    utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
18+		KeyPassphrase:  utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
19+		RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
20+		RemoteUser:     utils.GetEnv("PICO_PIPE_USER", "pico"),
21+	}
22+}
tui/logs/logs.go link
+3 -8
 1diff --git a/tui/logs/logs.go b/tui/logs/logs.go
 2index d2f2891..6e01c7c 100644
 3--- a/tui/logs/logs.go
 4+++ b/tui/logs/logs.go
 5@@ -11,6 +11,7 @@ import (
 6 	"github.com/charmbracelet/bubbles/viewport"
 7 	tea "github.com/charmbracelet/bubbletea"
 8 	"github.com/charmbracelet/lipgloss"
 9+	"github.com/picosh/pico/shared"
10 	"github.com/picosh/pico/tui/common"
11 	"github.com/picosh/pico/tui/pages"
12 	"github.com/picosh/utils"
13@@ -170,14 +171,8 @@ func (m Model) waitForActivity(sub chan map[string]any) tea.Cmd {
14 
15 func (m Model) connectLogs(sub chan map[string]any) tea.Cmd {
16 	return func() tea.Msg {
17-		stdoutPipe, err := pipeLogger.ConnectToLogs(m.ctx, &pipeLogger.PubSubConnectionInfo{
18-			RemoteHost:     utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
19-			KeyLocation:    utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
20-			KeyPassphrase:  utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
21-			RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
22-			RemoteUser:     utils.GetEnv("PICO_PIPE_USER", "pico"),
23-		})
24-
25+		conn := shared.NewPicoPipeClient()
26+		stdoutPipe, err := pipeLogger.ConnectToLogs(m.ctx, conn)
27 		if err != nil {
28 			return errMsg(err)
29 		}

chore: update pubsub

go.mod link
+2 -2
 1diff --git a/go.mod b/go.mod
 2index 90c2f02..b588d80 100644
 3--- a/go.mod
 4+++ b/go.mod
 5@@ -31,13 +31,12 @@ require (
 6 	github.com/gorilla/feeds v1.2.0
 7 	github.com/lib/pq v1.10.9
 8 	github.com/microcosm-cc/bluemonday v1.0.27
 9-	github.com/minio/minio-go/v7 v7.0.80
10 	github.com/mmcdole/gofeed v1.3.0
11 	github.com/muesli/reflow v0.3.0
12 	github.com/muesli/termenv v0.15.3-0.20240912151726-82936c5ea257
13 	github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
14 	github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23
15-	github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8
16+	github.com/picosh/pubsub v0.0.0-20241112151357-866d44c53659
17 	github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f
18 	github.com/picosh/tunkit v0.0.0-20240905223921-532404cef9d9
19 	github.com/picosh/utils v0.0.0-20241018143404-b351d5d765f3
20@@ -133,6 +132,7 @@ require (
21 	github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
22 	github.com/minio/madmin-go/v3 v3.0.75 // indirect
23 	github.com/minio/md5-simd v1.1.2 // indirect
24+	github.com/minio/minio-go/v7 v7.0.80 // indirect
25 	github.com/mmcdole/goxpp v1.1.1 // indirect
26 	github.com/mmcloughlin/md4 v0.1.2 // indirect
27 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
go.sum link
+2 -2
 1diff --git a/go.sum b/go.sum
 2index 4fc24c3..c70d06b 100644
 3--- a/go.sum
 4+++ b/go.sum
 5@@ -269,8 +269,8 @@ github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc h1:bvcsoO
 6 github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc/go.mod h1:i0iR3W4GSm1PuvVxB9OH32E5jP+CYkVb2NQSe0JCtlo=
 7 github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23 h1:NEJ5a4UXeF0/X7xmYNzXcwLQID9DwgazlqkMMC5zZ3M=
 8 github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23/go.mod h1:cF+eAl4G1vU+WOD8cYCKaxokHo6MWmbR8J4/SJnvESg=
 9-github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8 h1:E/eQsxdHBctPArAzjSHUAVZtDXjsD1AduGD94mbUJQg=
10-github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8/go.mod h1:ajolgob5MxlHdp5HllF7u3rTlCgER4InqfP7M/xl6HQ=
11+github.com/picosh/pubsub v0.0.0-20241112151357-866d44c53659 h1:HmRi+QkAcKkOcLD90xbf7qZy95muQEd/DqttK9xtpHk=
12+github.com/picosh/pubsub v0.0.0-20241112151357-866d44c53659/go.mod h1:m6ZZpg+lZB3XTIKlbSqQgi4NrBPtARv23b8vGYDoCo4=
13 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f h1:pdEh1Z7zH5Og9nS7jRuqwup3bcPsC6faDNQ6mgrV9ws=
14 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
15 github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELijb//dYhyCo8A3fvxW5dijrJVtjs=

refactor: use pipe for analytics

shared/analytics.go link
+13 -3
 1diff --git a/shared/analytics.go b/shared/analytics.go
 2index cfe69bc..136e347 100644
 3--- a/shared/analytics.go
 4+++ b/shared/analytics.go
 5@@ -4,6 +4,7 @@ import (
 6 	"crypto/hmac"
 7 	"crypto/sha256"
 8 	"encoding/hex"
 9+	"encoding/json"
10 	"errors"
11 	"fmt"
12 	"log/slog"
13@@ -12,6 +13,7 @@ import (
14 	"net/url"
15 
16 	"github.com/picosh/pico/db"
17+	"github.com/picosh/pubsub"
18 	"github.com/simplesurance/go-ip-anonymizer/ipanonymizer"
19 	"github.com/x-way/crawlerdetect"
20 )
21@@ -124,10 +126,18 @@ func AnalyticsVisitFromRequest(r *http.Request, dbpool db.DB, userID string, sec
22 }
23 
24 func AnalyticsCollect(ch chan *db.AnalyticsVisits, dbpool db.DB, logger *slog.Logger) {
25-	for view := range ch {
26-		err := dbpool.InsertVisit(view)
27+	info := NewPicoPipeClient()
28+	metricDrain := pubsub.NewRemoteClientWriter(info, logger, 0)
29+	go metricDrain.KeepAlive("pub metric-drain -b=false")
30+
31+	for visit := range ch {
32+		data, err := json.Marshal(visit)
33+		if err != nil {
34+			logger.Error("could not json marshall visit record", "err", err)
35+		}
36+		_, err = metricDrain.Write(data)
37 		if err != nil {
38-			logger.Error("could not insert view record", "err", err)
39+			logger.Error("could not write to metric-drain", "err", err)
40 		}
41 	}
42 }