Logs
Patchsets
Range Diff ↕
1: 77aaa29 ! 1: 8d56535 reactor(metric-drain): use caddy json format
@@ auth/api.go "log/slog" "net/http" "net/url" + "strings" "time" "github.com/gorilla/feeds" } } +type AccessLogReq struct { + RemoteIP string `json:"remote_ip"` + RemotePort string `json:"remote_port"` + ClientIP string `json:"client_ip"` + Method string `json:"method"` + Host string `json:"host"` + Uri string `json:"uri"` + Headers struct { + UserAgent string `json:"User-Agent"` + Referer string `json:"Referer"` + } `json:"headers"` + Tls struct { + ServerName string `json:"server_name"` + } `json:"tls"` +} + +type CaddyAccessLog struct { + Request AccessLogReq `json:"request"` + Status int `json:"status"` +} + +func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) { + spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2) + space := spaceRaw[0] + host := access.Request.Host + path := access.Request.Uri + subdomain := "" + + // grab subdomain based on host + if strings.HasSuffix(host, "tuns.sh") { + subdomain = strings.TrimSuffix(host, ".tuns.sh") + } else if strings.HasSuffix(host, "pgs.sh") { + subdomain = strings.TrimSuffix(host, ".pgs.sh") + } else if strings.HasSuffix(host, "prose.sh") { + subdomain = strings.TrimSuffix(host, ".prose.sh") + } else { + subdomain = shared.GetCustomDomain(host, space) + } + + // get user and namespace details from subdomain + props, err := shared.GetProjectFromSubdomain(subdomain) + if err != nil { + return nil, err + } + // get user ID + user, err := dbpool.FindUserForName(props.Username) + if err != nil { + return nil, err + } + + projectID := "" + postID := "" + if space == "pgs" { // figure out project ID + project, err := dbpool.FindProjectByName(user.ID, props.ProjectName) + if err != nil { + return nil, err + } + projectID = project.ID + } else if space == "prose" { // figure out post ID + if path == "" || path == "/" { + } else { + post, err := dbpool.FindPostWithSlug(path, user.ID, space) + if err != nil { + return nil, err + } + postID = post.ID + } + } + + return 
&db.AnalyticsVisits{ + UserID: user.ID, + ProjectID: projectID, + PostID: postID, + Namespace: space, + Host: host, + Path: path, + IpAddress: access.Request.ClientIP, + UserAgent: access.Request.Headers.UserAgent, + Referer: access.Request.Headers.Referer, // TODO: I don't see referer in the access log + Status: access.Status, + }, nil +} + func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) { conn := shared.NewPicoPipeClient() stdoutPipe, err := pubsub.RemoteSub("sub metric-drain -k", ctx, conn) scanner := bufio.NewScanner(stdoutPipe) for scanner.Scan() { line := scanner.Text() - visit := db.AnalyticsVisits{} - err := json.Unmarshal([]byte(line), &visit) drain := metrics.ReconnectReadMetrics( ctx, -1, ) - for { - scanner := bufio.NewScanner(drain) - for scanner.Scan() { - line := scanner.Text() - visit := db.AnalyticsVisits{} - err := json.Unmarshal([]byte(line), &visit) - if err != nil { - logger.Error("json unmarshal", "err", err) - continue - } - - user := slog.Any("userId", visit.UserID) - - err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret) - if err != nil { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Info("could not record analytics visit", "reason", err, "visit", visit, user) - continue - } - } + scanner := bufio.NewScanner(drain) + for scanner.Scan() { + line := scanner.Text() + accessLog := CaddyAccessLog{} + err := json.Unmarshal([]byte(line), &accessLog) if err != nil { logger.Error("json unmarshal", "err", err) continue } + if err != nil { + logger.Error("json unmarshal", "err", err) + continue + } - err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret) - logger.Info("inserting visit", "visit", visit, user) - err = dbpool.InsertVisit(&visit) - if err != nil { - logger.Error("could not insert visit record", "err", err, "visit", visit, user) + visit, err := deserializeCaddyAccessLog(dbpool, &accessLog) + if err != nil { + logger.Error("cannot deserialize access log", "err", err) + 
continue + } + err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret) if err != nil { if !errors.Is(err, shared.ErrAnalyticsDisabled) { logger.Info("could not record analytics visit", "reason", err) + if err != nil { + if !errors.Is(err, shared.ErrAnalyticsDisabled) { + logger.Info("could not record analytics visit", "reason", err) } } logger.Info("inserting visit", "visit", visit) - err = dbpool.InsertVisit(&visit) - if scanner.Err() != nil { - logger.Error("scanner error", "err", scanner.Err()) + logger.Info("inserting visit", "visit", visit) + err = dbpool.InsertVisit(visit) if err != nil { logger.Error("could not insert visit record", "err", err) + if err != nil { + logger.Error("could not insert visit record", "err", err) } } } @@ caddy.json +{ + "level": "info", + "ts": 1731644477.313701, + "logger": "http.log.access", + "msg": "handled request", + "request": { + "remote_ip": "127.0.0.1", + "remote_port": "40400", + "client_ip": "127.0.0.1", + "proto": "HTTP/2.0", + "method": "GET", + "host": "pgs.sh", + "uri": "/", + "headers": { "User-Agent": ["Blackbox Exporter/0.24.0"] }, + "tls": { + "resumed": false, + "version": 772, + "cipher_suite": 4865, + "proto": "h2", + "server_name": "pgs.sh" + } + }, + "bytes_read": 0, + "user_id": "", + "duration": 0.001207084, + "size": 3718, + "status": 200, + "resp_headers": { + "Referrer-Policy": ["no-referrer-when-downgrade"], + "Strict-Transport-Security": ["max-age=31536000;"], + "X-Content-Type-Options": ["nosniff"], + "X-Frame-Options": ["DENY"], + "Server": ["Caddy"], + "Alt-Svc": ["h3=\":443\"; ma=2592000"], + "Date": ["Fri, 15 Nov 2024 04:21:17 GMT"], + "Content-Type": ["text/html; charset=utf-8"], + "X-Xss-Protection": ["1; mode=block"], + "Permissions-Policy": ["interest-cohort=()"] + } +} @@ pgs/tunnel.go "pubkey", pubkeyStr, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { log.Error(err.Error()) return 
http.HandlerFunc(shared.UnauthorizedHandler) @@ pgs/web.go if !strings.Contains(hostDomain, appDomain) { subdomain := shared.GetCustomDomain(hostDomain, cfg.Space) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Error( "could not get project from subdomain", "host", r.Host, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Info( "could not determine project from subdomain", ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain) router.ServeHTTP(w, r.WithContext(ctx)) } - -type SubdomainProps struct { - ProjectName string - Username string -} - -func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { - props := &SubdomainProps{} - strs := strings.SplitN(subdomain, "-", 2) - props.Username = strs[0] - if len(strs) == 2 { - props.ProjectName = strs[1] - } else { - props.ProjectName = props.Username - } - return props, nil -} @@ shared/api.go "github.com/picosh/utils" ) +type SubdomainProps struct { + ProjectName string + Username string +} + +func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { + props := &SubdomainProps{} + strs := strings.SplitN(subdomain, "-", 2) + props.Username = strs[0] + if len(strs) == 2 { + props.ProjectName = strs[1] + } else { + props.ProjectName = props.Username + } + return props, nil +} + func CorsHeaders(headers http.Header) { headers.Add("Access-Control-Allow-Origin", "*") headers.Add("Vary", "Origin")
-: ------- > 2: a336041 wip
-: ------- > 3: 7ae45b3 chore: wrap
-: ------- > 4: bfa5c4f done
Range Diff ↕
2: a336041 < -: ------- wip
3: 7ae45b3 < -: ------- chore: wrap
4: bfa5c4f < -: ------- done
1: 8d56535 ! 1: c7eeb12 reactor(metric-drain): use caddy access logs
@@ auth/api.go "log/slog" "net/http" "net/url" + "strings" "time" "github.com/gorilla/feeds" "github.com/picosh/pico/db/postgres" "github.com/picosh/pico/shared" "github.com/picosh/utils" + "github.com/picosh/utils/pipe" "github.com/picosh/utils/pipe/metrics" ) } } +type AccessLogReq struct { + RemoteIP string `json:"remote_ip"` + RemotePort string `json:"remote_port"` + ClientIP string `json:"client_ip"` + Method string `json:"method"` + Host string `json:"host"` + Uri string `json:"uri"` + Headers struct { + UserAgent string `json:"User-Agent"` + Referer string `json:"Referer"` + UserAgent []string `json:"User-Agent"` + Referer []string `json:"Referer"` + } `json:"headers"` + Tls struct { + ServerName string `json:"server_name"` + } `json:"tls"` +} + +type RespHeaders struct { + ContentType []string `json:"Content-Type"` +} + +type CaddyAccessLog struct { + Request AccessLogReq `json:"request"` + Status int `json:"status"` + Request AccessLogReq `json:"request"` + Status int `json:"status"` + RespHeaders RespHeaders `json:"resp_headers"` +} + +func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) { + spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2) + space := spaceRaw[0] + host := access.Request.Host + path := access.Request.Uri + subdomain := "" + + // grab subdomain based on host + if strings.HasSuffix(host, "tuns.sh") { + subdomain = strings.TrimSuffix(host, ".tuns.sh") + } else if strings.HasSuffix(host, "pgs.sh") { + subdomain = strings.TrimSuffix(host, ".pgs.sh") + } else if strings.HasSuffix(host, "prose.sh") { + subdomain = strings.TrimSuffix(host, ".prose.sh") + } else { + subdomain = shared.GetCustomDomain(host, space) + } + + // get user and namespace details from subdomain + props, err := shared.GetProjectFromSubdomain(subdomain) + if err != nil { + return nil, err + } + // get user ID + user, err := dbpool.FindUserForName(props.Username) + if err != nil { + return nil, err + } + + projectID 
:= "" + postID := "" + if space == "pgs" { // figure out project ID + project, err := dbpool.FindProjectByName(user.ID, props.ProjectName) + if err != nil { + return nil, err + } + projectID = project.ID + } else if space == "prose" { // figure out post ID + if path == "" || path == "/" { + } else { + post, err := dbpool.FindPostWithSlug(path, user.ID, space) + if err != nil { + return nil, err + } + postID = post.ID + } + } + + return &db.AnalyticsVisits{ + UserID: user.ID, + ProjectID: projectID, + PostID: postID, + Namespace: space, + Host: host, + Path: path, + IpAddress: access.Request.ClientIP, + UserAgent: access.Request.Headers.UserAgent, + Referer: access.Request.Headers.Referer, // TODO: I don't see referer in the access log + Status: access.Status, + UserID: user.ID, + ProjectID: projectID, + PostID: postID, + Namespace: space, + Host: host, + Path: path, + IpAddress: access.Request.ClientIP, + UserAgent: strings.Join(access.Request.Headers.UserAgent, " "), + Referer: strings.Join(access.Request.Headers.Referer, " "), + ContentType: strings.Join(access.RespHeaders.ContentType, " "), + Status: access.Status, + }, nil +} + +// this feels really stupid because i'm taking containter-drain, +// filtering it, and then sending it to metric-drain. The +// metricDrainSub function listens on the metric-drain and saves it. +// So why not just call the necessary functions to save the visit? +// We want to be able to use pipe as a debugging tool which means we +// can manually sub to `metric-drain` and have a nice clean output to view. 
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) { + info := shared.NewPicoPipeClient() + drain := pipe.NewReconnectReadWriteCloser( + ctx, + logger, + info, + "container drain", + "sub container-drain -k", + 100, + -1, + ) + + send := pipe.NewReconnectReadWriteCloser( + ctx, + logger, + info, + "from container drain to metric drain", + "pub metric-drain -b=false", + 100, + -1, + ) + + for { + scanner := bufio.NewScanner(drain) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "http.log.access") { + clean := strings.TrimSpace(line) + visit, err := accessLogToVisit(dbpool, clean) + if err != nil { + logger.Debug("could not convert access log to a visit", "err", err) + continue + } + jso, err := json.Marshal(visit) + if err != nil { + logger.Error("could not marshal json of a visit", "err", err) + continue + } + _, _ = send.Write(jso) + } + } + } +} + +func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) { + accessLog := CaddyAccessLog{} + err := json.Unmarshal([]byte(line), &accessLog) + if err != nil { + return nil, err + } + + return deserializeCaddyAccessLog(dbpool, &accessLog) +} + func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) { drain := metrics.ReconnectReadMetrics( ctx, -1, ) - for { - scanner := bufio.NewScanner(drain) - for scanner.Scan() { - line := scanner.Text() - visit := db.AnalyticsVisits{} - err := json.Unmarshal([]byte(line), &visit) - if err != nil { visit := db.AnalyticsVisits{} err := json.Unmarshal([]byte(line), &visit) if err != nil { - logger.Error("json unmarshal", "err", err) - continue - } + logger.Info("could not unmarshal json", "err", err, "line", line) continue } - - user := slog.Any("userId", visit.UserID) - - err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret) - if err != nil { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret) if err != nil 
{ if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Info("could not record analytics visit", "reason", err, "visit", visit, user) - continue - } - } + scanner := bufio.NewScanner(drain) + for scanner.Scan() { + line := scanner.Text() + accessLog := CaddyAccessLog{} + err := json.Unmarshal([]byte(line), &accessLog) + if err != nil { + logger.Error("json unmarshal", "err", err) + continue + } + logger.Info("could not record analytics visit", "reason", err) } } - logger.Info("inserting visit", "visit", visit, user) - err = dbpool.InsertVisit(&visit) - if err != nil { + if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") { + continue + } + + logger.Info("inserting visit", "visit", visit) err = dbpool.InsertVisit(&visit) if err != nil { - logger.Error("could not insert visit record", "err", err, "visit", visit, user) + visit, err := deserializeCaddyAccessLog(dbpool, &accessLog) + if err != nil { + logger.Error("cannot deserialize access log", "err", err) + continue + } + err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret) + if err != nil { + if !errors.Is(err, shared.ErrAnalyticsDisabled) { + logger.Info("could not record analytics visit", "reason", err) + logger.Error("could not insert visit record", "err", err) } } - - if scanner.Err() != nil { - logger.Error("scanner error", "err", scanner.Err()) + logger.Info("inserting visit", "visit", visit) + err = dbpool.InsertVisit(visit) + if err != nil { + logger.Error("could not insert visit record", "err", err) } - } } } // gather metrics in the auth service go metricDrainSub(ctx, db, logger, cfg.Secret) + // convert container logs to access logs + go containerDrainSub(ctx, db, logger) + defer ctx.Done() apiConfig := &shared.ApiConfig{ @@ caddy.json +{ + "level": "info", + "ts": 1731644477.313701, + "logger": "http.log.access", + "msg": "handled request", + "request": { + "remote_ip": "127.0.0.1", + "remote_port": "40400", + "client_ip": "127.0.0.1", + "proto": "HTTP/2.0", + 
"method": "GET", + "host": "pgs.sh", + "uri": "/", + "headers": { "User-Agent": ["Blackbox Exporter/0.24.0"] }, + "tls": { + "resumed": false, + "version": 772, + "cipher_suite": 4865, + "proto": "h2", + "server_name": "pgs.sh" + } + }, + "bytes_read": 0, + "user_id": "", + "duration": 0.001207084, + "size": 3718, + "status": 200, + "resp_headers": { + "Referrer-Policy": ["no-referrer-when-downgrade"], + "Strict-Transport-Security": ["max-age=31536000;"], + "X-Content-Type-Options": ["nosniff"], + "X-Frame-Options": ["DENY"], + "Server": ["Caddy"], + "Alt-Svc": ["h3=\":443\"; ma=2592000"], + "Date": ["Fri, 15 Nov 2024 04:21:17 GMT"], + "Content-Type": ["text/html; charset=utf-8"], + "X-Xss-Protection": ["1; mode=block"], + "Permissions-Policy": ["interest-cohort=()"] + } +} @@ pgs/tunnel.go "pubkey", pubkeyStr, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { log.Error(err.Error()) return http.HandlerFunc(shared.UnauthorizedHandler) logger, apiConfig.Dbpool, apiConfig.Storage, - apiConfig.AnalyticsQueue, ) tunnelRouter := TunnelWebRouter{routes} router := http.NewServeMux() @@ pgs/web.go return } - ch := make(chan *db.AnalyticsVisits, 100) - go shared.AnalyticsCollect(ch, dbpool, logger) - - routes := NewWebRouter(cfg, logger, dbpool, st, ch) + routes := NewWebRouter(cfg, logger, dbpool, st) portStr := fmt.Sprintf(":%s", cfg.Port) logger.Info( type HasPerm = func(proj *db.Project) bool type WebRouter struct { - Cfg *shared.ConfigSite - Logger *slog.Logger - Dbpool db.DB - Storage storage.StorageServe - AnalyticsQueue chan *db.AnalyticsVisits - RootRouter *http.ServeMux - UserRouter *http.ServeMux + Cfg *shared.ConfigSite + Logger *slog.Logger + Dbpool db.DB + Storage storage.StorageServe + RootRouter *http.ServeMux + UserRouter *http.ServeMux } -func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) 
*WebRouter { +func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter { router := &WebRouter{ - Cfg: cfg, - Logger: logger, - Dbpool: dbpool, - Storage: st, - AnalyticsQueue: analytics, + Cfg: cfg, + Logger: logger, + Dbpool: dbpool, + Storage: st, } router.initRouters() return router if !strings.Contains(hostDomain, appDomain) { subdomain := shared.GetCustomDomain(hostDomain, cfg.Space) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Error( "could not get project from subdomain", "host", r.Host, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Info( "could not determine project from subdomain", ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain) router.ServeHTTP(w, r.WithContext(ctx)) } - -type SubdomainProps struct { - ProjectName string - Username string -} - -func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { - props := &SubdomainProps{} - strs := strings.SplitN(subdomain, "-", 2) - props.Username = strs[0] - if len(strs) == 2 { - props.ProjectName = strs[1] - } else { - props.ProjectName = props.Username - } - return props, nil -} @@ shared/api.go "github.com/picosh/utils" ) +type SubdomainProps struct { + ProjectName string + Username string +} + +func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { + props := &SubdomainProps{} + strs := strings.SplitN(subdomain, "-", 2) + props.Username = strs[0] + if len(strs) == 2 { + props.ProjectName = strs[1] + } else { + props.ProjectName = props.Username + } + return props, nil +} + func CorsHeaders(headers http.Header) { headers.Add("Access-Control-Allow-Origin", "*") headers.Add("Vary", "Origin")
Range Diff ↕
1: c7eeb12 ! 1: 4e0839a reactor(metric-drain): use caddy access logs
@@ Makefile $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20240819_add_projects_blocked.sql $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241028_add_analytics_indexes.sql $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql + $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql .PHONY: migrate latest: - $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql + $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql .PHONY: latest psql: @@ auth/api.go "log/slog" "net/http" "net/url" + "strings" "time" "github.com/gorilla/feeds" "github.com/picosh/pico/db/postgres" "github.com/picosh/pico/shared" "github.com/picosh/utils" + "github.com/picosh/utils/pipe" "github.com/picosh/utils/pipe/metrics" ) } } +type AccessLogReq struct { + RemoteIP string `json:"remote_ip"` + RemotePort string `json:"remote_port"` + ClientIP string `json:"client_ip"` + Method string `json:"method"` + Host string `json:"host"` + Uri string `json:"uri"` + Headers struct { + UserAgent []string `json:"User-Agent"` + Referer []string `json:"Referer"` + } `json:"headers"` + Tls struct { + ServerName string `json:"server_name"` + } `json:"tls"` +} + +type RespHeaders struct { + ContentType []string `json:"Content-Type"` +} + +type CaddyAccessLog struct { + Request AccessLogReq `json:"request"` + Status int `json:"status"` + RespHeaders RespHeaders `json:"resp_headers"` +} + +func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) { + spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2) + space := spaceRaw[0] + host := access.Request.Host + path 
:= access.Request.Uri + subdomain := "" + + // grab subdomain based on host + if strings.HasSuffix(host, "tuns.sh") { + subdomain = strings.TrimSuffix(host, ".tuns.sh") + } else if strings.HasSuffix(host, "pgs.sh") { + subdomain = strings.TrimSuffix(host, ".pgs.sh") + } else if strings.HasSuffix(host, "prose.sh") { + subdomain = strings.TrimSuffix(host, ".prose.sh") + } else { + subdomain = shared.GetCustomDomain(host, space) + } + + // get user and namespace details from subdomain + props, err := shared.GetProjectFromSubdomain(subdomain) + if err != nil { + return nil, err + } + // get user ID + user, err := dbpool.FindUserForName(props.Username) + if err != nil { + return nil, err + } + + projectID := "" + postID := "" + if space == "pgs" { // figure out project ID + project, err := dbpool.FindProjectByName(user.ID, props.ProjectName) + if err != nil { + return nil, err + } + projectID = project.ID + } else if space == "prose" { // figure out post ID + if path == "" || path == "/" { + } else { + post, err := dbpool.FindPostWithSlug(path, user.ID, space) + if err != nil { + return nil, err + } + postID = post.ID + } + } + + return &db.AnalyticsVisits{ + UserID: user.ID, + ProjectID: projectID, + PostID: postID, + Namespace: space, + Host: host, + Path: path, + IpAddress: access.Request.ClientIP, + UserAgent: strings.Join(access.Request.Headers.UserAgent, " "), + Referer: strings.Join(access.Request.Headers.Referer, " "), + ContentType: strings.Join(access.RespHeaders.ContentType, " "), + Status: access.Status, + }, nil +} + +// this feels really stupid because i'm taking containter-drain, +// filtering it, and then sending it to metric-drain. The +// metricDrainSub function listens on the metric-drain and saves it. +// So why not just call the necessary functions to save the visit? +// We want to be able to use pipe as a debugging tool which means we +// can manually sub to `metric-drain` and have a nice clean output to view. 
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) { + info := shared.NewPicoPipeClient() + drain := pipe.NewReconnectReadWriteCloser( + ctx, + logger, + info, + "container drain", + "sub container-drain -k", + 100, + -1, + ) + + send := pipe.NewReconnectReadWriteCloser( + ctx, + logger, + info, + "from container drain to metric drain", + "pub metric-drain -b=false", + 100, + -1, + ) + + for { + scanner := bufio.NewScanner(drain) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "http.log.access") { + clean := strings.TrimSpace(line) + visit, err := accessLogToVisit(dbpool, clean) + if err != nil { + logger.Debug("could not convert access log to a visit", "err", err) + continue + } + jso, err := json.Marshal(visit) + if err != nil { + logger.Error("could not marshal json of a visit", "err", err) + continue + } + _, _ = send.Write(jso) + } + } + } +} + +func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) { + accessLog := CaddyAccessLog{} + err := json.Unmarshal([]byte(line), &accessLog) + if err != nil { + return nil, err + } + + return deserializeCaddyAccessLog(dbpool, &accessLog) +} + func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) { drain := metrics.ReconnectReadMetrics( ctx, visit := db.AnalyticsVisits{} err := json.Unmarshal([]byte(line), &visit) if err != nil { - logger.Error("json unmarshal", "err", err) + logger.Info("could not unmarshal json", "err", err, "line", line) continue } - - user := slog.Any("userId", visit.UserID) - err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret) if err != nil { if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Info("could not record analytics visit", "reason", err, "visit", visit, user) - continue + logger.Info("could not record analytics visit", "reason", err) } } - logger.Info("inserting visit", "visit", visit, user) + if visit.ContentType != "" && 
!strings.HasPrefix(visit.ContentType, "text/html") { + continue + } + + logger.Info("inserting visit", "visit", visit) err = dbpool.InsertVisit(&visit) if err != nil { - logger.Error("could not insert visit record", "err", err, "visit", visit, user) + logger.Error("could not insert visit record", "err", err) } } - - if scanner.Err() != nil { - logger.Error("scanner error", "err", scanner.Err()) - } } } // gather metrics in the auth service go metricDrainSub(ctx, db, logger, cfg.Secret) + // convert container logs to access logs + go containerDrainSub(ctx, db, logger) + defer ctx.Done() apiConfig := &shared.ApiConfig{ @@ db/db.go } type AnalyticsVisits struct { - ID string `json:"id"` - UserID string `json:"user_id"` - ProjectID string `json:"project_id"` - PostID string `json:"post_id"` - Namespace string `json:"namespace"` - Host string `json:"host"` - Path string `json:"path"` - IpAddress string `json:"ip_address"` - UserAgent string `json:"user_agent"` - Referer string `json:"referer"` - Status int `json:"status"` + ID string `json:"id"` + UserID string `json:"user_id"` + ProjectID string `json:"project_id"` + PostID string `json:"post_id"` + Namespace string `json:"namespace"` + Host string `json:"host"` + Path string `json:"path"` + IpAddress string `json:"ip_address"` + UserAgent string `json:"user_agent"` + Referer string `json:"referer"` + Status int `json:"status"` + ContentType string `json:"content_type"` } type VisitInterval struct { @@ db/postgres/storage.go func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error { _, err := me.Db.Exec( - `INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`, + `INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`, visit.UserID, 
newNullString(visit.ProjectID), newNullString(visit.PostID), visit.UserAgent, visit.Referer, visit.Status, + visit.ContentType, ) return err } @@ imgs/api.go dbpool := shared.GetDB(r) logger := shared.GetLogger(r) username := shared.GetUsernameFromRequest(r) - analytics := shared.GetAnalyticsQueue(r) user, err := dbpool.FindUserForName(username) if err != nil { logger, dbpool, st, - analytics, ) router.ServeAsset(fname, opts, true, anyPerm, w, r) } @@ pastes/api.go Unlisted bool } -type TransparencyPageData struct { - Site shared.SitePageData - Analytics *db.Analytics -} - type Link struct { URL string Text string @@ pgs/ssh.go "github.com/charmbracelet/promwish" "github.com/charmbracelet/ssh" "github.com/charmbracelet/wish" - "github.com/picosh/pico/db" "github.com/picosh/pico/db/postgres" "github.com/picosh/pico/shared" "github.com/picosh/pico/shared/storage" st, ) - ch := make(chan *db.AnalyticsVisits, 100) - go shared.AnalyticsCollect(ch, dbpool, logger) apiConfig := &shared.ApiConfig{ - Cfg: cfg, - Dbpool: dbpool, - Storage: st, - AnalyticsQueue: ch, + Cfg: cfg, + Dbpool: dbpool, + Storage: st, } webTunnel := &tunkit.WebTunnelHandler{ @@ pgs/tunnel.go "pubkey", pubkeyStr, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { log.Error(err.Error()) return http.HandlerFunc(shared.UnauthorizedHandler) logger, apiConfig.Dbpool, apiConfig.Storage, - apiConfig.AnalyticsQueue, ) tunnelRouter := TunnelWebRouter{routes} router := http.NewServeMux() @@ pgs/web.go return } - ch := make(chan *db.AnalyticsVisits, 100) - go shared.AnalyticsCollect(ch, dbpool, logger) - - routes := NewWebRouter(cfg, logger, dbpool, st, ch) + routes := NewWebRouter(cfg, logger, dbpool, st) portStr := fmt.Sprintf(":%s", cfg.Port) logger.Info( type HasPerm = func(proj *db.Project) bool type WebRouter struct { - Cfg *shared.ConfigSite - Logger *slog.Logger - Dbpool db.DB - Storage storage.StorageServe - AnalyticsQueue chan 
*db.AnalyticsVisits - RootRouter *http.ServeMux - UserRouter *http.ServeMux + Cfg *shared.ConfigSite + Logger *slog.Logger + Dbpool db.DB + Storage storage.StorageServe + RootRouter *http.ServeMux + UserRouter *http.ServeMux } -func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) *WebRouter { +func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter { router := &WebRouter{ - Cfg: cfg, - Logger: logger, - Dbpool: dbpool, - Storage: st, - AnalyticsQueue: analytics, + Cfg: cfg, + Logger: logger, + Dbpool: dbpool, + Storage: st, } router.initRouters() return router if !strings.Contains(hostDomain, appDomain) { subdomain := shared.GetCustomDomain(hostDomain, cfg.Space) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Error( "could not get project from subdomain", "host", r.Host, ) - props, err := getProjectFromSubdomain(subdomain) + props, err := shared.GetProjectFromSubdomain(subdomain) if err != nil { logger.Info( "could not determine project from subdomain", ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain) router.ServeHTTP(w, r.WithContext(ctx)) } - -type SubdomainProps struct { - ProjectName string - Username string -} - -func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { - props := &SubdomainProps{} - strs := strings.SplitN(subdomain, "-", 2) - props.Username = strs[0] - if len(strs) == 2 { - props.ProjectName = strs[1] - } else { - props.ProjectName = props.Username - } - return props, nil -} @@ pgs/web_asset_handler.go package pgs import ( - "errors" "fmt" "io" "log/slog" "net/http/httputil" _ "net/http/pprof" - "github.com/picosh/pico/shared" "github.com/picosh/pico/shared/storage" sst "github.com/picosh/pobj/storage" ) "routes", strings.Join(attempts, ", "), "status", http.StatusNotFound, ) - // track 
404s - ch := h.AnalyticsQueue - view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID) - if err == nil { - view.ProjectID = h.ProjectID - view.Status = http.StatusNotFound - select { - case ch <- view: - default: - logger.Error("could not send analytics view to channel", "view", view) - } - } else { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Error("could not record analytics view", "err", err, "view", view) - } - } http.Error(w, "404 not found", http.StatusNotFound) return } finContentType := w.Header().Get("content-type") - // only track pages, not individual assets - if finContentType == "text/html" { - // track visit - ch := h.AnalyticsQueue - view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID) - if err == nil { - view.ProjectID = h.ProjectID - select { - case ch <- view: - default: - logger.Error("could not send analytics view to channel", "view", view) - } - } else { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Error("could not record analytics view", "err", err, "view", view) - } - } - } - logger.Info( "serving asset", "asset", assetFilepath, @@ pgs/web_test.go "net/http/httptest" "strings" "testing" - "time" "github.com/picosh/pico/db" "github.com/picosh/pico/db/stub" responseRecorder := httptest.NewRecorder() st, _ := storage.NewStorageMemory(tc.storage) - ch := make(chan *db.AnalyticsVisits, 100) - router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch) + router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st) router.ServeHTTP(responseRecorder, request) if responseRecorder.Code != tc.status { } } -func TestAnalytics(t *testing.T) { - bucketName := shared.GetAssetBucketName(testUserID) - cfg := NewConfigSite() - cfg.Domain = "pgs.test" - expectedPath := "/app" - request := httptest.NewRequest("GET", mkpath(expectedPath), strings.NewReader("")) - responseRecorder := httptest.NewRecorder() - - sto := map[string]map[string]string{ - bucketName: { - "test/app.html": "hello world!", - }, - } - 
st, _ := storage.NewStorageMemory(sto) - ch := make(chan *db.AnalyticsVisits, 100) - dbpool := NewPgsAnalticsDb(cfg.Logger) - router := NewWebRouter(cfg, cfg.Logger, dbpool, st, ch) - - go func() { - for analytics := range ch { - if analytics.Path != expectedPath { - t.Errorf("Want path '%s', got '%s'", expectedPath, analytics.Path) - } - close(ch) - } - }() - - router.ServeHTTP(responseRecorder, request) - - select { - case <-ch: - return - case <-time.After(time.Second * 1): - t.Error("didnt receive analytics event within time limit") - } -} - type ImageStorageMemory struct { *storage.StorageMemory Opts *storage.ImgProcessOpts Ratio: &storage.Ratio{}, }, } - ch := make(chan *db.AnalyticsVisits, 100) - router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch) + router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st) router.ServeHTTP(responseRecorder, request) if responseRecorder.Code != tc.status { @@ prose/api.go import ( "bytes" - "errors" "fmt" "html/template" "net/http" Diff template.HTML } -type TransparencyPageData struct { - Site shared.SitePageData - Analytics *db.Analytics -} - type HeaderTxt struct { Title string Bio string postCollection = append(postCollection, p) } - // track visit - ch := shared.GetAnalyticsQueue(r) - view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID) - if err == nil { - select { - case ch <- view: - default: - logger.Error("could not send analytics view to channel", "view", view) - } - } else { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Error("could not record analytics view", "err", err, "view", view) - } - } - data := BlogPageData{ Site: *cfg.GetSiteData(), PageTitle: headerTxt.Title, username := shared.GetUsernameFromRequest(r) subdomain := shared.GetSubdomain(r) cfg := shared.GetCfg(r) - ch := shared.GetAnalyticsQueue(r) var slug string if !cfg.IsSubdomains() || subdomain == "" { ogImageCard = parsedText.ImageCard } - // track visit - view, err := shared.AnalyticsVisitFromRequest(r, dbpool, 
user.ID) - if err == nil { - view.PostID = post.ID - select { - case ch <- view: - default: - logger.Error("could not send analytics view to channel", "view", view) - } - } else { - if !errors.Is(err, shared.ErrAnalyticsDisabled) { - logger.Error("could not record analytics view", "err", err, "view", view) - } - } - unlisted := false if post.Hidden || post.PublishAt.After(time.Now()) { unlisted = true mainRoutes := createMainRoutes(staticRoutes) subdomainRoutes := createSubdomainRoutes(staticRoutes) - ch := make(chan *db.AnalyticsVisits, 100) - go shared.AnalyticsCollect(ch, dbpool, logger) apiConfig := &shared.ApiConfig{ - Cfg: cfg, - Dbpool: dbpool, - Storage: st, - AnalyticsQueue: ch, + Cfg: cfg, + Dbpool: dbpool, + Storage: st, } handler := shared.CreateServe(mainRoutes, subdomainRoutes, apiConfig) router := http.HandlerFunc(handler) @@ shared/api.go "github.com/picosh/utils" ) +type SubdomainProps struct { + ProjectName string + Username string +} + +func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) { + props := &SubdomainProps{} + strs := strings.SplitN(subdomain, "-", 2) + props.Username = strs[0] + if len(strs) == 2 { + props.ProjectName = strs[1] + } else { + props.ProjectName = props.Username + } + return props, nil +} + func CorsHeaders(headers http.Header) { headers.Add("Access-Control-Allow-Origin", "*") headers.Add("Vary", "Origin") @@ shared/router.go } type ApiConfig struct { - Cfg *ConfigSite - Dbpool db.DB - Storage storage.StorageServe - AnalyticsQueue chan *db.AnalyticsVisits + Cfg *ConfigSite + Dbpool db.DB + Storage storage.StorageServe } func (hc *ApiConfig) HasPrivilegedAccess(apiToken string) bool { ctx = context.WithValue(ctx, ctxDBKey{}, hc.Dbpool) ctx = context.WithValue(ctx, ctxStorageKey{}, hc.Storage) ctx = context.WithValue(ctx, ctxCfg{}, hc.Cfg) - ctx = context.WithValue(ctx, ctxAnalyticsQueue{}, hc.AnalyticsQueue) return ctx } type ctxStorageKey struct{} type ctxLoggerKey struct{} type ctxCfg struct{} -type 
ctxAnalyticsQueue struct{} type CtxSubdomainKey struct{} type ctxKey struct{} return "" } -func GetAnalyticsQueue(r *http.Request) chan *db.AnalyticsVisits { - return r.Context().Value(ctxAnalyticsQueue{}).(chan *db.AnalyticsVisits) -} - func GetApiToken(r *http.Request) string { authHeader := r.Header.Get("authorization") if authHeader == "" { @@ sql/migrations/20241125_add_content_type_to_analytics.sql +ALTER TABLE analytics_visits ADD COLUMN content_type varchar(256);
Patchset ps-77
reactor(metric-drain): use caddy json format
Eric Bower
auth/api.go
+106
-26
caddy.json
+40
-0
pgs/tunnel.go
+1
-1
pgs/web.go
+2
-19
shared/api.go
+17
-0
chore: wrap
Eric Bower
auth/api.go
+71
-41
done
Eric Bower
auth/api.go
+8
-7
reactor(metric-drain): use caddy json format
auth/api.go
link
+106
-26
+106
-26
1diff --git a/auth/api.go b/auth/api.go
2index 9c38bdc..8840af7 100644
3--- a/auth/api.go
4+++ b/auth/api.go
5@@ -14,6 +14,7 @@ import (
6 "log/slog"
7 "net/http"
8 "net/url"
9+ "strings"
10 "time"
11
12 "github.com/gorilla/feeds"
13@@ -578,6 +579,89 @@ func checkoutHandler() http.HandlerFunc {
14 }
15 }
16
17+type AccessLogReq struct {
18+ RemoteIP string `json:"remote_ip"`
19+ RemotePort string `json:"remote_port"`
20+ ClientIP string `json:"client_ip"`
21+ Method string `json:"method"`
22+ Host string `json:"host"`
23+ Uri string `json:"uri"`
24+ Headers struct {
25+ UserAgent string `json:"User-Agent"`
26+ Referer string `json:"Referer"`
27+ } `json:"headers"`
28+ Tls struct {
29+ ServerName string `json:"server_name"`
30+ } `json:"tls"`
31+}
32+
33+type CaddyAccessLog struct {
34+ Request AccessLogReq `json:"request"`
35+ Status int `json:"status"`
36+}
37+
38+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
39+ spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
40+ space := spaceRaw[0]
41+ host := access.Request.Host
42+ path := access.Request.Uri
43+ subdomain := ""
44+
45+ // grab subdomain based on host
46+ if strings.HasSuffix(host, "tuns.sh") {
47+ subdomain = strings.TrimSuffix(host, ".tuns.sh")
48+ } else if strings.HasSuffix(host, "pgs.sh") {
49+ subdomain = strings.TrimSuffix(host, ".pgs.sh")
50+ } else if strings.HasSuffix(host, "prose.sh") {
51+ subdomain = strings.TrimSuffix(host, ".prose.sh")
52+ } else {
53+ subdomain = shared.GetCustomDomain(host, space)
54+ }
55+
56+ // get user and namespace details from subdomain
57+ props, err := shared.GetProjectFromSubdomain(subdomain)
58+ if err != nil {
59+ return nil, err
60+ }
61+ // get user ID
62+ user, err := dbpool.FindUserForName(props.Username)
63+ if err != nil {
64+ return nil, err
65+ }
66+
67+ projectID := ""
68+ postID := ""
69+ if space == "pgs" { // figure out project ID
70+ project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
71+ if err != nil {
72+ return nil, err
73+ }
74+ projectID = project.ID
75+ } else if space == "prose" { // figure out post ID
76+ if path == "" || path == "/" {
77+ } else {
78+ post, err := dbpool.FindPostWithSlug(path, user.ID, space)
79+ if err != nil {
80+ return nil, err
81+ }
82+ postID = post.ID
83+ }
84+ }
85+
86+ return &db.AnalyticsVisits{
87+ UserID: user.ID,
88+ ProjectID: projectID,
89+ PostID: postID,
90+ Namespace: space,
91+ Host: host,
92+ Path: path,
93+ IpAddress: access.Request.ClientIP,
94+ UserAgent: access.Request.Headers.UserAgent,
95+ Referer: access.Request.Headers.Referer, // TODO: I don't see referer in the access log
96+ Status: access.Status,
97+ }, nil
98+}
99+
100 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
101 drain := metrics.ReconnectReadMetrics(
102 ctx,
103@@ -587,36 +671,32 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
104 -1,
105 )
106
107- for {
108- scanner := bufio.NewScanner(drain)
109- for scanner.Scan() {
110- line := scanner.Text()
111- visit := db.AnalyticsVisits{}
112- err := json.Unmarshal([]byte(line), &visit)
113- if err != nil {
114- logger.Error("json unmarshal", "err", err)
115- continue
116- }
117-
118- user := slog.Any("userId", visit.UserID)
119-
120- err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
121- if err != nil {
122- if !errors.Is(err, shared.ErrAnalyticsDisabled) {
123- logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
124- continue
125- }
126- }
127+ scanner := bufio.NewScanner(drain)
128+ for scanner.Scan() {
129+ line := scanner.Text()
130+ accessLog := CaddyAccessLog{}
131+ err := json.Unmarshal([]byte(line), &accessLog)
132+ if err != nil {
133+ logger.Error("json unmarshal", "err", err)
134+ continue
135+ }
136
137- logger.Info("inserting visit", "visit", visit, user)
138- err = dbpool.InsertVisit(&visit)
139- if err != nil {
140- logger.Error("could not insert visit record", "err", err, "visit", visit, user)
141+ visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
142+ if err != nil {
143+ logger.Error("cannot deserialize access log", "err", err)
144+ continue
145+ }
146+ err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
147+ if err != nil {
148+ if !errors.Is(err, shared.ErrAnalyticsDisabled) {
149+ logger.Info("could not record analytics visit", "reason", err)
150 }
151 }
152
153- if scanner.Err() != nil {
154- logger.Error("scanner error", "err", scanner.Err())
155+ logger.Info("inserting visit", "visit", visit)
156+ err = dbpool.InsertVisit(visit)
157+ if err != nil {
158+ logger.Error("could not insert visit record", "err", err)
159 }
160 }
161 }
caddy.json
link
+40
-0
+40
-0
1diff --git a/caddy.json b/caddy.json
2new file mode 100644
3index 0000000..49e80ec
4--- /dev/null
5+++ b/caddy.json
6@@ -0,0 +1,40 @@
7+{
8+ "level": "info",
9+ "ts": 1731644477.313701,
10+ "logger": "http.log.access",
11+ "msg": "handled request",
12+ "request": {
13+ "remote_ip": "127.0.0.1",
14+ "remote_port": "40400",
15+ "client_ip": "127.0.0.1",
16+ "proto": "HTTP/2.0",
17+ "method": "GET",
18+ "host": "pgs.sh",
19+ "uri": "/",
20+ "headers": { "User-Agent": ["Blackbox Exporter/0.24.0"] },
21+ "tls": {
22+ "resumed": false,
23+ "version": 772,
24+ "cipher_suite": 4865,
25+ "proto": "h2",
26+ "server_name": "pgs.sh"
27+ }
28+ },
29+ "bytes_read": 0,
30+ "user_id": "",
31+ "duration": 0.001207084,
32+ "size": 3718,
33+ "status": 200,
34+ "resp_headers": {
35+ "Referrer-Policy": ["no-referrer-when-downgrade"],
36+ "Strict-Transport-Security": ["max-age=31536000;"],
37+ "X-Content-Type-Options": ["nosniff"],
38+ "X-Frame-Options": ["DENY"],
39+ "Server": ["Caddy"],
40+ "Alt-Svc": ["h3=\":443\"; ma=2592000"],
41+ "Date": ["Fri, 15 Nov 2024 04:21:17 GMT"],
42+ "Content-Type": ["text/html; charset=utf-8"],
43+ "X-Xss-Protection": ["1; mode=block"],
44+ "Permissions-Policy": ["interest-cohort=()"]
45+ }
46+}
pgs/tunnel.go
link
+1
-1
+1
-1
1diff --git a/pgs/tunnel.go b/pgs/tunnel.go
2index b635c8e..accacc5 100644
3--- a/pgs/tunnel.go
4+++ b/pgs/tunnel.go
5@@ -51,7 +51,7 @@ func createHttpHandler(apiConfig *shared.ApiConfig) CtxHttpBridge {
6 "pubkey", pubkeyStr,
7 )
8
9- props, err := getProjectFromSubdomain(subdomain)
10+ props, err := shared.GetProjectFromSubdomain(subdomain)
11 if err != nil {
12 log.Error(err.Error())
13 return http.HandlerFunc(shared.UnauthorizedHandler)
pgs/web.go
link
+2
-19
+2
-19
1diff --git a/pgs/web.go b/pgs/web.go
2index 685f6ef..0903c1a 100644
3--- a/pgs/web.go
4+++ b/pgs/web.go
5@@ -177,7 +177,7 @@ func (web *WebRouter) checkHandler(w http.ResponseWriter, r *http.Request) {
6
7 if !strings.Contains(hostDomain, appDomain) {
8 subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
9- props, err := getProjectFromSubdomain(subdomain)
10+ props, err := shared.GetProjectFromSubdomain(subdomain)
11 if err != nil {
12 logger.Error(
13 "could not get project from subdomain",
14@@ -333,7 +333,7 @@ func (web *WebRouter) ServeAsset(fname string, opts *storage.ImgProcessOpts, fro
15 "host", r.Host,
16 )
17
18- props, err := getProjectFromSubdomain(subdomain)
19+ props, err := shared.GetProjectFromSubdomain(subdomain)
20 if err != nil {
21 logger.Info(
22 "could not determine project from subdomain",
23@@ -450,20 +450,3 @@ func (web *WebRouter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
24 ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
25 router.ServeHTTP(w, r.WithContext(ctx))
26 }
27-
28-type SubdomainProps struct {
29- ProjectName string
30- Username string
31-}
32-
33-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
34- props := &SubdomainProps{}
35- strs := strings.SplitN(subdomain, "-", 2)
36- props.Username = strs[0]
37- if len(strs) == 2 {
38- props.ProjectName = strs[1]
39- } else {
40- props.ProjectName = props.Username
41- }
42- return props, nil
43-}
wip
auth/api.go
link
+29
-0
+29
-0
1diff --git a/auth/api.go b/auth/api.go
2index 8840af7..6d997ab 100644
3--- a/auth/api.go
4+++ b/auth/api.go
5@@ -22,6 +22,7 @@ import (
6 "github.com/picosh/pico/db/postgres"
7 "github.com/picosh/pico/shared"
8 "github.com/picosh/utils"
9+ "github.com/picosh/utils/pipe"
10 "github.com/picosh/utils/pipe/metrics"
11 )
12
13@@ -662,6 +663,30 @@ func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.Analyt
14 }, nil
15 }
16
17+func containerDrainSub(ctx context.Context, logger *slog.Logger) {
18+ drain := pipe.NewReconnectReadWriteCloser(
19+ ctx,
20+ logger,
21+ shared.NewPicoPipeClient(),
22+ "container logs",
23+ "sub container-drain -k",
24+ 100,
25+ -1,
26+ )
27+
28+ fmt.Println("WTFFFFFF")
29+ scanner := bufio.NewScanner(drain)
30+ for scanner.Scan() {
31+ line := scanner.Text()
32+ fmt.Println("HMMMM", line)
33+ if strings.Contains(line, "http.log.access") {
34+ clean := strings.TrimSpace(line)
35+ fmt.Println("LINE", clean)
36+ // TODO: send to metric drain
37+ }
38+ }
39+}
40+
41 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
42 drain := metrics.ReconnectReadMetrics(
43 ctx,
44@@ -699,6 +724,7 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
45 logger.Error("could not insert visit record", "err", err)
46 }
47 }
48+ fmt.Println("DROPINNGGGGGGG")
49 }
50
51 func authMux(apiConfig *shared.ApiConfig) *http.ServeMux {
52@@ -769,6 +795,9 @@ func StartApiServer() {
53
54 // gather metrics in the auth service
55 go metricDrainSub(ctx, db, logger, cfg.Secret)
56+ // convert container logs to access logs
57+ // go containerDrainSub(ctx, logger)
58+
59 defer ctx.Done()
60
61 apiConfig := &shared.ApiConfig{
test.txt
link
+0
-0
+0
-0
1diff --git a/test.txt b/test.txt
2new file mode 100644
3index 0000000000000000000000000000000000000000..9670dcf3682d2391c3391099ac7c64000ffa22ef
4GIT binary patch
5literal 1060
6zc$|Dw!EWO=5G{(H`VBo0bLuW6nv`r=k<ddbq}!m$BG{;#0!a`s<X9#^lPXEuNP_%(
7zK{>&u?Y24*Z{8ce;XHl&P5Qq3;Ry`x&_Dy)t{h|#12lYD7A32T`GT^l=108Dz_?*R
8z4-Kqi9I0^w6;fgdkijMl2^UrwRK(+-TMQ90cs(^w;Bn(3-suq<;8j8SqNbdcw5liG
9zR2-d;&_GemiwZK3Mx%}YAsM}k4jTBi?=NPY^5g>J+9_z@!$}VrtX;Yp_WL~*JM?0}
10zFd7{Lm2LwWY`umCX3}qF6zwhvP$={vx&0&m#reJP3ROBma}CRy@<mDcoYA5v;5LI}
11zDRGSXiIq-iJ0#NZsK-?5R(a>FMH#gn^3(C_4Z2nSNj#)ljn%#6RdFO#fvY2)Umag1
12z##<j3hl#JJ_y2N#Lf+5bzw|pDrQ6!*r>-A<enCI$pyOeTryB@zjT^jy=4NVj1J4uf
13z#jmr8_8a1u9~-^Hsitgx=G-)fG;XU_MtE$Aac&Gh@kIGOpwEv7w3DH8_o$5#vpHqt
14zN9i`Bi2B}OOU^G(Pe@H!ORlt4ZXd|Uo>Y{#r1x5(jY-my|JPUl&@Yny@-u%&T5@^T
15zbwx|)$CI9PXP-zx8C|etv3!G-bU{nDEROz$u~+OIl@_E!8_|=C?r>cq&ME6!a%xxZ
16zLc}#KThfbrgE5f`%HF~4^K%CG4-vziN1>o$8EgakJDp+)8K|unhbLx~F~<0^o@WsB
17zBTn29Mcf8xN>-<zB8+!$GI{2tcwJB0TWDspuZH_erVne4Xk*?#j0ny3Z3dU<cQ|FC
18V6G}b&d-u|~fN7TR{|W#A|NpuqJn;Yk
19
20literal 0
21Kc$@(M0RR6000031
22
chore: wrap
auth/api.go
link
+71
-41
+71
-41
1diff --git a/auth/api.go b/auth/api.go
2index 6d997ab..c25e2e7 100644
3--- a/auth/api.go
4+++ b/auth/api.go
5@@ -588,8 +588,8 @@ type AccessLogReq struct {
6 Host string `json:"host"`
7 Uri string `json:"uri"`
8 Headers struct {
9- UserAgent string `json:"User-Agent"`
10- Referer string `json:"Referer"`
11+ UserAgent []string `json:"User-Agent"`
12+ Referer string `json:"Referer"`
13 } `json:"headers"`
14 Tls struct {
15 ServerName string `json:"server_name"`
16@@ -657,36 +657,72 @@ func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.Analyt
17 Host: host,
18 Path: path,
19 IpAddress: access.Request.ClientIP,
20- UserAgent: access.Request.Headers.UserAgent,
21+ UserAgent: strings.Join(access.Request.Headers.UserAgent, " "),
22 Referer: access.Request.Headers.Referer, // TODO: I don't see referer in the access log
23 Status: access.Status,
24 }, nil
25 }
26
27-func containerDrainSub(ctx context.Context, logger *slog.Logger) {
28+// this feels really stupid because i'm taking container-drain,
29+// filtering it, and then sending it to metric-drain. The
30+// metricDrainSub function listens on the metric-drain and saves it.
31+// So why not just call the necessary functions to save the visit?
32+// We want to be able to have pipe as a debugging tool which means we
33+// can sub to `metric-drain` and have a nice clean output to look at.
34+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
35+ info := shared.NewPicoPipeClient()
36 drain := pipe.NewReconnectReadWriteCloser(
37 ctx,
38 logger,
39- shared.NewPicoPipeClient(),
40- "container logs",
41+ info,
42+ "container drain",
43 "sub container-drain -k",
44 100,
45 -1,
46 )
47
48- fmt.Println("WTFFFFFF")
49- scanner := bufio.NewScanner(drain)
50- for scanner.Scan() {
51- line := scanner.Text()
52- fmt.Println("HMMMM", line)
53- if strings.Contains(line, "http.log.access") {
54- clean := strings.TrimSpace(line)
55- fmt.Println("LINE", clean)
56- // TODO: send to metric drain
57+ send := pipe.NewReconnectReadWriteCloser(
58+ ctx,
59+ logger,
60+ info,
61+ "from container drain to metric drain",
62+ "pub metric-drain -b=false",
63+ 100,
64+ -1,
65+ )
66+
67+ for {
68+ scanner := bufio.NewScanner(drain)
69+ for scanner.Scan() {
70+ line := scanner.Text()
71+ if strings.Contains(line, "http.log.access") {
72+ clean := strings.TrimSpace(line)
73+ visit, err := accessLogToVisit(dbpool, clean)
74+ if err != nil {
75+ logger.Error("could not convert access log to a visit", "err", err)
76+ continue
77+ }
78+ jso, err := json.Marshal(visit)
79+ if err != nil {
80+ logger.Error("could not marshal json of a visit", "err", err)
81+ continue
82+ }
83+ _, _ = send.Write(jso)
84+ }
85 }
86 }
87 }
88
89+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
90+ accessLog := CaddyAccessLog{}
91+ err := json.Unmarshal([]byte(line), &accessLog)
92+ if err != nil {
93+ return nil, err
94+ }
95+
96+ return deserializeCaddyAccessLog(dbpool, &accessLog)
97+}
98+
99 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
100 drain := metrics.ReconnectReadMetrics(
101 ctx,
102@@ -696,35 +732,29 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
103 -1,
104 )
105
106- scanner := bufio.NewScanner(drain)
107- for scanner.Scan() {
108- line := scanner.Text()
109- accessLog := CaddyAccessLog{}
110- err := json.Unmarshal([]byte(line), &accessLog)
111- if err != nil {
112- logger.Error("json unmarshal", "err", err)
113- continue
114- }
115-
116- visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
117- if err != nil {
118- logger.Error("cannot deserialize access log", "err", err)
119- continue
120- }
121- err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
122- if err != nil {
123- if !errors.Is(err, shared.ErrAnalyticsDisabled) {
124- logger.Info("could not record analytics visit", "reason", err)
125+ for {
126+ scanner := bufio.NewScanner(drain)
127+ for scanner.Scan() {
128+ line := scanner.Text()
129+ visit, err := accessLogToVisit(dbpool, line)
130+ if err != nil {
131+ logger.Error("could not convert access log to a visit", "err", err)
132+ continue
133+ }
134+ err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
135+ if err != nil {
136+ if !errors.Is(err, shared.ErrAnalyticsDisabled) {
137+ logger.Info("could not record analytics visit", "reason", err)
138+ }
139 }
140- }
141
142- logger.Info("inserting visit", "visit", visit)
143- err = dbpool.InsertVisit(visit)
144- if err != nil {
145- logger.Error("could not insert visit record", "err", err)
146+ logger.Info("inserting visit", "visit", visit)
147+ err = dbpool.InsertVisit(visit)
148+ if err != nil {
149+ logger.Error("could not insert visit record", "err", err)
150+ }
151 }
152 }
153- fmt.Println("DROPINNGGGGGGG")
154 }
155
156 func authMux(apiConfig *shared.ApiConfig) *http.ServeMux {
157@@ -796,7 +826,7 @@ func StartApiServer() {
158 // gather metrics in the auth service
159 go metricDrainSub(ctx, db, logger, cfg.Secret)
160 // convert container logs to access logs
161- // go containerDrainSub(ctx, logger)
162+ go containerDrainSub(ctx, db, logger)
163
164 defer ctx.Done()
165
done
auth/api.go
link
+8
-7
+8
-7
1diff --git a/auth/api.go b/auth/api.go
2index c25e2e7..09d7661 100644
3--- a/auth/api.go
4+++ b/auth/api.go
5@@ -589,7 +589,7 @@ type AccessLogReq struct {
6 Uri string `json:"uri"`
7 Headers struct {
8 UserAgent []string `json:"User-Agent"`
9- Referer string `json:"Referer"`
10+ Referer []string `json:"Referer"`
11 } `json:"headers"`
12 Tls struct {
13 ServerName string `json:"server_name"`
14@@ -658,7 +658,7 @@ func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.Analyt
15 Path: path,
16 IpAddress: access.Request.ClientIP,
17 UserAgent: strings.Join(access.Request.Headers.UserAgent, " "),
18- Referer: access.Request.Headers.Referer, // TODO: I don't see referer in the access log
19+ Referer: strings.Join(access.Request.Headers.Referer, " "),
20 Status: access.Status,
21 }, nil
22 }
23@@ -699,7 +699,7 @@ func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
24 clean := strings.TrimSpace(line)
25 visit, err := accessLogToVisit(dbpool, clean)
26 if err != nil {
27- logger.Error("could not convert access log to a visit", "err", err)
28+ logger.Debug("could not convert access log to a visit", "err", err)
29 continue
30 }
31 jso, err := json.Marshal(visit)
32@@ -736,12 +736,13 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
33 scanner := bufio.NewScanner(drain)
34 for scanner.Scan() {
35 line := scanner.Text()
36- visit, err := accessLogToVisit(dbpool, line)
37+ visit := db.AnalyticsVisits{}
38+ err := json.Unmarshal([]byte(line), &visit)
39 if err != nil {
40- logger.Error("could not convert access log to a visit", "err", err)
41+ logger.Info("could not unmarshal json", "err", err, "line", line)
42 continue
43 }
44- err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
45+ err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
46 if err != nil {
47 if !errors.Is(err, shared.ErrAnalyticsDisabled) {
48 logger.Info("could not record analytics visit", "reason", err)
49@@ -749,7 +750,7 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
50 }
51
52 logger.Info("inserting visit", "visit", visit)
53- err = dbpool.InsertVisit(visit)
54+ err = dbpool.InsertVisit(&visit)
55 if err != nil {
56 logger.Error("could not insert visit record", "err", err)
57 }