Logs
Patchsets
Range Diff ↕
1: 77aaa29 ! 1: 8d56535 reactor(metric-drain): use caddy json format
@@ auth/api.go "log/slog" "net/http" "net/url" + "strings" "time" "github.com/gorilla/feeds" } } +type AccessLogReq struct { + RemoteIP string `json:"remote_ip"` + RemotePort string `json:"remote_port"` + ClientIP string `json:"client_ip"` + Method string `json:"method"` + Host string `json:"host"` + Uri string `json:"uri"` + Headers struct { + UserAgent string `json:"User-Agent"` + Referer string `json:"Referer"` + } `json:"headers"` + Tls struct { + ServerName string `json:"server_name"` + } `json:"tls"` +} + +type CaddyAccessLog struct { + Request AccessLogReq `json:"request"` + Status int `json:"status"` +} + +func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) { + spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2) + space := spaceRaw[0] + host := access.Request.Host + path := access.Request.Uri + subdomain := "" + + // grab subdomain based on host + if strings.HasSuffix(host, "tuns.sh") { + subdomain = strings.TrimSuffix(host, ".tuns.sh") + } else if strings.HasSuffix(host, "pgs.sh") { + subdomain = strings.TrimSuffix(host, ".pgs.sh") + } else if strings.HasSuffix(host, "prose.sh") { + subdomain = strings.TrimSuffix(host, ".prose.sh") + } else { + subdomain = shared.GetCustomDomain(host, space) + } + + // get user and namespace details from subdomain + props, err := shared.GetProjectFromSubdomain(subdomain) + if err != nil { + return nil, err + } + // get user ID + user, err := dbpool.FindUserForName(props.Username) + if err != nil { + return nil, err + } + + projectID := "" + postID := "" + if space == "pgs" { // figure out project ID + project, err := dbpool.FindProjectByName(user.ID, props.ProjectName) + if err != nil { + return nil, err + } + projectID = project.ID + } else if space == "prose" { // figure out post ID + if path == "" || path == "/" { + } else { + post, err := dbpool.FindPostWithSlug(path, user.ID, space) + if err != nil { + return nil, err + } + postID = post.ID + } + } + + return 
&db.AnalyticsVisits{ + UserID: user.ID, + ProjectID: projectID, + PostID: postID, + Namespace: space, + Host: host, + Path: path, + IpAddress: access.Request.ClientIP, + UserAgent: access.Request.Headers.UserAgent, + Referer: access.Request.Headers.Referer, // TODO: I don't see referer in the access log + Status: access.Status, + }, nil +} + func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) { conn := shared.NewPicoPipeClient() stdoutPipe, err := pubsub.RemoteSub("sub metric-drain -k", ctx, conn) scanner := bufio.NewScanner(stdoutPipe) for scanner.Scan() { line := scanner.Text() - visit := db.AnalyticsVisits{} - err := json.Unmarshal([]byte(line), &visit) drain := metrics.ReconnectReadMetrics( ctx, -1, ) - for { - scanner := bufio.NewScanner(drain) - for scanner.Scan() { - line := scanner.Text() - visit := db.AnalyticsVisits{} - err := json.Unmarshal([]byte(line), &visit) - if err != nil { - logger.Error("json unmarshal", "err", err) - continue - } - - user := slog.Any("userId", visit.UserID) - - err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret) - if err != nil { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Info("could not record analytics visit", "reason", err, "visit", visit, user) - continue - } - } + scanner := bufio.NewScanner(drain) + for scanner.Scan() { + line := scanner.Text() + accessLog := CaddyAccessLog{} + err := json.Unmarshal([]byte(line), &accessLog) if err != nil { logger.Error("json unmarshal", "err", err) continue } + if err != nil { + logger.Error("json unmarshal", "err", err) + continue + } - err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret) - logger.Info("inserting visit", "visit", visit, user) - err = dbpool.InsertVisit(&visit) - if err != nil { - logger.Error("could not insert visit record", "err", err, "visit", visit, user) + visit, err := deserializeCaddyAccessLog(dbpool, &accessLog) + if err != nil { + logger.Error("cannot deserialize access log", "err", err) + 
continue + } + err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret) if err != nil { if !errors.Is(err, shared.ErrAnalyticsDisabled) { logger.Info("could not record analytics visit", "reason", err) + if err != nil { + if !errors.Is(err, shared.ErrAnalyticsDisabled) { + logger.Info("could not record analytics visit", "reason", err) } } logger.Info("inserting visit", "visit", visit) - err = dbpool.InsertVisit(&visit) - if scanner.Err() != nil { - logger.Error("scanner error", "err", scanner.Err()) + logger.Info("inserting visit", "visit", visit) + err = dbpool.InsertVisit(visit) if err != nil { logger.Error("could not insert visit record", "err", err) + if err != nil { + logger.Error("could not insert visit record", "err", err) } } } @@ caddy.json +{ + "level": "info", + "ts": 1731644477.313701, + "logger": "http.log.access", + "msg": "handled request", + "request": { + "remote_ip": "127.0.0.1", + "remote_port": "40400", + "client_ip": "127.0.0.1", + "proto": "HTTP/2.0", + "method": "GET", + "host": "pgs.sh", + "uri": "/", + "headers": { "User-Agent": ["Blackbox Exporter/0.24.0"] }, + "tls": { + "resumed": false, + "version": 772, + "cipher_suite": 4865, + "proto": "h2", + "server_name": "pgs.sh" + } + }, + "bytes_read": 0, + "user_id": "", + "duration": 0.001207084, + "size": 3718, + "status": 200, + "resp_headers": { + "Referrer-Policy": ["no-referrer-when-downgrade"], + "Strict-Transport-Security": ["max-age=31536000;"], + "X-Content-Type-Options": ["nosniff"], + "X-Frame-Options": ["DENY"], + "Server": ["Caddy"], + "Alt-Svc": ["h3=\":443\"; ma=2592000"], + "Date": ["Fri, 15 Nov 2024 04:21:17 GMT"], + "Content-Type": ["text/html; charset=utf-8"], + "X-Xss-Protection": ["1; mode=block"], + "Permissions-Policy": ["interest-cohort=()"] + } +} @@ pgs/tunnel.go "pubkey", pubkeyStr, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { log.Error(err.Error()) return 
http.HandlerFunc(shared.UnauthorizedHandler) @@ pgs/web.go if !strings.Contains(hostDomain, appDomain) { subdomain := shared.GetCustomDomain(hostDomain, cfg.Space) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Error( "could not get project from subdomain", "host", r.Host, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Info( "could not determine project from subdomain", ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain) router.ServeHTTP(w, r.WithContext(ctx)) } - -type SubdomainProps struct { - ProjectName string - Username string -} - -func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { - props := &SubdomainProps{} - strs := strings.SplitN(subdomain, "-", 2) - props.Username = strs[0] - if len(strs) == 2 { - props.ProjectName = strs[1] - } else { - props.ProjectName = props.Username - } - return props, nil -} @@ shared/api.go "github.com/picosh/utils" ) +type SubdomainProps struct { + ProjectName string + Username string +} + +func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { + props := &SubdomainProps{} + strs := strings.SplitN(subdomain, "-", 2) + props.Username = strs[0] + if len(strs) == 2 { + props.ProjectName = strs[1] + } else { + props.ProjectName = props.Username + } + return props, nil +} + func CorsHeaders(headers http.Header) { headers.Add("Access-Control-Allow-Origin", "*") headers.Add("Vary", "Origin")
-: ------- > 2: a336041 wip
-: ------- > 3: 7ae45b3 chore: wrap
-: ------- > 4: bfa5c4f done
Range Diff ↕
2: a336041 < -: ------- wip
3: 7ae45b3 < -: ------- chore: wrap
4: bfa5c4f < -: ------- done
1: 8d56535 ! 1: c7eeb12 reactor(metric-drain): use caddy access logs
@@ auth/api.go "log/slog" "net/http" "net/url" + "strings" "time" "github.com/gorilla/feeds" "github.com/picosh/pico/db/postgres" "github.com/picosh/pico/shared" "github.com/picosh/utils" + "github.com/picosh/utils/pipe" "github.com/picosh/utils/pipe/metrics" ) } } +type AccessLogReq struct { + RemoteIP string `json:"remote_ip"` + RemotePort string `json:"remote_port"` + ClientIP string `json:"client_ip"` + Method string `json:"method"` + Host string `json:"host"` + Uri string `json:"uri"` + Headers struct { + UserAgent string `json:"User-Agent"` + Referer string `json:"Referer"` + UserAgent []string `json:"User-Agent"` + Referer []string `json:"Referer"` + } `json:"headers"` + Tls struct { + ServerName string `json:"server_name"` + } `json:"tls"` +} + +type RespHeaders struct { + ContentType []string `json:"Content-Type"` +} + +type CaddyAccessLog struct { + Request AccessLogReq `json:"request"` + Status int `json:"status"` + Request AccessLogReq `json:"request"` + Status int `json:"status"` + RespHeaders RespHeaders `json:"resp_headers"` +} + +func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) { + spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2) + space := spaceRaw[0] + host := access.Request.Host + path := access.Request.Uri + subdomain := "" + + // grab subdomain based on host + if strings.HasSuffix(host, "tuns.sh") { + subdomain = strings.TrimSuffix(host, ".tuns.sh") + } else if strings.HasSuffix(host, "pgs.sh") { + subdomain = strings.TrimSuffix(host, ".pgs.sh") + } else if strings.HasSuffix(host, "prose.sh") { + subdomain = strings.TrimSuffix(host, ".prose.sh") + } else { + subdomain = shared.GetCustomDomain(host, space) + } + + // get user and namespace details from subdomain + props, err := shared.GetProjectFromSubdomain(subdomain) + if err != nil { + return nil, err + } + // get user ID + user, err := dbpool.FindUserForName(props.Username) + if err != nil { + return nil, err + } + + projectID 
:= "" + postID := "" + if space == "pgs" { // figure out project ID + project, err := dbpool.FindProjectByName(user.ID, props.ProjectName) + if err != nil { + return nil, err + } + projectID = project.ID + } else if space == "prose" { // figure out post ID + if path == "" || path == "/" { + } else { + post, err := dbpool.FindPostWithSlug(path, user.ID, space) + if err != nil { + return nil, err + } + postID = post.ID + } + } + + return &db.AnalyticsVisits{ + UserID: user.ID, + ProjectID: projectID, + PostID: postID, + Namespace: space, + Host: host, + Path: path, + IpAddress: access.Request.ClientIP, + UserAgent: access.Request.Headers.UserAgent, + Referer: access.Request.Headers.Referer, // TODO: I don't see referer in the access log + Status: access.Status, + UserID: user.ID, + ProjectID: projectID, + PostID: postID, + Namespace: space, + Host: host, + Path: path, + IpAddress: access.Request.ClientIP, + UserAgent: strings.Join(access.Request.Headers.UserAgent, " "), + Referer: strings.Join(access.Request.Headers.Referer, " "), + ContentType: strings.Join(access.RespHeaders.ContentType, " "), + Status: access.Status, + }, nil +} + +// this feels really stupid because i'm taking containter-drain, +// filtering it, and then sending it to metric-drain. The +// metricDrainSub function listens on the metric-drain and saves it. +// So why not just call the necessary functions to save the visit? +// We want to be able to use pipe as a debugging tool which means we +// can manually sub to `metric-drain` and have a nice clean output to view. 
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) { + info := shared.NewPicoPipeClient() + drain := pipe.NewReconnectReadWriteCloser( + ctx, + logger, + info, + "container drain", + "sub container-drain -k", + 100, + -1, + ) + + send := pipe.NewReconnectReadWriteCloser( + ctx, + logger, + info, + "from container drain to metric drain", + "pub metric-drain -b=false", + 100, + -1, + ) + + for { + scanner := bufio.NewScanner(drain) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "http.log.access") { + clean := strings.TrimSpace(line) + visit, err := accessLogToVisit(dbpool, clean) + if err != nil { + logger.Debug("could not convert access log to a visit", "err", err) + continue + } + jso, err := json.Marshal(visit) + if err != nil { + logger.Error("could not marshal json of a visit", "err", err) + continue + } + _, _ = send.Write(jso) + } + } + } +} + +func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) { + accessLog := CaddyAccessLog{} + err := json.Unmarshal([]byte(line), &accessLog) + if err != nil { + return nil, err + } + + return deserializeCaddyAccessLog(dbpool, &accessLog) +} + func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) { drain := metrics.ReconnectReadMetrics( ctx, -1, ) - for { - scanner := bufio.NewScanner(drain) - for scanner.Scan() { - line := scanner.Text() - visit := db.AnalyticsVisits{} - err := json.Unmarshal([]byte(line), &visit) - if err != nil { visit := db.AnalyticsVisits{} err := json.Unmarshal([]byte(line), &visit) if err != nil { - logger.Error("json unmarshal", "err", err) - continue - } + logger.Info("could not unmarshal json", "err", err, "line", line) continue } - - user := slog.Any("userId", visit.UserID) - - err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret) - if err != nil { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret) if err != nil 
{ if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Info("could not record analytics visit", "reason", err, "visit", visit, user) - continue - } - } + scanner := bufio.NewScanner(drain) + for scanner.Scan() { + line := scanner.Text() + accessLog := CaddyAccessLog{} + err := json.Unmarshal([]byte(line), &accessLog) + if err != nil { + logger.Error("json unmarshal", "err", err) + continue + } + logger.Info("could not record analytics visit", "reason", err) } } - logger.Info("inserting visit", "visit", visit, user) - err = dbpool.InsertVisit(&visit) - if err != nil { + if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") { + continue + } + + logger.Info("inserting visit", "visit", visit) err = dbpool.InsertVisit(&visit) if err != nil { - logger.Error("could not insert visit record", "err", err, "visit", visit, user) + visit, err := deserializeCaddyAccessLog(dbpool, &accessLog) + if err != nil { + logger.Error("cannot deserialize access log", "err", err) + continue + } + err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret) + if err != nil { + if !errors.Is(err, shared.ErrAnalyticsDisabled) { + logger.Info("could not record analytics visit", "reason", err) + logger.Error("could not insert visit record", "err", err) } } - - if scanner.Err() != nil { - logger.Error("scanner error", "err", scanner.Err()) + logger.Info("inserting visit", "visit", visit) + err = dbpool.InsertVisit(visit) + if err != nil { + logger.Error("could not insert visit record", "err", err) } - } } } // gather metrics in the auth service go metricDrainSub(ctx, db, logger, cfg.Secret) + // convert container logs to access logs + go containerDrainSub(ctx, db, logger) + defer ctx.Done() apiConfig := &shared.ApiConfig{ @@ caddy.json +{ + "level": "info", + "ts": 1731644477.313701, + "logger": "http.log.access", + "msg": "handled request", + "request": { + "remote_ip": "127.0.0.1", + "remote_port": "40400", + "client_ip": "127.0.0.1", + "proto": "HTTP/2.0", + 
"method": "GET", + "host": "pgs.sh", + "uri": "/", + "headers": { "User-Agent": ["Blackbox Exporter/0.24.0"] }, + "tls": { + "resumed": false, + "version": 772, + "cipher_suite": 4865, + "proto": "h2", + "server_name": "pgs.sh" + } + }, + "bytes_read": 0, + "user_id": "", + "duration": 0.001207084, + "size": 3718, + "status": 200, + "resp_headers": { + "Referrer-Policy": ["no-referrer-when-downgrade"], + "Strict-Transport-Security": ["max-age=31536000;"], + "X-Content-Type-Options": ["nosniff"], + "X-Frame-Options": ["DENY"], + "Server": ["Caddy"], + "Alt-Svc": ["h3=\":443\"; ma=2592000"], + "Date": ["Fri, 15 Nov 2024 04:21:17 GMT"], + "Content-Type": ["text/html; charset=utf-8"], + "X-Xss-Protection": ["1; mode=block"], + "Permissions-Policy": ["interest-cohort=()"] + } +} @@ pgs/tunnel.go "pubkey", pubkeyStr, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { log.Error(err.Error()) return http.HandlerFunc(shared.UnauthorizedHandler) logger, apiConfig.Dbpool, apiConfig.Storage, - apiConfig.AnalyticsQueue, ) tunnelRouter := TunnelWebRouter{routes} router := http.NewServeMux() @@ pgs/web.go return } - ch := make(chan *db.AnalyticsVisits, 100) - go shared.AnalyticsCollect(ch, dbpool, logger) - - routes := NewWebRouter(cfg, logger, dbpool, st, ch) + routes := NewWebRouter(cfg, logger, dbpool, st) portStr := fmt.Sprintf(":%s", cfg.Port) logger.Info( type HasPerm = func(proj *db.Project) bool type WebRouter struct { - Cfg *shared.ConfigSite - Logger *slog.Logger - Dbpool db.DB - Storage storage.StorageServe - AnalyticsQueue chan *db.AnalyticsVisits - RootRouter *http.ServeMux - UserRouter *http.ServeMux + Cfg *shared.ConfigSite + Logger *slog.Logger + Dbpool db.DB + Storage storage.StorageServe + RootRouter *http.ServeMux + UserRouter *http.ServeMux } -func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) 
*WebRouter { +func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter { router := &WebRouter{ - Cfg: cfg, - Logger: logger, - Dbpool: dbpool, - Storage: st, - AnalyticsQueue: analytics, + Cfg: cfg, + Logger: logger, + Dbpool: dbpool, + Storage: st, } router.initRouters() return router if !strings.Contains(hostDomain, appDomain) { subdomain := shared.GetCustomDomain(hostDomain, cfg.Space) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Error( "could not get project from subdomain", "host", r.Host, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Info( "could not determine project from subdomain", ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain) router.ServeHTTP(w, r.WithContext(ctx)) } - -type SubdomainProps struct { - ProjectName string - Username string -} - -func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { - props := &SubdomainProps{} - strs := strings.SplitN(subdomain, "-", 2) - props.Username = strs[0] - if len(strs) == 2 { - props.ProjectName = strs[1] - } else { - props.ProjectName = props.Username - } - return props, nil -} @@ shared/api.go "github.com/picosh/utils" ) +type SubdomainProps struct { + ProjectName string + Username string +} + +func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { + props := &SubdomainProps{} + strs := strings.SplitN(subdomain, "-", 2) + props.Username = strs[0] + if len(strs) == 2 { + props.ProjectName = strs[1] + } else { + props.ProjectName = props.Username + } + return props, nil +} + func CorsHeaders(headers http.Header) { headers.Add("Access-Control-Allow-Origin", "*") headers.Add("Vary", "Origin")
Range Diff ↕
1: c7eeb12 ! 1: 4e0839a reactor(metric-drain): use caddy access logs
@@ Makefile $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20240819_add_projects_blocked.sql $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241028_add_analytics_indexes.sql $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql + $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql .PHONY: migrate latest: - $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql + $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql .PHONY: latest psql: @@ auth/api.go "log/slog" "net/http" "net/url" + "strings" "time" "github.com/gorilla/feeds" "github.com/picosh/pico/db/postgres" "github.com/picosh/pico/shared" "github.com/picosh/utils" + "github.com/picosh/utils/pipe" "github.com/picosh/utils/pipe/metrics" ) } } +type AccessLogReq struct { + RemoteIP string `json:"remote_ip"` + RemotePort string `json:"remote_port"` + ClientIP string `json:"client_ip"` + Method string `json:"method"` + Host string `json:"host"` + Uri string `json:"uri"` + Headers struct { + UserAgent []string `json:"User-Agent"` + Referer []string `json:"Referer"` + } `json:"headers"` + Tls struct { + ServerName string `json:"server_name"` + } `json:"tls"` +} + +type RespHeaders struct { + ContentType []string `json:"Content-Type"` +} + +type CaddyAccessLog struct { + Request AccessLogReq `json:"request"` + Status int `json:"status"` + RespHeaders RespHeaders `json:"resp_headers"` +} + +func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) { + spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2) + space := spaceRaw[0] + host := access.Request.Host + path 
:= access.Request.Uri + subdomain := "" + + // grab subdomain based on host + if strings.HasSuffix(host, "tuns.sh") { + subdomain = strings.TrimSuffix(host, ".tuns.sh") + } else if strings.HasSuffix(host, "pgs.sh") { + subdomain = strings.TrimSuffix(host, ".pgs.sh") + } else if strings.HasSuffix(host, "prose.sh") { + subdomain = strings.TrimSuffix(host, ".prose.sh") + } else { + subdomain = shared.GetCustomDomain(host, space) + } + + // get user and namespace details from subdomain + props, err := shared.GetProjectFromSubdomain(subdomain) + if err != nil { + return nil, err + } + // get user ID + user, err := dbpool.FindUserForName(props.Username) + if err != nil { + return nil, err + } + + projectID := "" + postID := "" + if space == "pgs" { // figure out project ID + project, err := dbpool.FindProjectByName(user.ID, props.ProjectName) + if err != nil { + return nil, err + } + projectID = project.ID + } else if space == "prose" { // figure out post ID + if path == "" || path == "/" { + } else { + post, err := dbpool.FindPostWithSlug(path, user.ID, space) + if err != nil { + return nil, err + } + postID = post.ID + } + } + + return &db.AnalyticsVisits{ + UserID: user.ID, + ProjectID: projectID, + PostID: postID, + Namespace: space, + Host: host, + Path: path, + IpAddress: access.Request.ClientIP, + UserAgent: strings.Join(access.Request.Headers.UserAgent, " "), + Referer: strings.Join(access.Request.Headers.Referer, " "), + ContentType: strings.Join(access.RespHeaders.ContentType, " "), + Status: access.Status, + }, nil +} + +// this feels really stupid because i'm taking containter-drain, +// filtering it, and then sending it to metric-drain. The +// metricDrainSub function listens on the metric-drain and saves it. +// So why not just call the necessary functions to save the visit? +// We want to be able to use pipe as a debugging tool which means we +// can manually sub to `metric-drain` and have a nice clean output to view. 
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) { + info := shared.NewPicoPipeClient() + drain := pipe.NewReconnectReadWriteCloser( + ctx, + logger, + info, + "container drain", + "sub container-drain -k", + 100, + -1, + ) + + send := pipe.NewReconnectReadWriteCloser( + ctx, + logger, + info, + "from container drain to metric drain", + "pub metric-drain -b=false", + 100, + -1, + ) + + for { + scanner := bufio.NewScanner(drain) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "http.log.access") { + clean := strings.TrimSpace(line) + visit, err := accessLogToVisit(dbpool, clean) + if err != nil { + logger.Debug("could not convert access log to a visit", "err", err) + continue + } + jso, err := json.Marshal(visit) + if err != nil { + logger.Error("could not marshal json of a visit", "err", err) + continue + } + _, _ = send.Write(jso) + } + } + } +} + +func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) { + accessLog := CaddyAccessLog{} + err := json.Unmarshal([]byte(line), &accessLog) + if err != nil { + return nil, err + } + + return deserializeCaddyAccessLog(dbpool, &accessLog) +} + func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) { drain := metrics.ReconnectReadMetrics( ctx, visit := db.AnalyticsVisits{} err := json.Unmarshal([]byte(line), &visit) if err != nil { - logger.Error("json unmarshal", "err", err) + logger.Info("could not unmarshal json", "err", err, "line", line) continue } - - user := slog.Any("userId", visit.UserID) - err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret) if err != nil { if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Info("could not record analytics visit", "reason", err, "visit", visit, user) - continue + logger.Info("could not record analytics visit", "reason", err) } } - logger.Info("inserting visit", "visit", visit, user) + if visit.ContentType != "" && 
!strings.HasPrefix(visit.ContentType, "text/html") { + continue + } + + logger.Info("inserting visit", "visit", visit) err = dbpool.InsertVisit(&visit) if err != nil { - logger.Error("could not insert visit record", "err", err, "visit", visit, user) + logger.Error("could not insert visit record", "err", err) } } - - if scanner.Err() != nil { - logger.Error("scanner error", "err", scanner.Err()) - } } } // gather metrics in the auth service go metricDrainSub(ctx, db, logger, cfg.Secret) + // convert container logs to access logs + go containerDrainSub(ctx, db, logger) + defer ctx.Done() apiConfig := &shared.ApiConfig{ @@ db/db.go } type AnalyticsVisits struct { - ID string `json:"id"` - UserID string `json:"user_id"` - ProjectID string `json:"project_id"` - PostID string `json:"post_id"` - Namespace string `json:"namespace"` - Host string `json:"host"` - Path string `json:"path"` - IpAddress string `json:"ip_address"` - UserAgent string `json:"user_agent"` - Referer string `json:"referer"` - Status int `json:"status"` + ID string `json:"id"` + UserID string `json:"user_id"` + ProjectID string `json:"project_id"` + PostID string `json:"post_id"` + Namespace string `json:"namespace"` + Host string `json:"host"` + Path string `json:"path"` + IpAddress string `json:"ip_address"` + UserAgent string `json:"user_agent"` + Referer string `json:"referer"` + Status int `json:"status"` + ContentType string `json:"content_type"` } type VisitInterval struct { @@ db/postgres/storage.go func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error { _, err := me.Db.Exec( - `INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`, + `INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`, visit.UserID, 
newNullString(visit.ProjectID), newNullString(visit.PostID), visit.UserAgent, visit.Referer, visit.Status, + visit.ContentType, ) return err } @@ imgs/api.go dbpool := shared.GetDB(r) logger := shared.GetLogger(r) username := shared.GetUsernameFromRequest(r) - analytics := shared.GetAnalyticsQueue(r) user, err := dbpool.FindUserForName(username) if err != nil { logger, dbpool, st, - analytics, ) router.ServeAsset(fname, opts, true, anyPerm, w, r) } @@ pastes/api.go Unlisted bool } -type TransparencyPageData struct { - Site shared.SitePageData - Analytics *db.Analytics -} - type Link struct { URL string Text string @@ pgs/ssh.go "github.com/charmbracelet/promwish" "github.com/charmbracelet/ssh" "github.com/charmbracelet/wish" - "github.com/picosh/pico/db" "github.com/picosh/pico/db/postgres" "github.com/picosh/pico/shared" "github.com/picosh/pico/shared/storage" st, ) - ch := make(chan *db.AnalyticsVisits, 100) - go shared.AnalyticsCollect(ch, dbpool, logger) apiConfig := &shared.ApiConfig{ - Cfg: cfg, - Dbpool: dbpool, - Storage: st, - AnalyticsQueue: ch, + Cfg: cfg, + Dbpool: dbpool, + Storage: st, } webTunnel := &tunkit.WebTunnelHandler{ @@ pgs/tunnel.go "pubkey", pubkeyStr, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { log.Error(err.Error()) return http.HandlerFunc(shared.UnauthorizedHandler) logger, apiConfig.Dbpool, apiConfig.Storage, - apiConfig.AnalyticsQueue, ) tunnelRouter := TunnelWebRouter{routes} router := http.NewServeMux() @@ pgs/web.go return } - ch := make(chan *db.AnalyticsVisits, 100) - go shared.AnalyticsCollect(ch, dbpool, logger) - - routes := NewWebRouter(cfg, logger, dbpool, st, ch) + routes := NewWebRouter(cfg, logger, dbpool, st) portStr := fmt.Sprintf(":%s", cfg.Port) logger.Info( type HasPerm = func(proj *db.Project) bool type WebRouter struct { - Cfg *shared.ConfigSite - Logger *slog.Logger - Dbpool db.DB - Storage storage.StorageServe - AnalyticsQueue chan 
*db.AnalyticsVisits - RootRouter *http.ServeMux - UserRouter *http.ServeMux + Cfg *shared.ConfigSite + Logger *slog.Logger + Dbpool db.DB + Storage storage.StorageServe + RootRouter *http.ServeMux + UserRouter *http.ServeMux } -func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) *WebRouter { +func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter { router := &WebRouter{ - Cfg: cfg, - Logger: logger, - Dbpool: dbpool, - Storage: st, - AnalyticsQueue: analytics, + Cfg: cfg, + Logger: logger, + Dbpool: dbpool, + Storage: st, } router.initRouters() return router if !strings.Contains(hostDomain, appDomain) { subdomain := shared.GetCustomDomain(hostDomain, cfg.Space) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Error( "could not get project from subdomain", "host", r.Host, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Info( "could not determine project from subdomain", ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain) router.ServeHTTP(w, r.WithContext(ctx)) } - -type SubdomainProps struct { - ProjectName string - Username string -} - -func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { - props := &SubdomainProps{} - strs := strings.SplitN(subdomain, "-", 2) - props.Username = strs[0] - if len(strs) == 2 { - props.ProjectName = strs[1] - } else { - props.ProjectName = props.Username - } - return props, nil -} @@ pgs/web_asset_handler.go package pgs import ( - "errors" "fmt" "io" "log/slog" "net/http/httputil" _ "net/http/pprof" - "github.com/picosh/pico/shared" "github.com/picosh/pico/shared/storage" sst "github.com/picosh/pobj/storage" ) "routes", strings.Join(attempts, ", "), "status", http.StatusNotFound, ) - // track 
404s - ch := h.AnalyticsQueue - view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID) - if err == nil { - view.ProjectID = h.ProjectID - view.Status = http.StatusNotFound - select { - case ch <- view: - default: - logger.Error("could not send analytics view to channel", "view", view) - } - } else { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Error("could not record analytics view", "err", err, "view", view) - } - } http.Error(w, "404 not found", http.StatusNotFound) return } finContentType := w.Header().Get("content-type") - // only track pages, not individual assets - if finContentType == "text/html" { - // track visit - ch := h.AnalyticsQueue - view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID) - if err == nil { - view.ProjectID = h.ProjectID - select { - case ch <- view: - default: - logger.Error("could not send analytics view to channel", "view", view) - } - } else { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Error("could not record analytics view", "err", err, "view", view) - } - } - } - logger.Info( "serving asset", "asset", assetFilepath, @@ pgs/web_test.go "net/http/httptest" "strings" "testing" - "time" "github.com/picosh/pico/db" "github.com/picosh/pico/db/stub" responseRecorder := httptest.NewRecorder() st, _ := storage.NewStorageMemory(tc.storage) - ch := make(chan *db.AnalyticsVisits, 100) - router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch) + router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st) router.ServeHTTP(responseRecorder, request) if responseRecorder.Code != tc.status { } } -func TestAnalytics(t *testing.T) { - bucketName := shared.GetAssetBucketName(testUserID) - cfg := NewConfigSite() - cfg.Domain = "pgs.test" - expectedPath := "/app" - request := httptest.NewRequest("GET", mkpath(expectedPath), strings.NewReader("")) - responseRecorder := httptest.NewRecorder() - - sto := map[string]map[string]string{ - bucketName: { - "test/app.html": "hello world!", - }, - } - 
st, _ := storage.NewStorageMemory(sto) - ch := make(chan *db.AnalyticsVisits, 100) - dbpool := NewPgsAnalticsDb(cfg.Logger) - router := NewWebRouter(cfg, cfg.Logger, dbpool, st, ch) - - go func() { - for analytics := range ch { - if analytics.Path != expectedPath { - t.Errorf("Want path '%s', got '%s'", expectedPath, analytics.Path) - } - close(ch) - } - }() - - router.ServeHTTP(responseRecorder, request) - - select { - case <-ch: - return - case <-time.After(time.Second * 1): - t.Error("didnt receive analytics event within time limit") - } -} - type ImageStorageMemory struct { *storage.StorageMemory Opts *storage.ImgProcessOpts Ratio: &storage.Ratio{}, }, } - ch := make(chan *db.AnalyticsVisits, 100) - router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch) + router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st) router.ServeHTTP(responseRecorder, request) if responseRecorder.Code != tc.status { @@ prose/api.go import ( "bytes" - "errors" "fmt" "html/template" "net/http" Diff template.HTML } -type TransparencyPageData struct { - Site shared.SitePageData - Analytics *db.Analytics -} - type HeaderTxt struct { Title string Bio string postCollection = append(postCollection, p) } - // track visit - ch := shared.GetAnalyticsQueue(r) - view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID) - if err == nil { - select { - case ch <- view: - default: - logger.Error("could not send analytics view to channel", "view", view) - } - } else { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Error("could not record analytics view", "err", err, "view", view) - } - } - data := BlogPageData{ Site: *cfg.GetSiteData(), PageTitle: headerTxt.Title, username := shared.GetUsernameFromRequest(r) subdomain := shared.GetSubdomain(r) cfg := shared.GetCfg(r) - ch := shared.GetAnalyticsQueue(r) var slug string if !cfg.IsSubdomains() || subdomain == "" { ogImageCard = parsedText.ImageCard } - // track visit - view, err := shared.AnalyticsVisitFromRequest(r, dbpool, 
user.ID) - if err == nil { - view.PostID = post.ID - select { - case ch <- view: - default: - logger.Error("could not send analytics view to channel", "view", view) - } - } else { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Error("could not record analytics view", "err", err, "view", view) - } - } - unlisted := false if post.Hidden || post.PublishAt.After(time.Now()) { unlisted = true mainRoutes := createMainRoutes(staticRoutes) subdomainRoutes := createSubdomainRoutes(staticRoutes) - ch := make(chan *db.AnalyticsVisits, 100) - go shared.AnalyticsCollect(ch, dbpool, logger) apiConfig := &shared.ApiConfig{ - Cfg: cfg, - Dbpool: dbpool, - Storage: st, - AnalyticsQueue: ch, + Cfg: cfg, + Dbpool: dbpool, + Storage: st, } handler := shared.CreateServe(mainRoutes, subdomainRoutes, apiConfig) router := http.HandlerFunc(handler) @@ shared/api.go "github.com/picosh/utils" ) +type SubdomainProps struct { + ProjectName string + Username string +} + +func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { + props := &SubdomainProps{} + strs := strings.SplitN(subdomain, "-", 2) + props.Username = strs[0] + if len(strs) == 2 { + props.ProjectName = strs[1] + } else { + props.ProjectName = props.Username + } + return props, nil +} + func CorsHeaders(headers http.Header) { headers.Add("Access-Control-Allow-Origin", "*") headers.Add("Vary", "Origin") @@ shared/router.go } type ApiConfig struct { - Cfg *ConfigSite - Dbpool db.DB - Storage storage.StorageServe - AnalyticsQueue chan *db.AnalyticsVisits + Cfg *ConfigSite + Dbpool db.DB + Storage storage.StorageServe } func (hc *ApiConfig) HasPrivilegedAccess(apiToken string) bool { ctx = context.WithValue(ctx, ctxDBKey{}, hc.Dbpool) ctx = context.WithValue(ctx, ctxStorageKey{}, hc.Storage) ctx = context.WithValue(ctx, ctxCfg{}, hc.Cfg) - ctx = context.WithValue(ctx, ctxAnalyticsQueue{}, hc.AnalyticsQueue) return ctx } type ctxStorageKey struct{} type ctxLoggerKey struct{} type ctxCfg struct{} -type 
ctxAnalyticsQueue struct{} type CtxSubdomainKey struct{} type ctxKey struct{} return "" } -func GetAnalyticsQueue(r *http.Request) chan *db.AnalyticsVisits { - return r.Context().Value(ctxAnalyticsQueue{}).(chan *db.AnalyticsVisits) -} - func GetApiToken(r *http.Request) string { authHeader := r.Header.Get("authorization") if authHeader == "" { @@ sql/migrations/20241125_add_content_type_to_analytics.sql +ALTER TABLE analytics_visits ADD COLUMN content_type varchar(256);
Patchset ps-79
refactor(metric-drain): use caddy access logs
Eric Bower
Makefile
+2
-1
auth/api.go
+162
-12
db/db.go
+12
-11
db/postgres/storage.go
+2
-1
imgs/api.go
+0
-2
pastes/api.go
+0
-5
pgs/ssh.go
+3
-7
pgs/tunnel.go
+1
-2
pgs/web.go
+14
-36
pgs/web_asset_handler.go
+0
-37
pgs/web_test.go
+2
-42
prose/api.go
+3
-43
shared/api.go
+17
-0
shared/router.go
+3
-10
refactor(metric-drain): use caddy access logs
Previously we were sending site usage analytics within our web app code. This worked well for our use case because we could filter, parse, and send the analytics to our pipe `metric-drain` which would then store the analytics in our database. Because we want to enable HTTP caching for pgs we won't always reach our web app code, since requests will terminate at our cache layer. Instead, we want to record analytics higher in the request stack. In this case, we want to record site analytics from Caddy access logs. Here's how it works: - `pub` caddy access logs to our pipe `container-drain` - `auth/web` will `sub` to `container-drain`, filter, deserialize, and `pub` to `metric-drain` - `auth/web` will `sub` to `metric-drain` and store the analytics in our database
Makefile
link
+2
-1
+2
-1
1diff --git a/Makefile b/Makefile
2index 1730f2a..923e6c7 100644
3--- a/Makefile
4+++ b/Makefile
5@@ -135,10 +135,11 @@ migrate:
6 $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20240819_add_projects_blocked.sql
7 $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241028_add_analytics_indexes.sql
8 $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
9+ $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
10 .PHONY: migrate
11
12 latest:
13- $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
14+ $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
15 .PHONY: latest
16
17 psql:
auth/api.go
link
+162
-12
+162
-12
1diff --git a/auth/api.go b/auth/api.go
2index 9c38bdc..8cd165b 100644
3--- a/auth/api.go
4+++ b/auth/api.go
5@@ -14,6 +14,7 @@ import (
6 "log/slog"
7 "net/http"
8 "net/url"
9+ "strings"
10 "time"
11
12 "github.com/gorilla/feeds"
13@@ -21,6 +22,7 @@ import (
14 "github.com/picosh/pico/db/postgres"
15 "github.com/picosh/pico/shared"
16 "github.com/picosh/utils"
17+ "github.com/picosh/utils/pipe"
18 "github.com/picosh/utils/pipe/metrics"
19 )
20
21@@ -578,6 +580,155 @@ func checkoutHandler() http.HandlerFunc {
22 }
23 }
24
25+type AccessLogReq struct {
26+ RemoteIP string `json:"remote_ip"`
27+ RemotePort string `json:"remote_port"`
28+ ClientIP string `json:"client_ip"`
29+ Method string `json:"method"`
30+ Host string `json:"host"`
31+ Uri string `json:"uri"`
32+ Headers struct {
33+ UserAgent []string `json:"User-Agent"`
34+ Referer []string `json:"Referer"`
35+ } `json:"headers"`
36+ Tls struct {
37+ ServerName string `json:"server_name"`
38+ } `json:"tls"`
39+}
40+
41+type RespHeaders struct {
42+ ContentType []string `json:"Content-Type"`
43+}
44+
45+type CaddyAccessLog struct {
46+ Request AccessLogReq `json:"request"`
47+ Status int `json:"status"`
48+ RespHeaders RespHeaders `json:"resp_headers"`
49+}
50+
51+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
52+ spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
53+ space := spaceRaw[0]
54+ host := access.Request.Host
55+ path := access.Request.Uri
56+ subdomain := ""
57+
58+ // grab subdomain based on host
59+ if strings.HasSuffix(host, "tuns.sh") {
60+ subdomain = strings.TrimSuffix(host, ".tuns.sh")
61+ } else if strings.HasSuffix(host, "pgs.sh") {
62+ subdomain = strings.TrimSuffix(host, ".pgs.sh")
63+ } else if strings.HasSuffix(host, "prose.sh") {
64+ subdomain = strings.TrimSuffix(host, ".prose.sh")
65+ } else {
66+ subdomain = shared.GetCustomDomain(host, space)
67+ }
68+
69+ // get user and namespace details from subdomain
70+ props, err := shared.GetProjectFromSubdomain(subdomain)
71+ if err != nil {
72+ return nil, err
73+ }
74+ // get user ID
75+ user, err := dbpool.FindUserForName(props.Username)
76+ if err != nil {
77+ return nil, err
78+ }
79+
80+ projectID := ""
81+ postID := ""
82+ if space == "pgs" { // figure out project ID
83+ project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
84+ if err != nil {
85+ return nil, err
86+ }
87+ projectID = project.ID
88+ } else if space == "prose" { // figure out post ID
89+ if path == "" || path == "/" {
90+ } else {
91+ post, err := dbpool.FindPostWithSlug(path, user.ID, space)
92+ if err != nil {
93+ return nil, err
94+ }
95+ postID = post.ID
96+ }
97+ }
98+
99+ return &db.AnalyticsVisits{
100+ UserID: user.ID,
101+ ProjectID: projectID,
102+ PostID: postID,
103+ Namespace: space,
104+ Host: host,
105+ Path: path,
106+ IpAddress: access.Request.ClientIP,
107+ UserAgent: strings.Join(access.Request.Headers.UserAgent, " "),
108+ Referer: strings.Join(access.Request.Headers.Referer, " "),
109+ ContentType: strings.Join(access.RespHeaders.ContentType, " "),
110+ Status: access.Status,
111+ }, nil
112+}
113+
114+// this feels really stupid because i'm taking container-drain,
115+// filtering it, and then sending it to metric-drain. The
116+// metricDrainSub function listens on the metric-drain and saves it.
117+// So why not just call the necessary functions to save the visit?
118+// We want to be able to use pipe as a debugging tool which means we
119+// can manually sub to `metric-drain` and have a nice clean output to view.
120+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
121+ info := shared.NewPicoPipeClient()
122+ drain := pipe.NewReconnectReadWriteCloser(
123+ ctx,
124+ logger,
125+ info,
126+ "container drain",
127+ "sub container-drain -k",
128+ 100,
129+ -1,
130+ )
131+
132+ send := pipe.NewReconnectReadWriteCloser(
133+ ctx,
134+ logger,
135+ info,
136+ "from container drain to metric drain",
137+ "pub metric-drain -b=false",
138+ 100,
139+ -1,
140+ )
141+
142+ for {
143+ scanner := bufio.NewScanner(drain)
144+ for scanner.Scan() {
145+ line := scanner.Text()
146+ if strings.Contains(line, "http.log.access") {
147+ clean := strings.TrimSpace(line)
148+ visit, err := accessLogToVisit(dbpool, clean)
149+ if err != nil {
150+ logger.Debug("could not convert access log to a visit", "err", err)
151+ continue
152+ }
153+ jso, err := json.Marshal(visit)
154+ if err != nil {
155+ logger.Error("could not marshal json of a visit", "err", err)
156+ continue
157+ }
158+ _, _ = send.Write(jso)
159+ }
160+ }
161+ }
162+}
163+
164+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
165+ accessLog := CaddyAccessLog{}
166+ err := json.Unmarshal([]byte(line), &accessLog)
167+ if err != nil {
168+ return nil, err
169+ }
170+
171+ return deserializeCaddyAccessLog(dbpool, &accessLog)
172+}
173+
174 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
175 drain := metrics.ReconnectReadMetrics(
176 ctx,
177@@ -594,30 +745,26 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
178 visit := db.AnalyticsVisits{}
179 err := json.Unmarshal([]byte(line), &visit)
180 if err != nil {
181- logger.Error("json unmarshal", "err", err)
182+ logger.Info("could not unmarshal json", "err", err, "line", line)
183 continue
184 }
185-
186- user := slog.Any("userId", visit.UserID)
187-
188 err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
189 if err != nil {
190 if !errors.Is(err, shared.ErrAnalyticsDisabled) {
191- logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
192- continue
193+ logger.Info("could not record analytics visit", "reason", err)
194 }
195 }
196
197- logger.Info("inserting visit", "visit", visit, user)
198+ if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") {
199+ continue
200+ }
201+
202+ logger.Info("inserting visit", "visit", visit)
203 err = dbpool.InsertVisit(&visit)
204 if err != nil {
205- logger.Error("could not insert visit record", "err", err, "visit", visit, user)
206+ logger.Error("could not insert visit record", "err", err)
207 }
208 }
209-
210- if scanner.Err() != nil {
211- logger.Error("scanner error", "err", scanner.Err())
212- }
213 }
214 }
215
216@@ -689,6 +836,9 @@ func StartApiServer() {
217
218 // gather metrics in the auth service
219 go metricDrainSub(ctx, db, logger, cfg.Secret)
220+ // convert container logs to access logs
221+ go containerDrainSub(ctx, db, logger)
222+
223 defer ctx.Done()
224
225 apiConfig := &shared.ApiConfig{
db/db.go
link
+12
-11
+12
-11
1diff --git a/db/db.go b/db/db.go
2index 3e684ef..a3ac860 100644
3--- a/db/db.go
4+++ b/db/db.go
5@@ -161,17 +161,18 @@ type PostAnalytics struct {
6 }
7
8 type AnalyticsVisits struct {
9- ID string `json:"id"`
10- UserID string `json:"user_id"`
11- ProjectID string `json:"project_id"`
12- PostID string `json:"post_id"`
13- Namespace string `json:"namespace"`
14- Host string `json:"host"`
15- Path string `json:"path"`
16- IpAddress string `json:"ip_address"`
17- UserAgent string `json:"user_agent"`
18- Referer string `json:"referer"`
19- Status int `json:"status"`
20+ ID string `json:"id"`
21+ UserID string `json:"user_id"`
22+ ProjectID string `json:"project_id"`
23+ PostID string `json:"post_id"`
24+ Namespace string `json:"namespace"`
25+ Host string `json:"host"`
26+ Path string `json:"path"`
27+ IpAddress string `json:"ip_address"`
28+ UserAgent string `json:"user_agent"`
29+ Referer string `json:"referer"`
30+ Status int `json:"status"`
31+ ContentType string `json:"content_type"`
32 }
33
34 type VisitInterval struct {
db/postgres/storage.go
link
+2
-1
+2
-1
1diff --git a/db/postgres/storage.go b/db/postgres/storage.go
2index 66b88e2..4a57e5a 100644
3--- a/db/postgres/storage.go
4+++ b/db/postgres/storage.go
5@@ -986,7 +986,7 @@ func newNullString(s string) sql.NullString {
6
7 func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
8 _, err := me.Db.Exec(
9- `INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
10+ `INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
11 visit.UserID,
12 newNullString(visit.ProjectID),
13 newNullString(visit.PostID),
14@@ -997,6 +997,7 @@ func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
15 visit.UserAgent,
16 visit.Referer,
17 visit.Status,
18+ visit.ContentType,
19 )
20 return err
21 }
imgs/api.go
link
+0
-2
+0
-2
1diff --git a/imgs/api.go b/imgs/api.go
2index 7e9c034..99bd917 100644
3--- a/imgs/api.go
4+++ b/imgs/api.go
5@@ -177,7 +177,6 @@ func ImgRequest(w http.ResponseWriter, r *http.Request) {
6 dbpool := shared.GetDB(r)
7 logger := shared.GetLogger(r)
8 username := shared.GetUsernameFromRequest(r)
9- analytics := shared.GetAnalyticsQueue(r)
10
11 user, err := dbpool.FindUserForName(username)
12 if err != nil {
13@@ -241,7 +240,6 @@ func ImgRequest(w http.ResponseWriter, r *http.Request) {
14 logger,
15 dbpool,
16 st,
17- analytics,
18 )
19 router.ServeAsset(fname, opts, true, anyPerm, w, r)
20 }
pastes/api.go
link
+0
-5
+0
-5
1diff --git a/pastes/api.go b/pastes/api.go
2index 86a230c..8e39b63 100644
3--- a/pastes/api.go
4+++ b/pastes/api.go
5@@ -59,11 +59,6 @@ type PostPageData struct {
6 Unlisted bool
7 }
8
9-type TransparencyPageData struct {
10- Site shared.SitePageData
11- Analytics *db.Analytics
12-}
13-
14 type Link struct {
15 URL string
16 Text string
pgs/ssh.go
link
+3
-7
+3
-7
1diff --git a/pgs/ssh.go b/pgs/ssh.go
2index 0e25d50..0f11014 100644
3--- a/pgs/ssh.go
4+++ b/pgs/ssh.go
5@@ -11,7 +11,6 @@ import (
6 "github.com/charmbracelet/promwish"
7 "github.com/charmbracelet/ssh"
8 "github.com/charmbracelet/wish"
9- "github.com/picosh/pico/db"
10 "github.com/picosh/pico/db/postgres"
11 "github.com/picosh/pico/shared"
12 "github.com/picosh/pico/shared/storage"
13@@ -81,13 +80,10 @@ func StartSshServer() {
14 st,
15 )
16
17- ch := make(chan *db.AnalyticsVisits, 100)
18- go shared.AnalyticsCollect(ch, dbpool, logger)
19 apiConfig := &shared.ApiConfig{
20- Cfg: cfg,
21- Dbpool: dbpool,
22- Storage: st,
23- AnalyticsQueue: ch,
24+ Cfg: cfg,
25+ Dbpool: dbpool,
26+ Storage: st,
27 }
28
29 webTunnel := &tunkit.WebTunnelHandler{
pgs/tunnel.go
link
+1
-2
+1
-2
1diff --git a/pgs/tunnel.go b/pgs/tunnel.go
2index b635c8e..34a8bd0 100644
3--- a/pgs/tunnel.go
4+++ b/pgs/tunnel.go
5@@ -51,7 +51,7 @@ func createHttpHandler(apiConfig *shared.ApiConfig) CtxHttpBridge {
6 "pubkey", pubkeyStr,
7 )
8
9- props, err := getProjectFromSubdomain(subdomain)
10+ props, err := shared.GetProjectFromSubdomain(subdomain)
11 if err != nil {
12 log.Error(err.Error())
13 return http.HandlerFunc(shared.UnauthorizedHandler)
14@@ -121,7 +121,6 @@ func createHttpHandler(apiConfig *shared.ApiConfig) CtxHttpBridge {
15 logger,
16 apiConfig.Dbpool,
17 apiConfig.Storage,
18- apiConfig.AnalyticsQueue,
19 )
20 tunnelRouter := TunnelWebRouter{routes}
21 router := http.NewServeMux()
pgs/web.go
link
+14
-36
+14
-36
1diff --git a/pgs/web.go b/pgs/web.go
2index 685f6ef..0013ac0 100644
3--- a/pgs/web.go
4+++ b/pgs/web.go
5@@ -40,10 +40,7 @@ func StartApiServer() {
6 return
7 }
8
9- ch := make(chan *db.AnalyticsVisits, 100)
10- go shared.AnalyticsCollect(ch, dbpool, logger)
11-
12- routes := NewWebRouter(cfg, logger, dbpool, st, ch)
13+ routes := NewWebRouter(cfg, logger, dbpool, st)
14
15 portStr := fmt.Sprintf(":%s", cfg.Port)
16 logger.Info(
17@@ -61,22 +58,20 @@ func StartApiServer() {
18 type HasPerm = func(proj *db.Project) bool
19
20 type WebRouter struct {
21- Cfg *shared.ConfigSite
22- Logger *slog.Logger
23- Dbpool db.DB
24- Storage storage.StorageServe
25- AnalyticsQueue chan *db.AnalyticsVisits
26- RootRouter *http.ServeMux
27- UserRouter *http.ServeMux
28+ Cfg *shared.ConfigSite
29+ Logger *slog.Logger
30+ Dbpool db.DB
31+ Storage storage.StorageServe
32+ RootRouter *http.ServeMux
33+ UserRouter *http.ServeMux
34 }
35
36-func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) *WebRouter {
37+func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter {
38 router := &WebRouter{
39- Cfg: cfg,
40- Logger: logger,
41- Dbpool: dbpool,
42- Storage: st,
43- AnalyticsQueue: analytics,
44+ Cfg: cfg,
45+ Logger: logger,
46+ Dbpool: dbpool,
47+ Storage: st,
48 }
49 router.initRouters()
50 return router
51@@ -177,7 +172,7 @@ func (web *WebRouter) checkHandler(w http.ResponseWriter, r *http.Request) {
52
53 if !strings.Contains(hostDomain, appDomain) {
54 subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
55- props, err := getProjectFromSubdomain(subdomain)
56+ props, err := shared.GetProjectFromSubdomain(subdomain)
57 if err != nil {
58 logger.Error(
59 "could not get project from subdomain",
60@@ -333,7 +328,7 @@ func (web *WebRouter) ServeAsset(fname string, opts *storage.ImgProcessOpts, fro
61 "host", r.Host,
62 )
63
64- props, err := getProjectFromSubdomain(subdomain)
65+ props, err := shared.GetProjectFromSubdomain(subdomain)
66 if err != nil {
67 logger.Info(
68 "could not determine project from subdomain",
69@@ -450,20 +445,3 @@ func (web *WebRouter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
70 ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
71 router.ServeHTTP(w, r.WithContext(ctx))
72 }
73-
74-type SubdomainProps struct {
75- ProjectName string
76- Username string
77-}
78-
79-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
80- props := &SubdomainProps{}
81- strs := strings.SplitN(subdomain, "-", 2)
82- props.Username = strs[0]
83- if len(strs) == 2 {
84- props.ProjectName = strs[1]
85- } else {
86- props.ProjectName = props.Username
87- }
88- return props, nil
89-}
pgs/web_asset_handler.go
link
+0
-37
+0
-37
1diff --git a/pgs/web_asset_handler.go b/pgs/web_asset_handler.go
2index dd41006..fac47df 100644
3--- a/pgs/web_asset_handler.go
4+++ b/pgs/web_asset_handler.go
5@@ -1,7 +1,6 @@
6 package pgs
7
8 import (
9- "errors"
10 "fmt"
11 "io"
12 "log/slog"
13@@ -15,7 +14,6 @@ import (
14 "net/http/httputil"
15 _ "net/http/pprof"
16
17- "github.com/picosh/pico/shared"
18 "github.com/picosh/pico/shared/storage"
19 sst "github.com/picosh/pobj/storage"
20 )
21@@ -155,22 +153,6 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
22 "routes", strings.Join(attempts, ", "),
23 "status", http.StatusNotFound,
24 )
25- // track 404s
26- ch := h.AnalyticsQueue
27- view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
28- if err == nil {
29- view.ProjectID = h.ProjectID
30- view.Status = http.StatusNotFound
31- select {
32- case ch <- view:
33- default:
34- logger.Error("could not send analytics view to channel", "view", view)
35- }
36- } else {
37- if !errors.Is(err, shared.ErrAnalyticsDisabled) {
38- logger.Error("could not record analytics view", "err", err, "view", view)
39- }
40- }
41 http.Error(w, "404 not found", http.StatusNotFound)
42 return
43 }
44@@ -236,25 +218,6 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
45
46 finContentType := w.Header().Get("content-type")
47
48- // only track pages, not individual assets
49- if finContentType == "text/html" {
50- // track visit
51- ch := h.AnalyticsQueue
52- view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
53- if err == nil {
54- view.ProjectID = h.ProjectID
55- select {
56- case ch <- view:
57- default:
58- logger.Error("could not send analytics view to channel", "view", view)
59- }
60- } else {
61- if !errors.Is(err, shared.ErrAnalyticsDisabled) {
62- logger.Error("could not record analytics view", "err", err, "view", view)
63- }
64- }
65- }
66-
67 logger.Info(
68 "serving asset",
69 "asset", assetFilepath,
pgs/web_test.go
link
+2
-42
+2
-42
1diff --git a/pgs/web_test.go b/pgs/web_test.go
2index 3c9c31d..8694d6f 100644
3--- a/pgs/web_test.go
4+++ b/pgs/web_test.go
5@@ -8,7 +8,6 @@ import (
6 "net/http/httptest"
7 "strings"
8 "testing"
9- "time"
10
11 "github.com/picosh/pico/db"
12 "github.com/picosh/pico/db/stub"
13@@ -219,8 +218,7 @@ func TestApiBasic(t *testing.T) {
14 responseRecorder := httptest.NewRecorder()
15
16 st, _ := storage.NewStorageMemory(tc.storage)
17- ch := make(chan *db.AnalyticsVisits, 100)
18- router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
19+ router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st)
20 router.ServeHTTP(responseRecorder, request)
21
22 if responseRecorder.Code != tc.status {
23@@ -240,43 +238,6 @@ func TestApiBasic(t *testing.T) {
24 }
25 }
26
27-func TestAnalytics(t *testing.T) {
28- bucketName := shared.GetAssetBucketName(testUserID)
29- cfg := NewConfigSite()
30- cfg.Domain = "pgs.test"
31- expectedPath := "/app"
32- request := httptest.NewRequest("GET", mkpath(expectedPath), strings.NewReader(""))
33- responseRecorder := httptest.NewRecorder()
34-
35- sto := map[string]map[string]string{
36- bucketName: {
37- "test/app.html": "hello world!",
38- },
39- }
40- st, _ := storage.NewStorageMemory(sto)
41- ch := make(chan *db.AnalyticsVisits, 100)
42- dbpool := NewPgsAnalticsDb(cfg.Logger)
43- router := NewWebRouter(cfg, cfg.Logger, dbpool, st, ch)
44-
45- go func() {
46- for analytics := range ch {
47- if analytics.Path != expectedPath {
48- t.Errorf("Want path '%s', got '%s'", expectedPath, analytics.Path)
49- }
50- close(ch)
51- }
52- }()
53-
54- router.ServeHTTP(responseRecorder, request)
55-
56- select {
57- case <-ch:
58- return
59- case <-time.After(time.Second * 1):
60- t.Error("didnt receive analytics event within time limit")
61- }
62-}
63-
64 type ImageStorageMemory struct {
65 *storage.StorageMemory
66 Opts *storage.ImgProcessOpts
67@@ -337,8 +298,7 @@ func TestImageManipulation(t *testing.T) {
68 Ratio: &storage.Ratio{},
69 },
70 }
71- ch := make(chan *db.AnalyticsVisits, 100)
72- router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
73+ router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st)
74 router.ServeHTTP(responseRecorder, request)
75
76 if responseRecorder.Code != tc.status {
prose/api.go
link
+3
-43
+3
-43
1diff --git a/prose/api.go b/prose/api.go
2index 5368f0a..0fd4c5a 100644
3--- a/prose/api.go
4+++ b/prose/api.go
5@@ -2,7 +2,6 @@ package prose
6
7 import (
8 "bytes"
9- "errors"
10 "fmt"
11 "html/template"
12 "net/http"
13@@ -89,11 +88,6 @@ type PostPageData struct {
14 Diff template.HTML
15 }
16
17-type TransparencyPageData struct {
18- Site shared.SitePageData
19- Analytics *db.Analytics
20-}
21-
22 type HeaderTxt struct {
23 Title string
24 Bio string
25@@ -270,21 +264,6 @@ func blogHandler(w http.ResponseWriter, r *http.Request) {
26 postCollection = append(postCollection, p)
27 }
28
29- // track visit
30- ch := shared.GetAnalyticsQueue(r)
31- view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
32- if err == nil {
33- select {
34- case ch <- view:
35- default:
36- logger.Error("could not send analytics view to channel", "view", view)
37- }
38- } else {
39- if !errors.Is(err, shared.ErrAnalyticsDisabled) {
40- logger.Error("could not record analytics view", "err", err, "view", view)
41- }
42- }
43-
44 data := BlogPageData{
45 Site: *cfg.GetSiteData(),
46 PageTitle: headerTxt.Title,
47@@ -350,7 +329,6 @@ func postHandler(w http.ResponseWriter, r *http.Request) {
48 username := shared.GetUsernameFromRequest(r)
49 subdomain := shared.GetSubdomain(r)
50 cfg := shared.GetCfg(r)
51- ch := shared.GetAnalyticsQueue(r)
52
53 var slug string
54 if !cfg.IsSubdomains() || subdomain == "" {
55@@ -429,21 +407,6 @@ func postHandler(w http.ResponseWriter, r *http.Request) {
56 ogImageCard = parsedText.ImageCard
57 }
58
59- // track visit
60- view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
61- if err == nil {
62- view.PostID = post.ID
63- select {
64- case ch <- view:
65- default:
66- logger.Error("could not send analytics view to channel", "view", view)
67- }
68- } else {
69- if !errors.Is(err, shared.ErrAnalyticsDisabled) {
70- logger.Error("could not record analytics view", "err", err, "view", view)
71- }
72- }
73-
74 unlisted := false
75 if post.Hidden || post.PublishAt.After(time.Now()) {
76 unlisted = true
77@@ -953,13 +916,10 @@ func StartApiServer() {
78 mainRoutes := createMainRoutes(staticRoutes)
79 subdomainRoutes := createSubdomainRoutes(staticRoutes)
80
81- ch := make(chan *db.AnalyticsVisits, 100)
82- go shared.AnalyticsCollect(ch, dbpool, logger)
83 apiConfig := &shared.ApiConfig{
84- Cfg: cfg,
85- Dbpool: dbpool,
86- Storage: st,
87- AnalyticsQueue: ch,
88+ Cfg: cfg,
89+ Dbpool: dbpool,
90+ Storage: st,
91 }
92 handler := shared.CreateServe(mainRoutes, subdomainRoutes, apiConfig)
93 router := http.HandlerFunc(handler)
sql/migrations/20241125_add_content_type_to_analytics.sql
link
+1
-0
+1
-0
1diff --git a/sql/migrations/20241125_add_content_type_to_analytics.sql b/sql/migrations/20241125_add_content_type_to_analytics.sql
2new file mode 100644
3index 0000000..d9b2501
4--- /dev/null
5+++ b/sql/migrations/20241125_add_content_type_to_analytics.sql
6@@ -0,0 +1,1 @@
7+ALTER TABLE analytics_visits ADD COLUMN content_type varchar(256);