Patchset ps-112
feat(tui.tuns): conn events
Eric Bower
Makefile
+2
-1
pkg/apps/auth/api.go
+35
-0
pkg/db/db.go
+17
-0
pkg/db/postgres/storage.go
+39
-0
pkg/db/stub/stub.go
+8
-0
pkg/tui/tuns.go
+63
-13
feat(tui.tuns): conn events
Makefile
link
+2
-1
+2
-1
1diff --git a/Makefile b/Makefile
2index 01f00a4..d452d2f 100644
3--- a/Makefile
4+++ b/Makefile
5@@ -126,10 +126,11 @@ migrate:
6 $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
7 $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
8 $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241202_add_more_idx_analytics.sql
9+ $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20250319_add_tuns_event_logs_table.sql
10 .PHONY: migrate
11
12 latest:
13- $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241202_add_more_idx_analytics.sql
14+ $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20250319_add_tuns_event_logs_table.sql
15 .PHONY: latest
16
17 psql:
pkg/apps/auth/api.go
link
+35
-0
+35
-0
1diff --git a/pkg/apps/auth/api.go b/pkg/apps/auth/api.go
2index bb72784..6534040 100644
3--- a/pkg/apps/auth/api.go
4+++ b/pkg/apps/auth/api.go
5@@ -20,6 +20,7 @@ import (
6 "github.com/picosh/pico/pkg/db/postgres"
7 "github.com/picosh/pico/pkg/shared"
8 "github.com/picosh/utils"
9+ "github.com/picosh/utils/pipe"
10 "github.com/picosh/utils/pipe/metrics"
11 "github.com/prometheus/client_golang/prometheus/promhttp"
12 )
13@@ -723,6 +724,39 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
14 }
15 }
16
17+func tunsEventLogDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
18+ drain := pipe.NewReconnectReadWriteCloser(
19+ ctx,
20+ logger,
21+ shared.NewPicoPipeClient(),
22+ "tuns-event-drain-sub",
23+ "sub tuns-event-drain -k",
24+ 100,
25+ 10*time.Millisecond,
26+ )
27+
28+ for {
29+ scanner := bufio.NewScanner(drain)
30+ scanner.Buffer(make([]byte, 32*1024), 32*1024)
31+ for scanner.Scan() {
32+ line := scanner.Text()
33+ clean := strings.TrimSpace(line)
34+ var log db.TunsEventLog
35+ err := json.Unmarshal([]byte(clean), &log)
36+ if err != nil {
37+ logger.Error("could not unmarshal line", "err", err)
38+ continue
39+ }
40+
41+ logger.Info("inserting tuns event log", "log", log)
42+ err = dbpool.InsertTunsEventLog(&log)
43+ if err != nil {
44+ logger.Error("could not insert tuns event log", "err", err)
45+ }
46+ }
47+ }
48+}
49+
50 func authMux(apiConfig *shared.ApiConfig) *http.ServeMux {
51 serverRoot, err := fs.Sub(embedFS, "public")
52 if err != nil {
53@@ -792,6 +826,7 @@ func StartApiServer() {
54
55 // gather metrics in the auth service
56 go metricDrainSub(ctx, db, logger, cfg.Secret)
57+ go tunsEventLogDrainSub(ctx, db, logger, cfg.Secret)
58
59 defer ctx.Done()
60
pkg/db/db.go
link
+17
-0
+17
-0
1diff --git a/pkg/db/db.go b/pkg/db/db.go
2index f251d88..56acce0 100644
3--- a/pkg/db/db.go
4+++ b/pkg/db/db.go
5@@ -323,6 +323,20 @@ type UserServiceStats struct {
6 LatestUpdatedAt time.Time
7 }
8
// TunsEventLog describes a single tuns connection event. It is decoded from
// JSON using the tags below and is the record type exchanged with the
// tuns_event_logs storage methods on the DB interface.
type TunsEventLog struct {
	// ID is the identifier of the event record.
	ID string `json:"id"`

	// ServerID is the value carried in the "server_id" JSON key.
	ServerID string `json:"server_id"`

	// Time is the timestamp carried in the "time" JSON key; it may be nil.
	Time *time.Time `json:"time"`

	// User is the value carried in the "user" JSON key.
	User string `json:"user"`

	// UserId is the ID of the user the event belongs to.
	UserId string `json:"user_id"`

	// RemoteAddr is the value carried in the "remote_addr" JSON key.
	RemoteAddr string `json:"remote_addr"`

	// EventType names the kind of event; the tuns TUI page treats the value
	// "disconnect" specially when rendering.
	EventType string `json:"event_type"`

	// TunnelType is the value carried in the "tunnel_type" JSON key.
	TunnelType string `json:"tunnel_type"`

	// ConnectionType is the value carried in the "connection_type" JSON key.
	ConnectionType string `json:"connection_type"`

	// TunnelAddrs lists the tunnel addresses associated with the event.
	TunnelAddrs []string `json:"tunnel_addrs"`

	// CreatedAt is the creation timestamp of the record; it may be nil.
	CreatedAt *time.Time `json:"created_at"`
}
22+
23 var NameValidator = regexp.MustCompile("^[a-zA-Z0-9]{1,50}$")
24 var DenyList = []string{
25 "admin",
26@@ -415,5 +429,8 @@ type DB interface {
27
28 FindUserStats(userID string) (*UserStats, error)
29
30+ InsertTunsEventLog(log *TunsEventLog) error
31+ FindTunsEventLog(userID, addr string) ([]*TunsEventLog, error)
32+
33 Close() error
34 }
pkg/db/postgres/storage.go
link
+39
-0
+39
-0
1diff --git a/pkg/db/postgres/storage.go b/pkg/db/postgres/storage.go
2index ce39adc..d0eefe0 100644
3--- a/pkg/db/postgres/storage.go
4+++ b/pkg/db/postgres/storage.go
5@@ -1796,6 +1796,45 @@ func (me *PsqlDB) findPagesStats(userID string) (*db.UserServiceStats, error) {
6 return &stats, nil
7 }
8
9+func (me *PsqlDB) InsertTunsEventLog(log *db.TunsEventLog) error {
10+ _, err := me.Db.Exec(
11+ `INSERT INTO tuns_event_logs
12+ (user_id, server_id, remote_addr, tunnel_type, connection_type, tunnel_addrs)
13+ VALUES
14+ ($1, $2, $3, $4, $5, $6)`,
15+ log.UserId, log.ServerID, log.RemoteAddr, log.TunnelType, log.ConnectionType, log.TunnelAddrs,
16+ )
17+ return err
18+}
19+
20+func (me *PsqlDB) FindTunsEventLog(userID, addr string) ([]*db.TunsEventLog, error) {
21+ logs := []*db.TunsEventLog{}
22+ rs, err := me.Db.Query(
23+ `SELECT user_id, server_id, remote_addr, tunnel_type, connection_type, tunnel_addrs, created_at
24+ FROM tuns_event_logs WHERE user_id=$1 AND tunnel_addrs @> ARRAY[$2] ORDER BY created_at DESC`, userID, addr)
25+ if err != nil {
26+ return nil, err
27+ }
28+
29+ for rs.Next() {
30+ log := db.TunsEventLog{}
31+ err := rs.Scan(
32+ &log.ID, &log.UserId, &log.ServerID, &log.RemoteAddr,
33+ &log.TunnelType, &log.ConnectionType, &log.TunnelAddrs, &log.ConnectionType, &log.CreatedAt,
34+ )
35+ if err != nil {
36+ return nil, err
37+ }
38+ logs = append(logs, &log)
39+ }
40+
41+ if rs.Err() != nil {
42+ return nil, rs.Err()
43+ }
44+
45+ return logs, nil
46+}
47+
48 func (me *PsqlDB) FindUserStats(userID string) (*db.UserStats, error) {
49 stats := db.UserStats{}
50 rs, err := me.Db.Query(`SELECT cur_space, count(id), min(created_at), max(created_at), max(updated_at) FROM posts WHERE user_id=$1 GROUP BY cur_space`, userID)
pkg/db/stub/stub.go
link
+8
-0
+8
-0
1diff --git a/pkg/db/stub/stub.go b/pkg/db/stub/stub.go
2index b1d8d2d..59df3cf 100644
3--- a/pkg/db/stub/stub.go
4+++ b/pkg/db/stub/stub.go
5@@ -268,3 +268,11 @@ func (me *StubDB) FindTagsForUser(userID string, tag string) ([]string, error) {
6 func (me *StubDB) FindUserStats(userID string) (*db.UserStats, error) {
7 return nil, notImpl
8 }
9+
10+func (me *StubDB) InsertTunsEventLog(log *db.TunsEventLog) error {
11+ return notImpl
12+}
13+
14+func (me *StubDB) FindTunsEventLog(userID, addr string) ([]*db.TunsEventLog, error) {
15+ return nil, notImpl
16+}
pkg/tui/tuns.go
link
+63
-13
+63
-13
1diff --git a/pkg/tui/tuns.go b/pkg/tui/tuns.go
2index 8224e09..a632ec4 100644
3--- a/pkg/tui/tuns.go
4+++ b/pkg/tui/tuns.go
5@@ -15,6 +15,7 @@ import (
6 "git.sr.ht/~rockorager/vaxis/vxfw/list"
7 "git.sr.ht/~rockorager/vaxis/vxfw/richtext"
8 "git.sr.ht/~rockorager/vaxis/vxfw/text"
9+ "github.com/picosh/pico/pkg/db"
10 "github.com/picosh/pico/pkg/shared"
11 "github.com/picosh/utils/pipe"
12 )
13@@ -78,21 +79,25 @@ type ResultLogLineLoaded struct {
14
15 type TunsLoaded struct{}
16
// EventLogsLoaded is the event TunsPage.HandleEvent answers with a redraw;
// it carries no data.
type EventLogsLoaded struct{}
18+
19 type TunsPage struct {
20 shared *SharedModel
21
22- loading bool
23- err error
24- tuns []TunsClientSimple
25- selected string
26- focus string
27- leftPane list.Dynamic
28- rightPane *Pager
29- logs []*ResultLog
30- logList list.Dynamic
31- ctx context.Context
32- done context.CancelFunc
33- isAdmin bool
34+ loading bool
35+ err error
36+ tuns []TunsClientSimple
37+ selected string
38+ focus string
39+ leftPane list.Dynamic
40+ rightPane *Pager
41+ logs []*ResultLog
42+ logList list.Dynamic
43+ ctx context.Context
44+ done context.CancelFunc
45+ isAdmin bool
46+ eventLogs []*db.TunsEventLog
47+ eventLogList list.Dynamic
48 }
49
50 func NewTunsPage(shrd *SharedModel) *TunsPage {
51@@ -103,6 +108,7 @@ func NewTunsPage(shrd *SharedModel) *TunsPage {
52 }
53 m.leftPane = list.Dynamic{DrawCursor: true, Builder: m.getLeftWidget}
54 m.logList = list.Dynamic{DrawCursor: true, Builder: m.getLogWidget}
55+ m.eventLogList = list.Dynamic{DrawCursor: true, Builder: m.getEventLogWidget}
56 ff, _ := shrd.Dbpool.FindFeatureForUser(m.shared.User.ID, "admin")
57 if ff != nil {
58 m.isAdmin = true
59@@ -145,6 +151,24 @@ func (m *TunsPage) getLogWidget(i uint, cursor uint) vxfw.Widget {
60 return txt
61 }
62
63+func (m *TunsPage) getEventLogWidget(i uint, cursor uint) vxfw.Widget {
64+ if int(i) >= len(m.tuns) {
65+ return nil
66+ }
67+
68+ log := m.eventLogs[i]
69+ style := vaxis.Style{Foreground: green}
70+ if log.EventType == "disconnect" {
71+ style = vaxis.Style{Foreground: red}
72+ }
73+ txt := richtext.New([]vaxis.Segment{
74+ {Text: log.CreatedAt.Format(time.RFC3339) + " "},
75+ {Text: log.EventType, Style: style},
76+ })
77+ txt.Softwrap = false
78+ return txt
79+}
80+
81 func (m *TunsPage) connectToLogs() error {
82 ctx, cancel := context.WithCancel(m.shared.Session.Context())
83 m.ctx = ctx
84@@ -208,6 +232,8 @@ func (m *TunsPage) HandleEvent(ev vaxis.Event, ph vxfw.EventPhase) (vxfw.Command
85 m.logList.SetCursor(uint(len(m.logs) - 1))
86 }
87 return vxfw.RedrawCmd{}, nil
88+ case EventLogsLoaded:
89+ return vxfw.RedrawCmd{}, nil
90 case TunsLoaded:
91 m.focus = "tuns"
92 return vxfw.BatchCmd([]vxfw.Command{
93@@ -218,6 +244,8 @@ func (m *TunsPage) HandleEvent(ev vaxis.Event, ph vxfw.EventPhase) (vxfw.Command
94 if msg.Matches(vaxis.KeyEnter) {
95 m.selected = m.tuns[m.leftPane.Cursor()].TunAddress
96 m.logs = []*ResultLog{}
97+ m.eventLogs = []*db.TunsEventLog{}
98+ go m.fetchEventLogs()
99 return vxfw.RedrawCmd{}, nil
100 }
101 if msg.Matches(vaxis.KeyTab) {
102@@ -325,7 +353,20 @@ func (m *TunsPage) Draw(ctx vxfw.DrawContext) (vxfw.Surface, error) {
103 Characters: vaxis.Characters,
104 Max: vxfw.Size{
105 Width: uint16(rightPaneW) - 4,
106- Height: ctx.Max.Height - uint16(ah) - 3,
107+ Height: 15,
108+ },
109+ })
110+ rightSurf.AddChild(0, ah, surf)
111+ ah += int(surf.Size.Height)
112+
113+ brd = NewBorder(&m.eventLogList)
114+ brd.Label = "conn events"
115+ m.focusBorder(brd)
116+ surf, _ = brd.Draw(vxfw.DrawContext{
117+ Characters: vaxis.Characters,
118+ Max: vxfw.Size{
119+ Width: uint16(rightPaneW) - 4,
120+ Height: 15,
121 },
122 })
123 rightSurf.AddChild(0, ah, surf)
124@@ -376,6 +417,15 @@ func fetch(fqdn, auth string) (map[string]*TunsClient, error) {
125 return data.Clients, nil
126 }
127
128+func (m *TunsPage) fetchEventLogs() {
129+ logs, err := m.shared.Dbpool.FindTunsEventLog(m.shared.User.ID, m.selected)
130+ if err != nil {
131+ m.err = err
132+ return
133+ }
134+ m.eventLogs = logs
135+}
136+
137 func (m *TunsPage) fetchTuns() {
138 tMap, err := fetch("tuns.sh", m.shared.Cfg.TunsSecret)
139 if err != nil {
sql/migrations/20250319_add_tuns_event_logs_table.sql
link
+17
-0
+17
-0
1diff --git a/sql/migrations/20250319_add_tuns_event_logs_table.sql b/sql/migrations/20250319_add_tuns_event_logs_table.sql
2new file mode 100644
3index 0000000..e43b2d2
4--- /dev/null
5+++ b/sql/migrations/20250319_add_tuns_event_logs_table.sql
6@@ -0,0 +1,17 @@
-- Connection events recorded for tuns tunnels, one row per event.
-- Rows are tied to app_users and follow the owning user on delete/update.
CREATE TABLE IF NOT EXISTS tuns_event_logs (
  -- generated identifier and owning user
  id uuid NOT NULL DEFAULT uuid_generate_v4(),
  user_id uuid NOT NULL,

  -- event details (all optional)
  server_id text,
  remote_addr text,
  event_type text,
  tunnel_type text,
  connection_type text,
  tunnel_addrs text[],

  -- insertion time, filled in by the database
  created_at timestamp without time zone NOT NULL DEFAULT NOW(),

  CONSTRAINT tuns_event_logs_pkey PRIMARY KEY (id),
  CONSTRAINT fk_tuns_event_logs_user
    FOREIGN KEY(user_id)
    REFERENCES app_users(id)
    ON DELETE CASCADE
    ON UPDATE CASCADE
);