Patchsets
Range Diff ↕
1: 7ec3569 = 1: 7ec3569 feat(auth): subscribe to pico's metric-drain pipe
2: 8a197f0 = 2: 8a197f0 chore: update pubsub
3: d4bda15 = 3: d4bda15 refactor: use pipe for analytics
-: ------- > 4: 2b7c358 chore: prep for release
Patchset ps-66
feat(auth): subscribe to pico's metric-drain pipe
Eric Bower
auth/api.go
+33
-1
db/db.go
+10
-10
pico/cli.go
+2
-7
shared/config.go
+2
-7
shared/pubsub.go
+16
-0
tui/logs/logs.go
+3
-8
refactor: use pipe for analytics
Eric Bower
shared/analytics.go
+13
-3
feat(auth): subscribe to pico's metric-drain pipe
We have a lot of services that need to record site usage analytics so we need a distributed way to receive these events. This overloads the auth service since it's now serving as a destination for our metric-drain.
auth/api.go
link
+33
-1
+33
-1
1diff --git a/auth/api.go b/auth/api.go
2index 8e279c4..8fdcaa2 100644
3--- a/auth/api.go
4+++ b/auth/api.go
5@@ -1,6 +1,7 @@
6 package auth
7
8 import (
9+ "bufio"
10 "context"
11 "crypto/hmac"
12 "encoding/json"
13@@ -18,6 +19,7 @@ import (
14 "github.com/picosh/pico/db"
15 "github.com/picosh/pico/db/postgres"
16 "github.com/picosh/pico/shared"
17+ "github.com/picosh/pubsub"
18 "github.com/picosh/utils"
19 )
20
21@@ -639,6 +641,31 @@ func handler(routes []shared.Route, client *Client) http.HandlerFunc {
22 }
23 }
24
25+func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
26+ conn := shared.NewPicoPipeClient()
27+ stdoutPipe, err := pubsub.RemoteSub("sub metric-drain -k", ctx, conn)
28+
29+ if err != nil {
30+ logger.Error("could not sub to metric-drain", "err", err)
31+ return
32+ }
33+
34+ scanner := bufio.NewScanner(stdoutPipe)
35+ for scanner.Scan() {
36+ line := scanner.Text()
37+ view := db.AnalyticsVisits{}
38+ err := json.Unmarshal([]byte(line), &view)
39+ if err != nil {
40+ logger.Error("json unmarshal", "err", err)
41+ continue
42+ }
43+ err = dbpool.InsertVisit(&view)
44+ if err != nil {
45+ logger.Error("could not insert view record", "err", err)
46+ }
47+ }
48+}
49+
50 type AuthCfg struct {
51 Debug bool
52 Port string
53@@ -667,6 +694,11 @@ func StartApiServer() {
54 Logger: logger,
55 }
56
57+ ctx := context.Background()
58+ // gather metrics in the auth service
59+ go metricDrainSub(ctx, db, logger)
60+ defer ctx.Done()
61+
62 routes := createMainRoutes()
63
64 if cfg.Debug {
65@@ -679,6 +711,6 @@ func StartApiServer() {
66 client.Logger.Info("starting server on port", "port", cfg.Port)
67 err := http.ListenAndServe(portStr, router)
68 if err != nil {
69- client.Logger.Info(err.Error())
70+ client.Logger.Info("http-serve", "err", err.Error())
71 }
72 }
db/db.go
link
+10
-10
+10
-10
1diff --git a/db/db.go b/db/db.go
2index 4806b8d..fbcc715 100644
3--- a/db/db.go
4+++ b/db/db.go
5@@ -161,16 +161,16 @@ type PostAnalytics struct {
6 }
7
8 type AnalyticsVisits struct {
9- ID string
10- UserID string
11- ProjectID string
12- PostID string
13- Host string
14- Path string
15- IpAddress string
16- UserAgent string
17- Referer string
18- Status int
19+ ID string `json:"id"`
20+ UserID string `json:"user_id"`
21+ ProjectID string `json:"project_id"`
22+ PostID string `json:"post_id"`
23+ Host string `json:"host"`
24+ Path string `json:"path"`
25+ IpAddress string `json:"ip_adress"`
26+ UserAgent string `json:"user_agent"`
27+ Referer string `json:"referer"`
28+ Status int `json:"status"`
29 }
30
31 type VisitInterval struct {
pico/cli.go
link
+2
-7
+2
-7
1diff --git a/pico/cli.go b/pico/cli.go
2index e64f0e4..ab65548 100644
3--- a/pico/cli.go
4+++ b/pico/cli.go
5@@ -70,13 +70,8 @@ func (c *Cmd) notifications() error {
6 }
7
8 func (c *Cmd) logs(ctx context.Context) error {
9- stdoutPipe, err := pipeLogger.ConnectToLogs(ctx, &pipeLogger.PubSubConnectionInfo{
10- RemoteHost: utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
11- KeyLocation: utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
12- KeyPassphrase: utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
13- RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
14- RemoteUser: utils.GetEnv("PICO_PIPE_USER", "pico"),
15- })
16+ conn := shared.NewPicoPipeClient()
17+ stdoutPipe, err := pipeLogger.ConnectToLogs(ctx, conn)
18
19 if err != nil {
20 return err
tui/logs/logs.go
link
+3
-8
+3
-8
1diff --git a/tui/logs/logs.go b/tui/logs/logs.go
2index d2f2891..6e01c7c 100644
3--- a/tui/logs/logs.go
4+++ b/tui/logs/logs.go
5@@ -11,6 +11,7 @@ import (
6 "github.com/charmbracelet/bubbles/viewport"
7 tea "github.com/charmbracelet/bubbletea"
8 "github.com/charmbracelet/lipgloss"
9+ "github.com/picosh/pico/shared"
10 "github.com/picosh/pico/tui/common"
11 "github.com/picosh/pico/tui/pages"
12 "github.com/picosh/utils"
13@@ -170,14 +171,8 @@ func (m Model) waitForActivity(sub chan map[string]any) tea.Cmd {
14
15 func (m Model) connectLogs(sub chan map[string]any) tea.Cmd {
16 return func() tea.Msg {
17- stdoutPipe, err := pipeLogger.ConnectToLogs(m.ctx, &pipeLogger.PubSubConnectionInfo{
18- RemoteHost: utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
19- KeyLocation: utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
20- KeyPassphrase: utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
21- RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
22- RemoteUser: utils.GetEnv("PICO_PIPE_USER", "pico"),
23- })
24-
25+ conn := shared.NewPicoPipeClient()
26+ stdoutPipe, err := pipeLogger.ConnectToLogs(m.ctx, conn)
27 if err != nil {
28 return errMsg(err)
29 }
chore: update pubsub
go.mod
link
+2
-2
+2
-2
1diff --git a/go.mod b/go.mod
2index 90c2f02..b588d80 100644
3--- a/go.mod
4+++ b/go.mod
5@@ -31,13 +31,12 @@ require (
6 github.com/gorilla/feeds v1.2.0
7 github.com/lib/pq v1.10.9
8 github.com/microcosm-cc/bluemonday v1.0.27
9- github.com/minio/minio-go/v7 v7.0.80
10 github.com/mmcdole/gofeed v1.3.0
11 github.com/muesli/reflow v0.3.0
12 github.com/muesli/termenv v0.15.3-0.20240912151726-82936c5ea257
13 github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
14 github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23
15- github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8
16+ github.com/picosh/pubsub v0.0.0-20241112151357-866d44c53659
17 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f
18 github.com/picosh/tunkit v0.0.0-20240905223921-532404cef9d9
19 github.com/picosh/utils v0.0.0-20241018143404-b351d5d765f3
20@@ -133,6 +132,7 @@ require (
21 github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
22 github.com/minio/madmin-go/v3 v3.0.75 // indirect
23 github.com/minio/md5-simd v1.1.2 // indirect
24+ github.com/minio/minio-go/v7 v7.0.80 // indirect
25 github.com/mmcdole/goxpp v1.1.1 // indirect
26 github.com/mmcloughlin/md4 v0.1.2 // indirect
27 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
go.sum
link
+2
-2
+2
-2
1diff --git a/go.sum b/go.sum
2index 4fc24c3..c70d06b 100644
3--- a/go.sum
4+++ b/go.sum
5@@ -269,8 +269,8 @@ github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc h1:bvcsoO
6 github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc/go.mod h1:i0iR3W4GSm1PuvVxB9OH32E5jP+CYkVb2NQSe0JCtlo=
7 github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23 h1:NEJ5a4UXeF0/X7xmYNzXcwLQID9DwgazlqkMMC5zZ3M=
8 github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23/go.mod h1:cF+eAl4G1vU+WOD8cYCKaxokHo6MWmbR8J4/SJnvESg=
9-github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8 h1:E/eQsxdHBctPArAzjSHUAVZtDXjsD1AduGD94mbUJQg=
10-github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8/go.mod h1:ajolgob5MxlHdp5HllF7u3rTlCgER4InqfP7M/xl6HQ=
11+github.com/picosh/pubsub v0.0.0-20241112151357-866d44c53659 h1:HmRi+QkAcKkOcLD90xbf7qZy95muQEd/DqttK9xtpHk=
12+github.com/picosh/pubsub v0.0.0-20241112151357-866d44c53659/go.mod h1:m6ZZpg+lZB3XTIKlbSqQgi4NrBPtARv23b8vGYDoCo4=
13 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f h1:pdEh1Z7zH5Og9nS7jRuqwup3bcPsC6faDNQ6mgrV9ws=
14 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
15 github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELijb//dYhyCo8A3fvxW5dijrJVtjs=