dashboard / erock/pico / reactor(metric-drain): use caddy json format #35 rss

accepted · opened on 2024-11-15T15:03:31Z by erock
Help
checkout latest patchset:
ssh pr.pico.sh print pr-35 | git am -3
checkout any patchset in a patch request:
ssh pr.pico.sh print ps-X | git am -3
add changes to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add 35
add review to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add --review 35
accept PR:
ssh pr.pico.sh pr accept 35
close PR:
ssh pr.pico.sh pr close 35
Timeline Patchsets

Range-diff rd-78

title
reactor(metric-drain): use caddy access logs
description
Patch changed
old #1
8d56535
new #1
c7eeb12
title
wip
description
Patch removed
old #2
a336041
new #0
(none)
title
chore: wrap
description
Patch removed
old #3
7ae45b3
new #0
(none)
title
done
description
Patch removed
old #4
bfa5c4f
new #0
(none)
Back to top
1: 8d56535 ! 1: c7eeb12 reactor(metric-drain): use caddy access logs
title changed
- reactor(metric-drain): use caddy json format
+ reactor(metric-drain): use caddy access logs
message changed
- 
+ Previously we were sending site usage analytics within our web app code.
This worked well for our use case because we could filter, parse, and
send the analytics to our pipe `metric-drain` which would then store the
analytics into our database.

Because we want to enable HTTP caching for pgs we won't always reach our
web app code since usage analytics will terminate at our cache layer.

Instead, we want to record analytics higher in the request stack.  In
this case, we want to record site analytics from Caddy access logs.

Here's how it works:

- `pub` caddy access logs to our pipe `container-drain`
- `auth/web` will `sub` to `container-drain`, filter, deserialize, and
  `pub` to `metric-drain`
- `auth/web` will `sub` to `metric-drain` and store the analytics in our
  database

old

old:auth/api.go new:auth/api.go
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent string `json:"User-Agent"`
+		Referer   string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type CaddyAccessLog struct {
+	Request AccessLogReq `json:"request"`
+	Status  int          `json:"status"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:    user.ID,
+		ProjectID: projectID,
+		PostID:    postID,
+		Namespace: space,
+		Host:      host,
+		Path:      path,
+		IpAddress: access.Request.ClientIP,
+		UserAgent: access.Request.Headers.UserAgent,
+		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
+		Status:    access.Status,
+	}, nil
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 		-1,
 	)
 
-	for {
-		scanner := bufio.NewScanner(drain)
-		for scanner.Scan() {
-			line := scanner.Text()
-			visit := db.AnalyticsVisits{}
-			err := json.Unmarshal([]byte(line), &visit)
-			if err != nil {
-				logger.Error("json unmarshal", "err", err)
-				continue
-			}
-
-			user := slog.Any("userId", visit.UserID)
-
-			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
-			if err != nil {
-				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
-				}
-			}
+	scanner := bufio.NewScanner(drain)
+	for scanner.Scan() {
+		line := scanner.Text()
+		accessLog := CaddyAccessLog{}
+		err := json.Unmarshal([]byte(line), &accessLog)
+		if err != nil {
+			logger.Error("json unmarshal", "err", err)
+			continue
+		}
 
-			logger.Info("inserting visit", "visit", visit, user)
-			err = dbpool.InsertVisit(&visit)
-			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+		visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
+		if err != nil {
+			logger.Error("cannot deserialize access log", "err", err)
+			continue
+		}
+		err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
+		if err != nil {
+			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
+				logger.Info("could not record analytics visit", "reason", err)
 			}
 		}
 
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
+		logger.Info("inserting visit", "visit", visit)
+		err = dbpool.InsertVisit(visit)
+		if err != nil {
+			logger.Error("could not insert visit record", "err", err)
 		}
 	}
 }

new

old:auth/api.go new:auth/api.go
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/utils"
+	"github.com/picosh/utils/pipe"
 	"github.com/picosh/utils/pipe/metrics"
 )
 
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent []string `json:"User-Agent"`
+		Referer   []string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type RespHeaders struct {
+	ContentType []string `json:"Content-Type"`
+}
+
+type CaddyAccessLog struct {
+	Request     AccessLogReq `json:"request"`
+	Status      int          `json:"status"`
+	RespHeaders RespHeaders  `json:"resp_headers"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:      user.ID,
+		ProjectID:   projectID,
+		PostID:      postID,
+		Namespace:   space,
+		Host:        host,
+		Path:        path,
+		IpAddress:   access.Request.ClientIP,
+		UserAgent:   strings.Join(access.Request.Headers.UserAgent, " "),
+		Referer:     strings.Join(access.Request.Headers.Referer, " "),
+		ContentType: strings.Join(access.RespHeaders.ContentType, " "),
+		Status:      access.Status,
+	}, nil
+}
+
+// this feels really stupid because i'm taking container-drain,
+// filtering it, and then sending it to metric-drain.  The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to use pipe as a debugging tool which means we
+// can manually sub to `metric-drain` and have a nice clean output to view.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+	info := shared.NewPicoPipeClient()
+	drain := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"container drain",
+		"sub container-drain -k",
+		100,
+		-1,
+	)
+
+	send := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"from container drain to metric drain",
+		"pub metric-drain -b=false",
+		100,
+		-1,
+	)
+
+	for {
+		scanner := bufio.NewScanner(drain)
+		for scanner.Scan() {
+			line := scanner.Text()
+			if strings.Contains(line, "http.log.access") {
+				clean := strings.TrimSpace(line)
+				visit, err := accessLogToVisit(dbpool, clean)
+				if err != nil {
+					logger.Debug("could not convert access log to a visit", "err", err)
+					continue
+				}
+				jso, err := json.Marshal(visit)
+				if err != nil {
+					logger.Error("could not marshal json of a visit", "err", err)
+					continue
+				}
+				_, _ = send.Write(jso)
+			}
+		}
+	}
+}
+
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+	accessLog := CaddyAccessLog{}
+	err := json.Unmarshal([]byte(line), &accessLog)
+	if err != nil {
+		return nil, err
+	}
+
+	return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 			visit := db.AnalyticsVisits{}
 			err := json.Unmarshal([]byte(line), &visit)
 			if err != nil {
-				logger.Error("json unmarshal", "err", err)
+				logger.Info("could not unmarshal json", "err", err, "line", line)
 				continue
 			}
-
-			user := slog.Any("userId", visit.UserID)
-
 			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
 			if err != nil {
 				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
+					logger.Info("could not record analytics visit", "reason", err)
 				}
 			}
 
-			logger.Info("inserting visit", "visit", visit, user)
+			if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") {
+				continue
+			}
+
+			logger.Info("inserting visit", "visit", visit)
 			err = dbpool.InsertVisit(&visit)
 			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+				logger.Error("could not insert visit record", "err", err)
 			}
 		}
-
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
-		}
 	}
 }
 
 
 	// gather metrics in the auth service
 	go metricDrainSub(ctx, db, logger, cfg.Secret)
+	// convert container logs to access logs
+	go containerDrainSub(ctx, db, logger)
+
 	defer ctx.Done()
 
 	apiConfig := &shared.ApiConfig{

old

old:pgs/tunnel.go new:pgs/tunnel.go
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)

new

old:pgs/tunnel.go new:pgs/tunnel.go
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)
 			logger,
 			apiConfig.Dbpool,
 			apiConfig.Storage,
-			apiConfig.AnalyticsQueue,
 		)
 		tunnelRouter := TunnelWebRouter{routes}
 		router := http.NewServeMux()

old

old:pgs/web.go new:pgs/web.go
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}

new

old:pgs/web.go new:pgs/web.go
 		return
 	}
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
-
-	routes := NewWebRouter(cfg, logger, dbpool, st, ch)
+	routes := NewWebRouter(cfg, logger, dbpool, st)
 
 	portStr := fmt.Sprintf(":%s", cfg.Port)
 	logger.Info(
 type HasPerm = func(proj *db.Project) bool
 
 type WebRouter struct {
-	Cfg            *shared.ConfigSite
-	Logger         *slog.Logger
-	Dbpool         db.DB
-	Storage        storage.StorageServe
-	AnalyticsQueue chan *db.AnalyticsVisits
-	RootRouter     *http.ServeMux
-	UserRouter     *http.ServeMux
+	Cfg        *shared.ConfigSite
+	Logger     *slog.Logger
+	Dbpool     db.DB
+	Storage    storage.StorageServe
+	RootRouter *http.ServeMux
+	UserRouter *http.ServeMux
 }
 
-func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) *WebRouter {
+func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter {
 	router := &WebRouter{
-		Cfg:            cfg,
-		Logger:         logger,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: analytics,
+		Cfg:     cfg,
+		Logger:  logger,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 	router.initRouters()
 	return router
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}

old


                    

new

old:Makefile new:Makefile
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20240819_add_projects_blocked.sql
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241028_add_analytics_indexes.sql
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
 .PHONY: migrate
 
 latest:
-	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
 .PHONY: latest
 
 psql:

old


                    

new

old:db/db.go new:db/db.go
 }
 
 type AnalyticsVisits struct {
-	ID        string `json:"id"`
-	UserID    string `json:"user_id"`
-	ProjectID string `json:"project_id"`
-	PostID    string `json:"post_id"`
-	Namespace string `json:"namespace"`
-	Host      string `json:"host"`
-	Path      string `json:"path"`
-	IpAddress string `json:"ip_address"`
-	UserAgent string `json:"user_agent"`
-	Referer   string `json:"referer"`
-	Status    int    `json:"status"`
+	ID          string `json:"id"`
+	UserID      string `json:"user_id"`
+	ProjectID   string `json:"project_id"`
+	PostID      string `json:"post_id"`
+	Namespace   string `json:"namespace"`
+	Host        string `json:"host"`
+	Path        string `json:"path"`
+	IpAddress   string `json:"ip_address"`
+	UserAgent   string `json:"user_agent"`
+	Referer     string `json:"referer"`
+	Status      int    `json:"status"`
+	ContentType string `json:"content_type"`
 }
 
 type VisitInterval struct {

old


                    

new

old:db/postgres/storage.go new:db/postgres/storage.go
 
 func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
 	_, err := me.Db.Exec(
-		`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
+		`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
 		visit.UserID,
 		newNullString(visit.ProjectID),
 		newNullString(visit.PostID),
 		visit.UserAgent,
 		visit.Referer,
 		visit.Status,
+		visit.ContentType,
 	)
 	return err
 }

old


                    

new

old:imgs/api.go new:imgs/api.go
 	dbpool := shared.GetDB(r)
 	logger := shared.GetLogger(r)
 	username := shared.GetUsernameFromRequest(r)
-	analytics := shared.GetAnalyticsQueue(r)
 
 	user, err := dbpool.FindUserForName(username)
 	if err != nil {
 		logger,
 		dbpool,
 		st,
-		analytics,
 	)
 	router.ServeAsset(fname, opts, true, anyPerm, w, r)
 }

old


                    

new

old:pastes/api.go new:pastes/api.go
 	Unlisted     bool
 }
 
-type TransparencyPageData struct {
-	Site      shared.SitePageData
-	Analytics *db.Analytics
-}
-
 type Link struct {
 	URL  string
 	Text string

old


                    

new

old:pgs/ssh.go new:pgs/ssh.go
 	"github.com/charmbracelet/promwish"
 	"github.com/charmbracelet/ssh"
 	"github.com/charmbracelet/wish"
-	"github.com/picosh/pico/db"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/pico/shared/storage"
 		st,
 	)
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
 	apiConfig := &shared.ApiConfig{
-		Cfg:            cfg,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: ch,
+		Cfg:     cfg,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 
 	webTunnel := &tunkit.WebTunnelHandler{

old


                    

new

old:pgs/web_asset_handler.go new:pgs/web_asset_handler.go
 package pgs
 
 import (
-	"errors"
 	"fmt"
 	"io"
 	"log/slog"
 	"net/http/httputil"
 	_ "net/http/pprof"
 
-	"github.com/picosh/pico/shared"
 	"github.com/picosh/pico/shared/storage"
 	sst "github.com/picosh/pobj/storage"
 )
 			"routes", strings.Join(attempts, ", "),
 			"status", http.StatusNotFound,
 		)
-		// track 404s
-		ch := h.AnalyticsQueue
-		view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
-		if err == nil {
-			view.ProjectID = h.ProjectID
-			view.Status = http.StatusNotFound
-			select {
-			case ch <- view:
-			default:
-				logger.Error("could not send analytics view to channel", "view", view)
-			}
-		} else {
-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-				logger.Error("could not record analytics view", "err", err, "view", view)
-			}
-		}
 		http.Error(w, "404 not found", http.StatusNotFound)
 		return
 	}
 
 	finContentType := w.Header().Get("content-type")
 
-	// only track pages, not individual assets
-	if finContentType == "text/html" {
-		// track visit
-		ch := h.AnalyticsQueue
-		view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
-		if err == nil {
-			view.ProjectID = h.ProjectID
-			select {
-			case ch <- view:
-			default:
-				logger.Error("could not send analytics view to channel", "view", view)
-			}
-		} else {
-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-				logger.Error("could not record analytics view", "err", err, "view", view)
-			}
-		}
-	}
-
 	logger.Info(
 		"serving asset",
 		"asset", assetFilepath,

old


                    

new

old:pgs/web_test.go new:pgs/web_test.go
 	"net/http/httptest"
 	"strings"
 	"testing"
-	"time"
 
 	"github.com/picosh/pico/db"
 	"github.com/picosh/pico/db/stub"
 			responseRecorder := httptest.NewRecorder()
 
 			st, _ := storage.NewStorageMemory(tc.storage)
-			ch := make(chan *db.AnalyticsVisits, 100)
-			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
+			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st)
 			router.ServeHTTP(responseRecorder, request)
 
 			if responseRecorder.Code != tc.status {
 	}
 }
 
-func TestAnalytics(t *testing.T) {
-	bucketName := shared.GetAssetBucketName(testUserID)
-	cfg := NewConfigSite()
-	cfg.Domain = "pgs.test"
-	expectedPath := "/app"
-	request := httptest.NewRequest("GET", mkpath(expectedPath), strings.NewReader(""))
-	responseRecorder := httptest.NewRecorder()
-
-	sto := map[string]map[string]string{
-		bucketName: {
-			"test/app.html": "hello world!",
-		},
-	}
-	st, _ := storage.NewStorageMemory(sto)
-	ch := make(chan *db.AnalyticsVisits, 100)
-	dbpool := NewPgsAnalticsDb(cfg.Logger)
-	router := NewWebRouter(cfg, cfg.Logger, dbpool, st, ch)
-
-	go func() {
-		for analytics := range ch {
-			if analytics.Path != expectedPath {
-				t.Errorf("Want path '%s', got '%s'", expectedPath, analytics.Path)
-			}
-			close(ch)
-		}
-	}()
-
-	router.ServeHTTP(responseRecorder, request)
-
-	select {
-	case <-ch:
-		return
-	case <-time.After(time.Second * 1):
-		t.Error("didnt receive analytics event within time limit")
-	}
-}
-
 type ImageStorageMemory struct {
 	*storage.StorageMemory
 	Opts  *storage.ImgProcessOpts
 					Ratio: &storage.Ratio{},
 				},
 			}
-			ch := make(chan *db.AnalyticsVisits, 100)
-			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
+			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st)
 			router.ServeHTTP(responseRecorder, request)
 
 			if responseRecorder.Code != tc.status {

old


                    

new

old:prose/api.go new:prose/api.go
 
 import (
 	"bytes"
-	"errors"
 	"fmt"
 	"html/template"
 	"net/http"
 	Diff         template.HTML
 }
 
-type TransparencyPageData struct {
-	Site      shared.SitePageData
-	Analytics *db.Analytics
-}
-
 type HeaderTxt struct {
 	Title      string
 	Bio        string
 		postCollection = append(postCollection, p)
 	}
 
-	// track visit
-	ch := shared.GetAnalyticsQueue(r)
-	view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
-	if err == nil {
-		select {
-		case ch <- view:
-		default:
-			logger.Error("could not send analytics view to channel", "view", view)
-		}
-	} else {
-		if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-			logger.Error("could not record analytics view", "err", err, "view", view)
-		}
-	}
-
 	data := BlogPageData{
 		Site:       *cfg.GetSiteData(),
 		PageTitle:  headerTxt.Title,
 	username := shared.GetUsernameFromRequest(r)
 	subdomain := shared.GetSubdomain(r)
 	cfg := shared.GetCfg(r)
-	ch := shared.GetAnalyticsQueue(r)
 
 	var slug string
 	if !cfg.IsSubdomains() || subdomain == "" {
 			ogImageCard = parsedText.ImageCard
 		}
 
-		// track visit
-		view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
-		if err == nil {
-			view.PostID = post.ID
-			select {
-			case ch <- view:
-			default:
-				logger.Error("could not send analytics view to channel", "view", view)
-			}
-		} else {
-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-				logger.Error("could not record analytics view", "err", err, "view", view)
-			}
-		}
-
 		unlisted := false
 		if post.Hidden || post.PublishAt.After(time.Now()) {
 			unlisted = true
 	mainRoutes := createMainRoutes(staticRoutes)
 	subdomainRoutes := createSubdomainRoutes(staticRoutes)
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
 	apiConfig := &shared.ApiConfig{
-		Cfg:            cfg,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: ch,
+		Cfg:     cfg,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 	handler := shared.CreateServe(mainRoutes, subdomainRoutes, apiConfig)
 	router := http.HandlerFunc(handler)

old


                    

new

old:shared/router.go new:shared/router.go
 }
 
 type ApiConfig struct {
-	Cfg            *ConfigSite
-	Dbpool         db.DB
-	Storage        storage.StorageServe
-	AnalyticsQueue chan *db.AnalyticsVisits
+	Cfg     *ConfigSite
+	Dbpool  db.DB
+	Storage storage.StorageServe
 }
 
 func (hc *ApiConfig) HasPrivilegedAccess(apiToken string) bool {
 	ctx = context.WithValue(ctx, ctxDBKey{}, hc.Dbpool)
 	ctx = context.WithValue(ctx, ctxStorageKey{}, hc.Storage)
 	ctx = context.WithValue(ctx, ctxCfg{}, hc.Cfg)
-	ctx = context.WithValue(ctx, ctxAnalyticsQueue{}, hc.AnalyticsQueue)
 	return ctx
 }
 
 type ctxStorageKey struct{}
 type ctxLoggerKey struct{}
 type ctxCfg struct{}
-type ctxAnalyticsQueue struct{}
 
 type CtxSubdomainKey struct{}
 type ctxKey struct{}
 	return ""
 }
 
-func GetAnalyticsQueue(r *http.Request) chan *db.AnalyticsVisits {
-	return r.Context().Value(ctxAnalyticsQueue{}).(chan *db.AnalyticsVisits)
-}
-
 func GetApiToken(r *http.Request) string {
 	authHeader := r.Header.Get("authorization")
 	if authHeader == "" {

old


                    

new

new:sql/migrations/20241125_add_content_type_to_analytics.sql
+ALTER TABLE analytics_visits ADD COLUMN content_type varchar(256);

old


                    

new

new:test.txt

                    
2: a336041 < -: ------- wip

old

old:auth/api.go new:auth/api.go
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/utils"
+	"github.com/picosh/utils/pipe"
 	"github.com/picosh/utils/pipe/metrics"
 )
 
 	}, nil
 }
 
+func containerDrainSub(ctx context.Context, logger *slog.Logger) {
+	drain := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		shared.NewPicoPipeClient(),
+		"container logs",
+		"sub container-drain -k",
+		100,
+		-1,
+	)
+
+	fmt.Println("WTFFFFFF")
+	scanner := bufio.NewScanner(drain)
+	for scanner.Scan() {
+		line := scanner.Text()
+		fmt.Println("HMMMM", line)
+		if strings.Contains(line, "http.log.access") {
+			clean := strings.TrimSpace(line)
+			fmt.Println("LINE", clean)
+			// TODO: send to metric drain
+		}
+	}
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 			logger.Error("could not insert visit record", "err", err)
 		}
 	}
+	fmt.Println("DROPINNGGGGGGG")
 }
 
 func authMux(apiConfig *shared.ApiConfig) *http.ServeMux {
 
 	// gather metrics in the auth service
 	go metricDrainSub(ctx, db, logger, cfg.Secret)
+	// convert container logs to access logs
+	// go containerDrainSub(ctx, logger)
+
 	defer ctx.Done()
 
 	apiConfig := &shared.ApiConfig{

new


                    

old

new:test.txt

                    

new


                    
3: 7ae45b3 < -: ------- chore: wrap

old

old:auth/api.go new:auth/api.go
 	Host       string `json:"host"`
 	Uri        string `json:"uri"`
 	Headers    struct {
-		UserAgent string `json:"User-Agent"`
-		Referer   string `json:"Referer"`
+		UserAgent []string `json:"User-Agent"`
+		Referer   string   `json:"Referer"`
 	} `json:"headers"`
 	Tls struct {
 		ServerName string `json:"server_name"`
 		Host:      host,
 		Path:      path,
 		IpAddress: access.Request.ClientIP,
-		UserAgent: access.Request.Headers.UserAgent,
+		UserAgent: strings.Join(access.Request.Headers.UserAgent, " "),
 		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
 		Status:    access.Status,
 	}, nil
 }
 
-func containerDrainSub(ctx context.Context, logger *slog.Logger) {
+// this feels really stupid because i'm taking container-drain,
+// filtering it, and then sending it to metric-drain.  The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to have pipe as a debugging tool which means we
+// can sub to `metric-drain` and have a nice clean output to look at and use.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+	info := shared.NewPicoPipeClient()
 	drain := pipe.NewReconnectReadWriteCloser(
 		ctx,
 		logger,
-		shared.NewPicoPipeClient(),
-		"container logs",
+		info,
+		"container drain",
 		"sub container-drain -k",
 		100,
 		-1,
 	)
 
-	fmt.Println("WTFFFFFF")
-	scanner := bufio.NewScanner(drain)
-	for scanner.Scan() {
-		line := scanner.Text()
-		fmt.Println("HMMMM", line)
-		if strings.Contains(line, "http.log.access") {
-			clean := strings.TrimSpace(line)
-			fmt.Println("LINE", clean)
-			// TODO: send to metric drain
+	send := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"from container drain to metric drain",
+		"pub metric-drain -b=false",
+		100,
+		-1,
+	)
+
+	for {
+		scanner := bufio.NewScanner(drain)
+		for scanner.Scan() {
+			line := scanner.Text()
+			if strings.Contains(line, "http.log.access") {
+				clean := strings.TrimSpace(line)
+				visit, err := accessLogToVisit(dbpool, clean)
+				if err != nil {
+					logger.Error("could not convert access log to a visit", "err", err)
+					continue
+				}
+				jso, err := json.Marshal(visit)
+				if err != nil {
+					logger.Error("could not marshal json of a visit", "err", err)
+					continue
+				}
+				_, _ = send.Write(jso)
+			}
 		}
 	}
 }
 
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+	accessLog := CaddyAccessLog{}
+	err := json.Unmarshal([]byte(line), &accessLog)
+	if err != nil {
+		return nil, err
+	}
+
+	return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 		-1,
 	)
 
-	scanner := bufio.NewScanner(drain)
-	for scanner.Scan() {
-		line := scanner.Text()
-		accessLog := CaddyAccessLog{}
-		err := json.Unmarshal([]byte(line), &accessLog)
-		if err != nil {
-			logger.Error("json unmarshal", "err", err)
-			continue
-		}
-
-		visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
-		if err != nil {
-			logger.Error("cannot deserialize access log", "err", err)
-			continue
-		}
-		err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
-		if err != nil {
-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-				logger.Info("could not record analytics visit", "reason", err)
+	for {
+		scanner := bufio.NewScanner(drain)
+		for scanner.Scan() {
+			line := scanner.Text()
+			visit, err := accessLogToVisit(dbpool, line)
+			if err != nil {
+				logger.Error("could not convert access log to a visit", "err", err)
+				continue
+			}
+			err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
+			if err != nil {
+				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
+					logger.Info("could not record analytics visit", "reason", err)
+				}
 			}
-		}
 
-		logger.Info("inserting visit", "visit", visit)
-		err = dbpool.InsertVisit(visit)
-		if err != nil {
-			logger.Error("could not insert visit record", "err", err)
+			logger.Info("inserting visit", "visit", visit)
+			err = dbpool.InsertVisit(visit)
+			if err != nil {
+				logger.Error("could not insert visit record", "err", err)
+			}
 		}
 	}
-	fmt.Println("DROPINNGGGGGGG")
 }
 
 func authMux(apiConfig *shared.ApiConfig) *http.ServeMux {
 	// gather metrics in the auth service
 	go metricDrainSub(ctx, db, logger, cfg.Secret)
 	// convert container logs to access logs
-	// go containerDrainSub(ctx, logger)
+	go containerDrainSub(ctx, db, logger)
 
 	defer ctx.Done()
 

new


                    
4: bfa5c4f < -: ------- done

old

old:auth/api.go new:auth/api.go
 	Uri        string `json:"uri"`
 	Headers    struct {
 		UserAgent []string `json:"User-Agent"`
-		Referer   string   `json:"Referer"`
+		Referer   []string `json:"Referer"`
 	} `json:"headers"`
 	Tls struct {
 		ServerName string `json:"server_name"`
 		Path:      path,
 		IpAddress: access.Request.ClientIP,
 		UserAgent: strings.Join(access.Request.Headers.UserAgent, " "),
-		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
+		Referer:   strings.Join(access.Request.Headers.Referer, " "),
 		Status:    access.Status,
 	}, nil
 }
 				clean := strings.TrimSpace(line)
 				visit, err := accessLogToVisit(dbpool, clean)
 				if err != nil {
-					logger.Error("could not convert access log to a visit", "err", err)
+					logger.Debug("could not convert access log to a visit", "err", err)
 					continue
 				}
 				jso, err := json.Marshal(visit)
 		scanner := bufio.NewScanner(drain)
 		for scanner.Scan() {
 			line := scanner.Text()
-			visit, err := accessLogToVisit(dbpool, line)
+			visit := db.AnalyticsVisits{}
+			err := json.Unmarshal([]byte(line), &visit)
 			if err != nil {
-				logger.Error("could not convert access log to a visit", "err", err)
+				logger.Info("could not unmarshal json", "err", err, "line", line)
 				continue
 			}
-			err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
+			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
 			if err != nil {
 				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
 					logger.Info("could not record analytics visit", "reason", err)
 			}
 
 			logger.Info("inserting visit", "visit", visit)
-			err = dbpool.InsertVisit(visit)
+			err = dbpool.InsertVisit(&visit)
 			if err != nil {
 				logger.Error("could not insert visit record", "err", err)
 			}

new