reactor(metric-drain): use caddy json format #35

accepted · opened on 2024-11-15T15:03:31Z by erock
Help
# add changes to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add 35
# add review to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add --review 35
# remove patchset
ssh pr.pico.sh ps rm ps-x
# checkout all patches
ssh pr.pico.sh pr print 35 | git am -3
# print a diff between the last two patches in a patch request
ssh pr.pico.sh pr diff 35
# accept PR
ssh pr.pico.sh pr accept 35
# close PR
ssh pr.pico.sh pr close 35

Logs

erock created pr with ps-74 on 2024-11-15T15:03:31Z
erock added ps-77 on 2024-11-23T03:34:44Z
erock added ps-78 on 2024-11-27T20:17:20Z
erock added ps-79 on 2024-11-27T20:18:31Z
erock changed status on 2024-11-28T03:03:53Z {"status":"accepted"}

Patchsets

ps-74 by erock on 2024-11-15T15:03:31Z
Range Diff
1: 77aaa29 ! 1: 8d56535 reactor(metric-drain): use caddy json format

@@ auth/api.go
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent string `json:"User-Agent"`
+		Referer   string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type CaddyAccessLog struct {
+	Request AccessLogReq `json:"request"`
+	Status  int          `json:"status"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:    user.ID,
+		ProjectID: projectID,
+		PostID:    postID,
+		Namespace: space,
+		Host:      host,
+		Path:      path,
+		IpAddress: access.Request.ClientIP,
+		UserAgent: access.Request.Headers.UserAgent,
+		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
+		Status:    access.Status,
+	}, nil
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	conn := shared.NewPicoPipeClient()
 	stdoutPipe, err := pubsub.RemoteSub("sub metric-drain -k", ctx, conn)
 	scanner := bufio.NewScanner(stdoutPipe)
 	for scanner.Scan() {
 		line := scanner.Text()
-		visit := db.AnalyticsVisits{}
-		err := json.Unmarshal([]byte(line), &visit)
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 		-1,
 	)
 
-	for {
-		scanner := bufio.NewScanner(drain)
-		for scanner.Scan() {
-			line := scanner.Text()
-			visit := db.AnalyticsVisits{}
-			err := json.Unmarshal([]byte(line), &visit)
-			if err != nil {
-				logger.Error("json unmarshal", "err", err)
-				continue
-			}
-
-			user := slog.Any("userId", visit.UserID)
-
-			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
-			if err != nil {
-				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
-				}
-			}
+	scanner := bufio.NewScanner(drain)
+	for scanner.Scan() {
+		line := scanner.Text()
+		accessLog := CaddyAccessLog{}
+		err := json.Unmarshal([]byte(line), &accessLog)
 		if err != nil {
 			logger.Error("json unmarshal", "err", err)
 			continue
 		}
+		if err != nil {
+			logger.Error("json unmarshal", "err", err)
+			continue
+		}
 
-		err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
-			logger.Info("inserting visit", "visit", visit, user)
-			err = dbpool.InsertVisit(&visit)
-			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+		visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
+		if err != nil {
+			logger.Error("cannot deserialize access log", "err", err)
+			continue
+		}
+		err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
 		if err != nil {
 			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
 				logger.Info("could not record analytics visit", "reason", err)
+		if err != nil {
+			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
+				logger.Info("could not record analytics visit", "reason", err)
 			}
 		}
 
 		logger.Info("inserting visit", "visit", visit)
-		err = dbpool.InsertVisit(&visit)
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
+		logger.Info("inserting visit", "visit", visit)
+		err = dbpool.InsertVisit(visit)
 		if err != nil {
 			logger.Error("could not insert visit record", "err", err)
+		if err != nil {
+			logger.Error("could not insert visit record", "err", err)
 		}
 	}
 }

@@ caddy.json
+{
+  "level": "info",
+  "ts": 1731644477.313701,
+  "logger": "http.log.access",
+  "msg": "handled request",
+  "request": {
+    "remote_ip": "127.0.0.1",
+    "remote_port": "40400",
+    "client_ip": "127.0.0.1",
+    "proto": "HTTP/2.0",
+    "method": "GET",
+    "host": "pgs.sh",
+    "uri": "/",
+    "headers": { "User-Agent": ["Blackbox Exporter/0.24.0"] },
+    "tls": {
+      "resumed": false,
+      "version": 772,
+      "cipher_suite": 4865,
+      "proto": "h2",
+      "server_name": "pgs.sh"
+    }
+  },
+  "bytes_read": 0,
+  "user_id": "",
+  "duration": 0.001207084,
+  "size": 3718,
+  "status": 200,
+  "resp_headers": {
+    "Referrer-Policy": ["no-referrer-when-downgrade"],
+    "Strict-Transport-Security": ["max-age=31536000;"],
+    "X-Content-Type-Options": ["nosniff"],
+    "X-Frame-Options": ["DENY"],
+    "Server": ["Caddy"],
+    "Alt-Svc": ["h3=\":443\"; ma=2592000"],
+    "Date": ["Fri, 15 Nov 2024 04:21:17 GMT"],
+    "Content-Type": ["text/html; charset=utf-8"],
+    "X-Xss-Protection": ["1; mode=block"],
+    "Permissions-Policy": ["interest-cohort=()"]
+  }
+}

@@ pgs/tunnel.go
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)

@@ pgs/web.go
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}

@@ shared/api.go
 	"github.com/picosh/utils"
 )
 
+type SubdomainProps struct {
+	ProjectName string
+	Username    string
+}
+
+func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
+	props := &SubdomainProps{}
+	strs := strings.SplitN(subdomain, "-", 2)
+	props.Username = strs[0]
+	if len(strs) == 2 {
+		props.ProjectName = strs[1]
+	} else {
+		props.ProjectName = props.Username
+	}
+	return props, nil
+}
+
 func CorsHeaders(headers http.Header) {
 	headers.Add("Access-Control-Allow-Origin", "*")
 	headers.Add("Vary", "Origin")
-: ------- > 2: a336041 wip
-: ------- > 3: 7ae45b3 chore: wrap
-: ------- > 4: bfa5c4f done
ps-77 by erock on 2024-11-23T03:34:44Z
Range Diff
2: a336041 < -: ------- wip
3: 7ae45b3 < -: ------- chore: wrap
4: bfa5c4f < -: ------- done
1: 8d56535 ! 1: c7eeb12 reactor(metric-drain): use caddy access logs

@@ auth/api.go
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/utils"
+	"github.com/picosh/utils/pipe"
 	"github.com/picosh/utils/pipe/metrics"
 )
 
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent string `json:"User-Agent"`
+		Referer   string `json:"Referer"`
+		UserAgent []string `json:"User-Agent"`
+		Referer   []string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type RespHeaders struct {
+	ContentType []string `json:"Content-Type"`
+}
+
+type CaddyAccessLog struct {
+	Request AccessLogReq `json:"request"`
+	Status  int          `json:"status"`
+	Request     AccessLogReq `json:"request"`
+	Status      int          `json:"status"`
+	RespHeaders RespHeaders  `json:"resp_headers"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:    user.ID,
+		ProjectID: projectID,
+		PostID:    postID,
+		Namespace: space,
+		Host:      host,
+		Path:      path,
+		IpAddress: access.Request.ClientIP,
+		UserAgent: access.Request.Headers.UserAgent,
+		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
+		Status:    access.Status,
+		UserID:      user.ID,
+		ProjectID:   projectID,
+		PostID:      postID,
+		Namespace:   space,
+		Host:        host,
+		Path:        path,
+		IpAddress:   access.Request.ClientIP,
+		UserAgent:   strings.Join(access.Request.Headers.UserAgent, " "),
+		Referer:     strings.Join(access.Request.Headers.Referer, " "),
+		ContentType: strings.Join(access.RespHeaders.ContentType, " "),
+		Status:      access.Status,
+	}, nil
+}
+
+// this feels really stupid because i'm taking containter-drain,
+// filtering it, and then sending it to metric-drain.  The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to use pipe as a debugging tool which means we
+// can manually sub to `metric-drain` and have a nice clean output to view.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+	info := shared.NewPicoPipeClient()
+	drain := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"container drain",
+		"sub container-drain -k",
+		100,
+		-1,
+	)
+
+	send := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"from container drain to metric drain",
+		"pub metric-drain -b=false",
+		100,
+		-1,
+	)
+
+	for {
+		scanner := bufio.NewScanner(drain)
+		for scanner.Scan() {
+			line := scanner.Text()
+			if strings.Contains(line, "http.log.access") {
+				clean := strings.TrimSpace(line)
+				visit, err := accessLogToVisit(dbpool, clean)
+				if err != nil {
+					logger.Debug("could not convert access log to a visit", "err", err)
+					continue
+				}
+				jso, err := json.Marshal(visit)
+				if err != nil {
+					logger.Error("could not marshal json of a visit", "err", err)
+					continue
+				}
+				_, _ = send.Write(jso)
+			}
+		}
+	}
+}
+
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+	accessLog := CaddyAccessLog{}
+	err := json.Unmarshal([]byte(line), &accessLog)
+	if err != nil {
+		return nil, err
+	}
+
+	return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 		-1,
 	)
 
-	for {
-		scanner := bufio.NewScanner(drain)
-		for scanner.Scan() {
-			line := scanner.Text()
-			visit := db.AnalyticsVisits{}
-			err := json.Unmarshal([]byte(line), &visit)
-			if err != nil {
 			visit := db.AnalyticsVisits{}
 			err := json.Unmarshal([]byte(line), &visit)
 			if err != nil {
-				logger.Error("json unmarshal", "err", err)
-				continue
-			}
+				logger.Info("could not unmarshal json", "err", err, "line", line)
 				continue
 			}
-
-			user := slog.Any("userId", visit.UserID)
-
-			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
-			if err != nil {
-				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
 			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
 			if err != nil {
 				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
-				}
-			}
+	scanner := bufio.NewScanner(drain)
+	for scanner.Scan() {
+		line := scanner.Text()
+		accessLog := CaddyAccessLog{}
+		err := json.Unmarshal([]byte(line), &accessLog)
+		if err != nil {
+			logger.Error("json unmarshal", "err", err)
+			continue
+		}
+					logger.Info("could not record analytics visit", "reason", err)
 				}
 			}
 
-			logger.Info("inserting visit", "visit", visit, user)
-			err = dbpool.InsertVisit(&visit)
-			if err != nil {
+			if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") {
+				continue
+			}
+
+			logger.Info("inserting visit", "visit", visit)
 			err = dbpool.InsertVisit(&visit)
 			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+		visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
+		if err != nil {
+			logger.Error("cannot deserialize access log", "err", err)
+			continue
+		}
+		err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
+		if err != nil {
+			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
+				logger.Info("could not record analytics visit", "reason", err)
+				logger.Error("could not insert visit record", "err", err)
 			}
 		}
 
-
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
+		logger.Info("inserting visit", "visit", visit)
+		err = dbpool.InsertVisit(visit)
+		if err != nil {
+			logger.Error("could not insert visit record", "err", err)
 		}
-		}
 	}
 }
 
 
 	// gather metrics in the auth service
 	go metricDrainSub(ctx, db, logger, cfg.Secret)
+	// convert container logs to access logs
+	go containerDrainSub(ctx, db, logger)
+
 	defer ctx.Done()
 
 	apiConfig := &shared.ApiConfig{

@@ caddy.json
+{
+  "level": "info",
+  "ts": 1731644477.313701,
+  "logger": "http.log.access",
+  "msg": "handled request",
+  "request": {
+    "remote_ip": "127.0.0.1",
+    "remote_port": "40400",
+    "client_ip": "127.0.0.1",
+    "proto": "HTTP/2.0",
+    "method": "GET",
+    "host": "pgs.sh",
+    "uri": "/",
+    "headers": { "User-Agent": ["Blackbox Exporter/0.24.0"] },
+    "tls": {
+      "resumed": false,
+      "version": 772,
+      "cipher_suite": 4865,
+      "proto": "h2",
+      "server_name": "pgs.sh"
+    }
+  },
+  "bytes_read": 0,
+  "user_id": "",
+  "duration": 0.001207084,
+  "size": 3718,
+  "status": 200,
+  "resp_headers": {
+    "Referrer-Policy": ["no-referrer-when-downgrade"],
+    "Strict-Transport-Security": ["max-age=31536000;"],
+    "X-Content-Type-Options": ["nosniff"],
+    "X-Frame-Options": ["DENY"],
+    "Server": ["Caddy"],
+    "Alt-Svc": ["h3=\":443\"; ma=2592000"],
+    "Date": ["Fri, 15 Nov 2024 04:21:17 GMT"],
+    "Content-Type": ["text/html; charset=utf-8"],
+    "X-Xss-Protection": ["1; mode=block"],
+    "Permissions-Policy": ["interest-cohort=()"]
+  }
+}

@@ pgs/tunnel.go
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)
 			logger,
 			apiConfig.Dbpool,
 			apiConfig.Storage,
-			apiConfig.AnalyticsQueue,
 		)
 		tunnelRouter := TunnelWebRouter{routes}
 		router := http.NewServeMux()

@@ pgs/web.go
 		return
 	}
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
-
-	routes := NewWebRouter(cfg, logger, dbpool, st, ch)
+	routes := NewWebRouter(cfg, logger, dbpool, st)
 
 	portStr := fmt.Sprintf(":%s", cfg.Port)
 	logger.Info(
 type HasPerm = func(proj *db.Project) bool
 
 type WebRouter struct {
-	Cfg            *shared.ConfigSite
-	Logger         *slog.Logger
-	Dbpool         db.DB
-	Storage        storage.StorageServe
-	AnalyticsQueue chan *db.AnalyticsVisits
-	RootRouter     *http.ServeMux
-	UserRouter     *http.ServeMux
+	Cfg        *shared.ConfigSite
+	Logger     *slog.Logger
+	Dbpool     db.DB
+	Storage    storage.StorageServe
+	RootRouter *http.ServeMux
+	UserRouter *http.ServeMux
 }
 
-func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) *WebRouter {
+func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter {
 	router := &WebRouter{
-		Cfg:            cfg,
-		Logger:         logger,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: analytics,
+		Cfg:     cfg,
+		Logger:  logger,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 	router.initRouters()
 	return router
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}

@@ shared/api.go
 	"github.com/picosh/utils"
 )
 
+type SubdomainProps struct {
+	ProjectName string
+	Username    string
+}
+
+func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
+	props := &SubdomainProps{}
+	strs := strings.SplitN(subdomain, "-", 2)
+	props.Username = strs[0]
+	if len(strs) == 2 {
+		props.ProjectName = strs[1]
+	} else {
+		props.ProjectName = props.Username
+	}
+	return props, nil
+}
+
 func CorsHeaders(headers http.Header) {
 	headers.Add("Access-Control-Allow-Origin", "*")
 	headers.Add("Vary", "Origin")
ps-78 by erock on 2024-11-27T20:17:20Z
Range Diff
1: c7eeb12 ! 1: 4e0839a reactor(metric-drain): use caddy access logs

@@ Makefile
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20240819_add_projects_blocked.sql
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241028_add_analytics_indexes.sql
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
 .PHONY: migrate
 
 latest:
-	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
 .PHONY: latest
 
 psql:

@@ auth/api.go
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/utils"
+	"github.com/picosh/utils/pipe"
 	"github.com/picosh/utils/pipe/metrics"
 )
 
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent []string `json:"User-Agent"`
+		Referer   []string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type RespHeaders struct {
+	ContentType []string `json:"Content-Type"`
+}
+
+type CaddyAccessLog struct {
+	Request     AccessLogReq `json:"request"`
+	Status      int          `json:"status"`
+	RespHeaders RespHeaders  `json:"resp_headers"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:      user.ID,
+		ProjectID:   projectID,
+		PostID:      postID,
+		Namespace:   space,
+		Host:        host,
+		Path:        path,
+		IpAddress:   access.Request.ClientIP,
+		UserAgent:   strings.Join(access.Request.Headers.UserAgent, " "),
+		Referer:     strings.Join(access.Request.Headers.Referer, " "),
+		ContentType: strings.Join(access.RespHeaders.ContentType, " "),
+		Status:      access.Status,
+	}, nil
+}
+
+// this feels really stupid because i'm taking containter-drain,
+// filtering it, and then sending it to metric-drain.  The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to use pipe as a debugging tool which means we
+// can manually sub to `metric-drain` and have a nice clean output to view.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+	info := shared.NewPicoPipeClient()
+	drain := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"container drain",
+		"sub container-drain -k",
+		100,
+		-1,
+	)
+
+	send := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"from container drain to metric drain",
+		"pub metric-drain -b=false",
+		100,
+		-1,
+	)
+
+	for {
+		scanner := bufio.NewScanner(drain)
+		for scanner.Scan() {
+			line := scanner.Text()
+			if strings.Contains(line, "http.log.access") {
+				clean := strings.TrimSpace(line)
+				visit, err := accessLogToVisit(dbpool, clean)
+				if err != nil {
+					logger.Debug("could not convert access log to a visit", "err", err)
+					continue
+				}
+				jso, err := json.Marshal(visit)
+				if err != nil {
+					logger.Error("could not marshal json of a visit", "err", err)
+					continue
+				}
+				_, _ = send.Write(jso)
+			}
+		}
+	}
+}
+
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+	accessLog := CaddyAccessLog{}
+	err := json.Unmarshal([]byte(line), &accessLog)
+	if err != nil {
+		return nil, err
+	}
+
+	return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 			visit := db.AnalyticsVisits{}
 			err := json.Unmarshal([]byte(line), &visit)
 			if err != nil {
-				logger.Error("json unmarshal", "err", err)
+				logger.Info("could not unmarshal json", "err", err, "line", line)
 				continue
 			}
-
-			user := slog.Any("userId", visit.UserID)
-
 			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
 			if err != nil {
 				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
+					logger.Info("could not record analytics visit", "reason", err)
 				}
 			}
 
-			logger.Info("inserting visit", "visit", visit, user)
+			if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") {
+				continue
+			}
+
+			logger.Info("inserting visit", "visit", visit)
 			err = dbpool.InsertVisit(&visit)
 			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+				logger.Error("could not insert visit record", "err", err)
 			}
 		}
-
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
-		}
 	}
 }
 
 
 	// gather metrics in the auth service
 	go metricDrainSub(ctx, db, logger, cfg.Secret)
+	// convert container logs to access logs
+	go containerDrainSub(ctx, db, logger)
+
 	defer ctx.Done()
 
 	apiConfig := &shared.ApiConfig{

@@ db/db.go
 }
 
 type AnalyticsVisits struct {
-	ID        string `json:"id"`
-	UserID    string `json:"user_id"`
-	ProjectID string `json:"project_id"`
-	PostID    string `json:"post_id"`
-	Namespace string `json:"namespace"`
-	Host      string `json:"host"`
-	Path      string `json:"path"`
-	IpAddress string `json:"ip_address"`
-	UserAgent string `json:"user_agent"`
-	Referer   string `json:"referer"`
-	Status    int    `json:"status"`
+	ID          string `json:"id"`
+	UserID      string `json:"user_id"`
+	ProjectID   string `json:"project_id"`
+	PostID      string `json:"post_id"`
+	Namespace   string `json:"namespace"`
+	Host        string `json:"host"`
+	Path        string `json:"path"`
+	IpAddress   string `json:"ip_address"`
+	UserAgent   string `json:"user_agent"`
+	Referer     string `json:"referer"`
+	Status      int    `json:"status"`
+	ContentType string `json:"content_type"`
 }
 
 type VisitInterval struct {

@@ db/postgres/storage.go
 
 func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
 	_, err := me.Db.Exec(
-		`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
+		`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
 		visit.UserID,
 		newNullString(visit.ProjectID),
 		newNullString(visit.PostID),
 		visit.UserAgent,
 		visit.Referer,
 		visit.Status,
+		visit.ContentType,
 	)
 	return err
 }

@@ imgs/api.go
 	dbpool := shared.GetDB(r)
 	logger := shared.GetLogger(r)
 	username := shared.GetUsernameFromRequest(r)
-	analytics := shared.GetAnalyticsQueue(r)
 
 	user, err := dbpool.FindUserForName(username)
 	if err != nil {
 		logger,
 		dbpool,
 		st,
-		analytics,
 	)
 	router.ServeAsset(fname, opts, true, anyPerm, w, r)
 }

@@ pastes/api.go
 	Unlisted     bool
 }
 
-type TransparencyPageData struct {
-	Site      shared.SitePageData
-	Analytics *db.Analytics
-}
-
 type Link struct {
 	URL  string
 	Text string

@@ pgs/ssh.go
 	"github.com/charmbracelet/promwish"
 	"github.com/charmbracelet/ssh"
 	"github.com/charmbracelet/wish"
-	"github.com/picosh/pico/db"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/pico/shared/storage"
 		st,
 	)
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
 	apiConfig := &shared.ApiConfig{
-		Cfg:            cfg,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: ch,
+		Cfg:     cfg,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 
 	webTunnel := &tunkit.WebTunnelHandler{

@@ pgs/tunnel.go
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)
 			logger,
 			apiConfig.Dbpool,
 			apiConfig.Storage,
-			apiConfig.AnalyticsQueue,
 		)
 		tunnelRouter := TunnelWebRouter{routes}
 		router := http.NewServeMux()

@@ pgs/web.go
 		return
 	}
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
-
-	routes := NewWebRouter(cfg, logger, dbpool, st, ch)
+	routes := NewWebRouter(cfg, logger, dbpool, st)
 
 	portStr := fmt.Sprintf(":%s", cfg.Port)
 	logger.Info(
 type HasPerm = func(proj *db.Project) bool
 
 type WebRouter struct {
-	Cfg            *shared.ConfigSite
-	Logger         *slog.Logger
-	Dbpool         db.DB
-	Storage        storage.StorageServe
-	AnalyticsQueue chan *db.AnalyticsVisits
-	RootRouter     *http.ServeMux
-	UserRouter     *http.ServeMux
+	Cfg        *shared.ConfigSite
+	Logger     *slog.Logger
+	Dbpool     db.DB
+	Storage    storage.StorageServe
+	RootRouter *http.ServeMux
+	UserRouter *http.ServeMux
 }
 
-func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) *WebRouter {
+func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter {
 	router := &WebRouter{
-		Cfg:            cfg,
-		Logger:         logger,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: analytics,
+		Cfg:     cfg,
+		Logger:  logger,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 	router.initRouters()
 	return router
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}

@@ pgs/web_asset_handler.go
 package pgs
 
 import (
-	"errors"
 	"fmt"
 	"io"
 	"log/slog"
 	"net/http/httputil"
 	_ "net/http/pprof"
 
-	"github.com/picosh/pico/shared"
 	"github.com/picosh/pico/shared/storage"
 	sst "github.com/picosh/pobj/storage"
 )
 			"routes", strings.Join(attempts, ", "),
 			"status", http.StatusNotFound,
 		)
-		// track 404s
-		ch := h.AnalyticsQueue
-		view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
-		if err == nil {
-			view.ProjectID = h.ProjectID
-			view.Status = http.StatusNotFound
-			select {
-			case ch <- view:
-			default:
-				logger.Error("could not send analytics view to channel", "view", view)
-			}
-		} else {
-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-				logger.Error("could not record analytics view", "err", err, "view", view)
-			}
-		}
 		http.Error(w, "404 not found", http.StatusNotFound)
 		return
 	}
 
 	finContentType := w.Header().Get("content-type")
 
-	// only track pages, not individual assets
-	if finContentType == "text/html" {
-		// track visit
-		ch := h.AnalyticsQueue
-		view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
-		if err == nil {
-			view.ProjectID = h.ProjectID
-			select {
-			case ch <- view:
-			default:
-				logger.Error("could not send analytics view to channel", "view", view)
-			}
-		} else {
-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-				logger.Error("could not record analytics view", "err", err, "view", view)
-			}
-		}
-	}
-
 	logger.Info(
 		"serving asset",
 		"asset", assetFilepath,

@@ pgs/web_test.go
 	"net/http/httptest"
 	"strings"
 	"testing"
-	"time"
 
 	"github.com/picosh/pico/db"
 	"github.com/picosh/pico/db/stub"
 			responseRecorder := httptest.NewRecorder()
 
 			st, _ := storage.NewStorageMemory(tc.storage)
-			ch := make(chan *db.AnalyticsVisits, 100)
-			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
+			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st)
 			router.ServeHTTP(responseRecorder, request)
 
 			if responseRecorder.Code != tc.status {
 	}
 }
 
-func TestAnalytics(t *testing.T) {
-	bucketName := shared.GetAssetBucketName(testUserID)
-	cfg := NewConfigSite()
-	cfg.Domain = "pgs.test"
-	expectedPath := "/app"
-	request := httptest.NewRequest("GET", mkpath(expectedPath), strings.NewReader(""))
-	responseRecorder := httptest.NewRecorder()
-
-	sto := map[string]map[string]string{
-		bucketName: {
-			"test/app.html": "hello world!",
-		},
-	}
-	st, _ := storage.NewStorageMemory(sto)
-	ch := make(chan *db.AnalyticsVisits, 100)
-	dbpool := NewPgsAnalticsDb(cfg.Logger)
-	router := NewWebRouter(cfg, cfg.Logger, dbpool, st, ch)
-
-	go func() {
-		for analytics := range ch {
-			if analytics.Path != expectedPath {
-				t.Errorf("Want path '%s', got '%s'", expectedPath, analytics.Path)
-			}
-			close(ch)
-		}
-	}()
-
-	router.ServeHTTP(responseRecorder, request)
-
-	select {
-	case <-ch:
-		return
-	case <-time.After(time.Second * 1):
-		t.Error("didnt receive analytics event within time limit")
-	}
-}
-
 type ImageStorageMemory struct {
 	*storage.StorageMemory
 	Opts  *storage.ImgProcessOpts
 					Ratio: &storage.Ratio{},
 				},
 			}
-			ch := make(chan *db.AnalyticsVisits, 100)
-			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
+			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st)
 			router.ServeHTTP(responseRecorder, request)
 
 			if responseRecorder.Code != tc.status {

@@ prose/api.go
 
 import (
 	"bytes"
-	"errors"
 	"fmt"
 	"html/template"
 	"net/http"
 	Diff         template.HTML
 }
 
-type TransparencyPageData struct {
-	Site      shared.SitePageData
-	Analytics *db.Analytics
-}
-
 type HeaderTxt struct {
 	Title      string
 	Bio        string
 		postCollection = append(postCollection, p)
 	}
 
-	// track visit
-	ch := shared.GetAnalyticsQueue(r)
-	view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
-	if err == nil {
-		select {
-		case ch <- view:
-		default:
-			logger.Error("could not send analytics view to channel", "view", view)
-		}
-	} else {
-		if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-			logger.Error("could not record analytics view", "err", err, "view", view)
-		}
-	}
-
 	data := BlogPageData{
 		Site:       *cfg.GetSiteData(),
 		PageTitle:  headerTxt.Title,
 	username := shared.GetUsernameFromRequest(r)
 	subdomain := shared.GetSubdomain(r)
 	cfg := shared.GetCfg(r)
-	ch := shared.GetAnalyticsQueue(r)
 
 	var slug string
 	if !cfg.IsSubdomains() || subdomain == "" {
 			ogImageCard = parsedText.ImageCard
 		}
 
-		// track visit
-		view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
-		if err == nil {
-			view.PostID = post.ID
-			select {
-			case ch <- view:
-			default:
-				logger.Error("could not send analytics view to channel", "view", view)
-			}
-		} else {
-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-				logger.Error("could not record analytics view", "err", err, "view", view)
-			}
-		}
-
 		unlisted := false
 		if post.Hidden || post.PublishAt.After(time.Now()) {
 			unlisted = true
 	mainRoutes := createMainRoutes(staticRoutes)
 	subdomainRoutes := createSubdomainRoutes(staticRoutes)
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
 	apiConfig := &shared.ApiConfig{
-		Cfg:            cfg,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: ch,
+		Cfg:     cfg,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 	handler := shared.CreateServe(mainRoutes, subdomainRoutes, apiConfig)
 	router := http.HandlerFunc(handler)

@@ shared/api.go
 	"github.com/picosh/utils"
 )
 
+type SubdomainProps struct {
+	ProjectName string
+	Username    string
+}
+
+func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
+	props := &SubdomainProps{}
+	strs := strings.SplitN(subdomain, "-", 2)
+	props.Username = strs[0]
+	if len(strs) == 2 {
+		props.ProjectName = strs[1]
+	} else {
+		props.ProjectName = props.Username
+	}
+	return props, nil
+}
+
 func CorsHeaders(headers http.Header) {
 	headers.Add("Access-Control-Allow-Origin", "*")
 	headers.Add("Vary", "Origin")

@@ shared/router.go
 }
 
 type ApiConfig struct {
-	Cfg            *ConfigSite
-	Dbpool         db.DB
-	Storage        storage.StorageServe
-	AnalyticsQueue chan *db.AnalyticsVisits
+	Cfg     *ConfigSite
+	Dbpool  db.DB
+	Storage storage.StorageServe
 }
 
 func (hc *ApiConfig) HasPrivilegedAccess(apiToken string) bool {
 	ctx = context.WithValue(ctx, ctxDBKey{}, hc.Dbpool)
 	ctx = context.WithValue(ctx, ctxStorageKey{}, hc.Storage)
 	ctx = context.WithValue(ctx, ctxCfg{}, hc.Cfg)
-	ctx = context.WithValue(ctx, ctxAnalyticsQueue{}, hc.AnalyticsQueue)
 	return ctx
 }
 
 type ctxStorageKey struct{}
 type ctxLoggerKey struct{}
 type ctxCfg struct{}
-type ctxAnalyticsQueue struct{}
 
 type CtxSubdomainKey struct{}
 type ctxKey struct{}
 	return ""
 }
 
-func GetAnalyticsQueue(r *http.Request) chan *db.AnalyticsVisits {
-	return r.Context().Value(ctxAnalyticsQueue{}).(chan *db.AnalyticsVisits)
-}
-
 func GetApiToken(r *http.Request) string {
 	authHeader := r.Header.Get("authorization")
 	if authHeader == "" {

@@ sql/migrations/20241125_add_content_type_to_analytics.sql
+ALTER TABLE analytics_visits ADD COLUMN content_type varchar(256);
ps-79 by erock on 2024-11-27T20:18:31Z

reactor(metric-drain): use caddy access logs

Previously we recorded site usage analytics inside our web app code.
This worked well for our use case because we could filter, parse, and
send the analytics to our pipe `metric-drain`, which would then store
them in our database.

Because we want to enable HTTP caching for pgs, requests won't always reach
our web app code; they can terminate at our cache layer instead.

Instead, we want to record analytics higher in the request stack.  In
this case, we want to record site analytics from Caddy access logs.

Here's how it works:

- `pub` caddy access logs to our pipe `container-drain`
- `auth/web` will `sub` to `container-drain`, filter, deserialize, and
  `pub` to `metric-drain`
- `auth/web` will `sub` to `metric-drain` and store the analytics in our
  database
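
To make the mapping concrete, here is a small, self-contained sketch (not part
of the patch) that unmarshals a log line shaped like the caddy.json sample
above into trimmed-down copies of the structs added in auth/api.go. The struct
names mirror the patch; everything else is illustrative.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Trimmed-down copies of the structs added in auth/api.go.
type AccessLogReq struct {
	ClientIP string `json:"client_ip"`
	Host     string `json:"host"`
	Uri      string `json:"uri"`
	Headers  struct {
		UserAgent []string `json:"User-Agent"`
		Referer   []string `json:"Referer"`
	} `json:"headers"`
	Tls struct {
		ServerName string `json:"server_name"`
	} `json:"tls"`
}

type RespHeaders struct {
	ContentType []string `json:"Content-Type"`
}

type CaddyAccessLog struct {
	Request     AccessLogReq `json:"request"`
	Status      int          `json:"status"`
	RespHeaders RespHeaders  `json:"resp_headers"`
}

func main() {
	// One log line shaped like the caddy.json sample above (abridged).
	line := `{"request":{"client_ip":"127.0.0.1","host":"pgs.sh","uri":"/","headers":{"User-Agent":["Blackbox Exporter/0.24.0"]},"tls":{"server_name":"pgs.sh"}},"status":200,"resp_headers":{"Content-Type":["text/html; charset=utf-8"]}}`

	var access CaddyAccessLog
	if err := json.Unmarshal([]byte(line), &access); err != nil {
		panic(err)
	}

	// The namespace ("space") is derived from the TLS server name,
	// mirroring deserializeCaddyAccessLog in the patch.
	space := strings.SplitN(access.Request.Tls.ServerName, ".", 2)[0]
	fmt.Println(space, access.Request.Host, access.Request.Uri, access.Status,
		strings.Join(access.RespHeaders.ContentType, " "))
	// prints: pgs pgs.sh / 200 text/html; charset=utf-8
}

Since the filtered visits are republished to `metric-drain`, they can also be
inspected by manually subscribing to that topic (something like
`ssh pipe.pico.sh sub metric-drain`), which is the debugging workflow the
comment in auth/api.go calls out.
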
Makefile
+2 -1
diff --git a/Makefile b/Makefile
index 1730f2a..923e6c7 100644
--- a/Makefile
+++ b/Makefile
@@ -135,10 +135,11 @@ migrate:
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20240819_add_projects_blocked.sql
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241028_add_analytics_indexes.sql
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
 .PHONY: migrate
 
 latest:
-	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
 .PHONY: latest
 
 psql:
auth/api.go
+162 -12
diff --git a/auth/api.go b/auth/api.go
index 9c38bdc..8cd165b 100644
--- a/auth/api.go
+++ b/auth/api.go
@@ -14,6 +14,7 @@ import (
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
@@ -21,6 +22,7 @@ import (
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/utils"
+	"github.com/picosh/utils/pipe"
 	"github.com/picosh/utils/pipe/metrics"
 )
 
@@ -578,6 +580,155 @@ func checkoutHandler() http.HandlerFunc {
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent []string `json:"User-Agent"`
+		Referer   []string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type RespHeaders struct {
+	ContentType []string `json:"Content-Type"`
+}
+
+type CaddyAccessLog struct {
+	Request     AccessLogReq `json:"request"`
+	Status      int          `json:"status"`
+	RespHeaders RespHeaders  `json:"resp_headers"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:      user.ID,
+		ProjectID:   projectID,
+		PostID:      postID,
+		Namespace:   space,
+		Host:        host,
+		Path:        path,
+		IpAddress:   access.Request.ClientIP,
+		UserAgent:   strings.Join(access.Request.Headers.UserAgent, " "),
+		Referer:     strings.Join(access.Request.Headers.Referer, " "),
+		ContentType: strings.Join(access.RespHeaders.ContentType, " "),
+		Status:      access.Status,
+	}, nil
+}
+
+// this feels really stupid because i'm taking containter-drain,
+// filtering it, and then sending it to metric-drain.  The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to use pipe as a debugging tool which means we
+// can manually sub to `metric-drain` and have a nice clean output to view.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+	info := shared.NewPicoPipeClient()
+	drain := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"container drain",
+		"sub container-drain -k",
+		100,
+		-1,
+	)
+
+	send := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"from container drain to metric drain",
+		"pub metric-drain -b=false",
+		100,
+		-1,
+	)
+
+	for {
+		scanner := bufio.NewScanner(drain)
+		for scanner.Scan() {
+			line := scanner.Text()
+			if strings.Contains(line, "http.log.access") {
+				clean := strings.TrimSpace(line)
+				visit, err := accessLogToVisit(dbpool, clean)
+				if err != nil {
+					logger.Debug("could not convert access log to a visit", "err", err)
+					continue
+				}
+				jso, err := json.Marshal(visit)
+				if err != nil {
+					logger.Error("could not marshal json of a visit", "err", err)
+					continue
+				}
+				_, _ = send.Write(jso)
+			}
+		}
+	}
+}
+
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+	accessLog := CaddyAccessLog{}
+	err := json.Unmarshal([]byte(line), &accessLog)
+	if err != nil {
+		return nil, err
+	}
+
+	return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
@@ -594,30 +745,26 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
 			visit := db.AnalyticsVisits{}
 			err := json.Unmarshal([]byte(line), &visit)
 			if err != nil {
-				logger.Error("json unmarshal", "err", err)
+				logger.Info("could not unmarshal json", "err", err, "line", line)
 				continue
 			}
-
-			user := slog.Any("userId", visit.UserID)
-
 			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
 			if err != nil {
 				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
+					logger.Info("could not record analytics visit", "reason", err)
 				}
 			}
 
-			logger.Info("inserting visit", "visit", visit, user)
+			if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") {
+				continue
+			}
+
+			logger.Info("inserting visit", "visit", visit)
 			err = dbpool.InsertVisit(&visit)
 			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+				logger.Error("could not insert visit record", "err", err)
 			}
 		}
-
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
-		}
 	}
 }
 
@@ -689,6 +836,9 @@ func StartApiServer() {
 
 	// gather metrics in the auth service
 	go metricDrainSub(ctx, db, logger, cfg.Secret)
+	// convert container logs to access logs
+	go containerDrainSub(ctx, db, logger)
+
 	defer ctx.Done()
 
 	apiConfig := &shared.ApiConfig{
db/db.go
+12 -11
diff --git a/db/db.go b/db/db.go
index 3e684ef..a3ac860 100644
--- a/db/db.go
+++ b/db/db.go
@@ -161,17 +161,18 @@ type PostAnalytics struct {
 }
 
 type AnalyticsVisits struct {
-	ID        string `json:"id"`
-	UserID    string `json:"user_id"`
-	ProjectID string `json:"project_id"`
-	PostID    string `json:"post_id"`
-	Namespace string `json:"namespace"`
-	Host      string `json:"host"`
-	Path      string `json:"path"`
-	IpAddress string `json:"ip_address"`
-	UserAgent string `json:"user_agent"`
-	Referer   string `json:"referer"`
-	Status    int    `json:"status"`
+	ID          string `json:"id"`
+	UserID      string `json:"user_id"`
+	ProjectID   string `json:"project_id"`
+	PostID      string `json:"post_id"`
+	Namespace   string `json:"namespace"`
+	Host        string `json:"host"`
+	Path        string `json:"path"`
+	IpAddress   string `json:"ip_address"`
+	UserAgent   string `json:"user_agent"`
+	Referer     string `json:"referer"`
+	Status      int    `json:"status"`
+	ContentType string `json:"content_type"`
 }
 
 type VisitInterval struct {
db/postgres/storage.go
+2 -1
diff --git a/db/postgres/storage.go b/db/postgres/storage.go
index 66b88e2..4a57e5a 100644
--- a/db/postgres/storage.go
+++ b/db/postgres/storage.go
@@ -986,7 +986,7 @@ func newNullString(s string) sql.NullString {
 
 func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
 	_, err := me.Db.Exec(
-		`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
+		`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
 		visit.UserID,
 		newNullString(visit.ProjectID),
 		newNullString(visit.PostID),
@@ -997,6 +997,7 @@ func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
 		visit.UserAgent,
 		visit.Referer,
 		visit.Status,
+		visit.ContentType,
 	)
 	return err
 }
imgs/api.go
+0 -2
diff --git a/imgs/api.go b/imgs/api.go
index 7e9c034..99bd917 100644
--- a/imgs/api.go
+++ b/imgs/api.go
@@ -177,7 +177,6 @@ func ImgRequest(w http.ResponseWriter, r *http.Request) {
 	dbpool := shared.GetDB(r)
 	logger := shared.GetLogger(r)
 	username := shared.GetUsernameFromRequest(r)
-	analytics := shared.GetAnalyticsQueue(r)
 
 	user, err := dbpool.FindUserForName(username)
 	if err != nil {
@@ -241,7 +240,6 @@ func ImgRequest(w http.ResponseWriter, r *http.Request) {
 		logger,
 		dbpool,
 		st,
-		analytics,
 	)
 	router.ServeAsset(fname, opts, true, anyPerm, w, r)
 }
pastes/api.go
+0 -5
diff --git a/pastes/api.go b/pastes/api.go
index 86a230c..8e39b63 100644
--- a/pastes/api.go
+++ b/pastes/api.go
@@ -59,11 +59,6 @@ type PostPageData struct {
 	Unlisted     bool
 }
 
-type TransparencyPageData struct {
-	Site      shared.SitePageData
-	Analytics *db.Analytics
-}
-
 type Link struct {
 	URL  string
 	Text string
pgs/ssh.go
+3 -7
diff --git a/pgs/ssh.go b/pgs/ssh.go
index 0e25d50..0f11014 100644
--- a/pgs/ssh.go
+++ b/pgs/ssh.go
@@ -11,7 +11,6 @@ import (
 	"github.com/charmbracelet/promwish"
 	"github.com/charmbracelet/ssh"
 	"github.com/charmbracelet/wish"
-	"github.com/picosh/pico/db"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/pico/shared/storage"
@@ -81,13 +80,10 @@ func StartSshServer() {
 		st,
 	)
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
 	apiConfig := &shared.ApiConfig{
-		Cfg:            cfg,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: ch,
+		Cfg:     cfg,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 
 	webTunnel := &tunkit.WebTunnelHandler{
pgs/tunnel.go
+1 -2
diff --git a/pgs/tunnel.go b/pgs/tunnel.go
index b635c8e..34a8bd0 100644
--- a/pgs/tunnel.go
+++ b/pgs/tunnel.go
@@ -51,7 +51,7 @@ func createHttpHandler(apiConfig *shared.ApiConfig) CtxHttpBridge {
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)
@@ -121,7 +121,6 @@ func createHttpHandler(apiConfig *shared.ApiConfig) CtxHttpBridge {
 			logger,
 			apiConfig.Dbpool,
 			apiConfig.Storage,
-			apiConfig.AnalyticsQueue,
 		)
 		tunnelRouter := TunnelWebRouter{routes}
 		router := http.NewServeMux()
pgs/web.go
+14 -36
diff --git a/pgs/web.go b/pgs/web.go
index 685f6ef..0013ac0 100644
--- a/pgs/web.go
+++ b/pgs/web.go
@@ -40,10 +40,7 @@ func StartApiServer() {
 		return
 	}
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
-
-	routes := NewWebRouter(cfg, logger, dbpool, st, ch)
+	routes := NewWebRouter(cfg, logger, dbpool, st)
 
 	portStr := fmt.Sprintf(":%s", cfg.Port)
 	logger.Info(
@@ -61,22 +58,20 @@ func StartApiServer() {
 type HasPerm = func(proj *db.Project) bool
 
 type WebRouter struct {
-	Cfg            *shared.ConfigSite
-	Logger         *slog.Logger
-	Dbpool         db.DB
-	Storage        storage.StorageServe
-	AnalyticsQueue chan *db.AnalyticsVisits
-	RootRouter     *http.ServeMux
-	UserRouter     *http.ServeMux
+	Cfg        *shared.ConfigSite
+	Logger     *slog.Logger
+	Dbpool     db.DB
+	Storage    storage.StorageServe
+	RootRouter *http.ServeMux
+	UserRouter *http.ServeMux
 }
 
-func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) *WebRouter {
+func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter {
 	router := &WebRouter{
-		Cfg:            cfg,
-		Logger:         logger,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: analytics,
+		Cfg:     cfg,
+		Logger:  logger,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 	router.initRouters()
 	return router
@@ -177,7 +172,7 @@ func (web *WebRouter) checkHandler(w http.ResponseWriter, r *http.Request) {
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
@@ -333,7 +328,7 @@ func (web *WebRouter) ServeAsset(fname string, opts *storage.ImgProcessOpts, fro
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
@@ -450,20 +445,3 @@ func (web *WebRouter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}
pgs/web_asset_handler.go
+0 -37
 1diff --git a/pgs/web_asset_handler.go b/pgs/web_asset_handler.go
 2index dd41006..fac47df 100644
 3--- a/pgs/web_asset_handler.go
 4+++ b/pgs/web_asset_handler.go
 5@@ -1,7 +1,6 @@
 6 package pgs
 7 
 8 import (
 9-	"errors"
10 	"fmt"
11 	"io"
12 	"log/slog"
13@@ -15,7 +14,6 @@ import (
14 	"net/http/httputil"
15 	_ "net/http/pprof"
16 
17-	"github.com/picosh/pico/shared"
18 	"github.com/picosh/pico/shared/storage"
19 	sst "github.com/picosh/pobj/storage"
20 )
21@@ -155,22 +153,6 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
22 			"routes", strings.Join(attempts, ", "),
23 			"status", http.StatusNotFound,
24 		)
25-		// track 404s
26-		ch := h.AnalyticsQueue
27-		view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
28-		if err == nil {
29-			view.ProjectID = h.ProjectID
30-			view.Status = http.StatusNotFound
31-			select {
32-			case ch <- view:
33-			default:
34-				logger.Error("could not send analytics view to channel", "view", view)
35-			}
36-		} else {
37-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
38-				logger.Error("could not record analytics view", "err", err, "view", view)
39-			}
40-		}
41 		http.Error(w, "404 not found", http.StatusNotFound)
42 		return
43 	}
44@@ -236,25 +218,6 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
45 
46 	finContentType := w.Header().Get("content-type")
47 
48-	// only track pages, not individual assets
49-	if finContentType == "text/html" {
50-		// track visit
51-		ch := h.AnalyticsQueue
52-		view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
53-		if err == nil {
54-			view.ProjectID = h.ProjectID
55-			select {
56-			case ch <- view:
57-			default:
58-				logger.Error("could not send analytics view to channel", "view", view)
59-			}
60-		} else {
61-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
62-				logger.Error("could not record analytics view", "err", err, "view", view)
63-			}
64-		}
65-	}
66-
67 	logger.Info(
68 		"serving asset",
69 		"asset", assetFilepath,
pgs/web_test.go
+2 -42
 1diff --git a/pgs/web_test.go b/pgs/web_test.go
 2index 3c9c31d..8694d6f 100644
 3--- a/pgs/web_test.go
 4+++ b/pgs/web_test.go
 5@@ -8,7 +8,6 @@ import (
 6 	"net/http/httptest"
 7 	"strings"
 8 	"testing"
 9-	"time"
10 
11 	"github.com/picosh/pico/db"
12 	"github.com/picosh/pico/db/stub"
13@@ -219,8 +218,7 @@ func TestApiBasic(t *testing.T) {
14 			responseRecorder := httptest.NewRecorder()
15 
16 			st, _ := storage.NewStorageMemory(tc.storage)
17-			ch := make(chan *db.AnalyticsVisits, 100)
18-			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
19+			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st)
20 			router.ServeHTTP(responseRecorder, request)
21 
22 			if responseRecorder.Code != tc.status {
23@@ -240,43 +238,6 @@ func TestApiBasic(t *testing.T) {
24 	}
25 }
26 
27-func TestAnalytics(t *testing.T) {
28-	bucketName := shared.GetAssetBucketName(testUserID)
29-	cfg := NewConfigSite()
30-	cfg.Domain = "pgs.test"
31-	expectedPath := "/app"
32-	request := httptest.NewRequest("GET", mkpath(expectedPath), strings.NewReader(""))
33-	responseRecorder := httptest.NewRecorder()
34-
35-	sto := map[string]map[string]string{
36-		bucketName: {
37-			"test/app.html": "hello world!",
38-		},
39-	}
40-	st, _ := storage.NewStorageMemory(sto)
41-	ch := make(chan *db.AnalyticsVisits, 100)
42-	dbpool := NewPgsAnalticsDb(cfg.Logger)
43-	router := NewWebRouter(cfg, cfg.Logger, dbpool, st, ch)
44-
45-	go func() {
46-		for analytics := range ch {
47-			if analytics.Path != expectedPath {
48-				t.Errorf("Want path '%s', got '%s'", expectedPath, analytics.Path)
49-			}
50-			close(ch)
51-		}
52-	}()
53-
54-	router.ServeHTTP(responseRecorder, request)
55-
56-	select {
57-	case <-ch:
58-		return
59-	case <-time.After(time.Second * 1):
60-		t.Error("didnt receive analytics event within time limit")
61-	}
62-}
63-
64 type ImageStorageMemory struct {
65 	*storage.StorageMemory
66 	Opts  *storage.ImgProcessOpts
67@@ -337,8 +298,7 @@ func TestImageManipulation(t *testing.T) {
68 					Ratio: &storage.Ratio{},
69 				},
70 			}
71-			ch := make(chan *db.AnalyticsVisits, 100)
72-			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
73+			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st)
74 			router.ServeHTTP(responseRecorder, request)
75 
76 			if responseRecorder.Code != tc.status {
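The dropped TestAnalytics covered the channel hand-off that no longer exists. For reference, a deterministic way to exercise a channel consumer in a test is to read from the channel in the test goroutine itself with a timeout instead of asserting inside a spawned goroutine; a generic sketch under that assumption (hypothetical producer and Visit type, not pico code):

package example

import (
	"testing"
	"time"
)

type Visit struct{ Path string }

// produce stands in for the code under test that publishes a visit.
func produce(ch chan<- *Visit) { ch <- &Visit{Path: "/app"} }

func TestVisitIsPublished(t *testing.T) {
	ch := make(chan *Visit, 1)
	produce(ch)

	select {
	case v := <-ch:
		if v.Path != "/app" {
			t.Errorf("want path %q, got %q", "/app", v.Path)
		}
	case <-time.After(time.Second):
		t.Error("did not receive analytics event within time limit")
	}
}
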
prose/api.go
+3 -43
 1diff --git a/prose/api.go b/prose/api.go
 2index 5368f0a..0fd4c5a 100644
 3--- a/prose/api.go
 4+++ b/prose/api.go
 5@@ -2,7 +2,6 @@ package prose
 6 
 7 import (
 8 	"bytes"
 9-	"errors"
10 	"fmt"
11 	"html/template"
12 	"net/http"
13@@ -89,11 +88,6 @@ type PostPageData struct {
14 	Diff         template.HTML
15 }
16 
17-type TransparencyPageData struct {
18-	Site      shared.SitePageData
19-	Analytics *db.Analytics
20-}
21-
22 type HeaderTxt struct {
23 	Title      string
24 	Bio        string
25@@ -270,21 +264,6 @@ func blogHandler(w http.ResponseWriter, r *http.Request) {
26 		postCollection = append(postCollection, p)
27 	}
28 
29-	// track visit
30-	ch := shared.GetAnalyticsQueue(r)
31-	view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
32-	if err == nil {
33-		select {
34-		case ch <- view:
35-		default:
36-			logger.Error("could not send analytics view to channel", "view", view)
37-		}
38-	} else {
39-		if !errors.Is(err, shared.ErrAnalyticsDisabled) {
40-			logger.Error("could not record analytics view", "err", err, "view", view)
41-		}
42-	}
43-
44 	data := BlogPageData{
45 		Site:       *cfg.GetSiteData(),
46 		PageTitle:  headerTxt.Title,
47@@ -350,7 +329,6 @@ func postHandler(w http.ResponseWriter, r *http.Request) {
48 	username := shared.GetUsernameFromRequest(r)
49 	subdomain := shared.GetSubdomain(r)
50 	cfg := shared.GetCfg(r)
51-	ch := shared.GetAnalyticsQueue(r)
52 
53 	var slug string
54 	if !cfg.IsSubdomains() || subdomain == "" {
55@@ -429,21 +407,6 @@ func postHandler(w http.ResponseWriter, r *http.Request) {
56 			ogImageCard = parsedText.ImageCard
57 		}
58 
59-		// track visit
60-		view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
61-		if err == nil {
62-			view.PostID = post.ID
63-			select {
64-			case ch <- view:
65-			default:
66-				logger.Error("could not send analytics view to channel", "view", view)
67-			}
68-		} else {
69-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
70-				logger.Error("could not record analytics view", "err", err, "view", view)
71-			}
72-		}
73-
74 		unlisted := false
75 		if post.Hidden || post.PublishAt.After(time.Now()) {
76 			unlisted = true
77@@ -953,13 +916,10 @@ func StartApiServer() {
78 	mainRoutes := createMainRoutes(staticRoutes)
79 	subdomainRoutes := createSubdomainRoutes(staticRoutes)
80 
81-	ch := make(chan *db.AnalyticsVisits, 100)
82-	go shared.AnalyticsCollect(ch, dbpool, logger)
83 	apiConfig := &shared.ApiConfig{
84-		Cfg:            cfg,
85-		Dbpool:         dbpool,
86-		Storage:        st,
87-		AnalyticsQueue: ch,
88+		Cfg:     cfg,
89+		Dbpool:  dbpool,
90+		Storage: st,
91 	}
92 	handler := shared.CreateServe(mainRoutes, subdomainRoutes, apiConfig)
93 	router := http.HandlerFunc(handler)
shared/api.go
+17 -0
 1diff --git a/shared/api.go b/shared/api.go
 2index 96a4e68..a33ad59 100644
 3--- a/shared/api.go
 4+++ b/shared/api.go
 5@@ -13,6 +13,23 @@ import (
 6 	"github.com/picosh/utils"
 7 )
 8 
 9+type SubdomainProps struct {
10+	ProjectName string
11+	Username    string
12+}
13+
14+func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
15+	props := &SubdomainProps{}
16+	strs := strings.SplitN(subdomain, "-", 2)
17+	props.Username = strs[0]
18+	if len(strs) == 2 {
19+		props.ProjectName = strs[1]
20+	} else {
21+		props.ProjectName = props.Username
22+	}
23+	return props, nil
24+}
25+
26 func CorsHeaders(headers http.Header) {
27 	headers.Add("Access-Control-Allow-Origin", "*")
28 	headers.Add("Vary", "Origin")
shared/router.go
+3 -10
 1diff --git a/shared/router.go b/shared/router.go
 2index ffb02cc..00573ff 100644
 3--- a/shared/router.go
 4+++ b/shared/router.go
 5@@ -69,10 +69,9 @@ func CreatePProfRoutesMux(mux *http.ServeMux) {
 6 }
 7 
 8 type ApiConfig struct {
 9-	Cfg            *ConfigSite
10-	Dbpool         db.DB
11-	Storage        storage.StorageServe
12-	AnalyticsQueue chan *db.AnalyticsVisits
13+	Cfg     *ConfigSite
14+	Dbpool  db.DB
15+	Storage storage.StorageServe
16 }
17 
18 func (hc *ApiConfig) HasPrivilegedAccess(apiToken string) bool {
19@@ -93,7 +92,6 @@ func (hc *ApiConfig) CreateCtx(prevCtx context.Context, subdomain string) contex
20 	ctx = context.WithValue(ctx, ctxDBKey{}, hc.Dbpool)
21 	ctx = context.WithValue(ctx, ctxStorageKey{}, hc.Storage)
22 	ctx = context.WithValue(ctx, ctxCfg{}, hc.Cfg)
23-	ctx = context.WithValue(ctx, ctxAnalyticsQueue{}, hc.AnalyticsQueue)
24 	return ctx
25 }
26 
27@@ -172,7 +170,6 @@ type ctxDBKey struct{}
28 type ctxStorageKey struct{}
29 type ctxLoggerKey struct{}
30 type ctxCfg struct{}
31-type ctxAnalyticsQueue struct{}
32 
33 type CtxSubdomainKey struct{}
34 type ctxKey struct{}
35@@ -228,10 +225,6 @@ func GetCustomDomain(host string, space string) string {
36 	return ""
37 }
38 
39-func GetAnalyticsQueue(r *http.Request) chan *db.AnalyticsVisits {
40-	return r.Context().Value(ctxAnalyticsQueue{}).(chan *db.AnalyticsVisits)
41-}
42-
43 func GetApiToken(r *http.Request) string {
44 	authHeader := r.Header.Get("authorization")
45 	if authHeader == "" {
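The queue was threaded through requests with the same unexported-key context pattern that the remaining ctxDBKey/ctxStorageKey/ctxCfg values still use, which is why ctxAnalyticsQueue and its GetAnalyticsQueue accessor are removed together. A generic sketch of that pattern with a hypothetical key and value type (not the pico types):

package example

import "context"

// Unexported key types keep context values collision-free across packages.
type ctxQueueKey struct{}

// withQueue stores a channel in the context, as ApiConfig.CreateCtx did.
func withQueue(ctx context.Context, ch chan string) context.Context {
	return context.WithValue(ctx, ctxQueueKey{}, ch)
}

// queueFrom retrieves it again, as GetAnalyticsQueue did; the bare type
// assertion panics if the value was never set, so accessor and key have to
// be added or removed as a pair.
func queueFrom(ctx context.Context) chan string {
	return ctx.Value(ctxQueueKey{}).(chan string)
}
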
sql/migrations/20241125_add_content_type_to_analytics.sql
+1 -0
1diff --git a/sql/migrations/20241125_add_content_type_to_analytics.sql b/sql/migrations/20241125_add_content_type_to_analytics.sql
2new file mode 100644
3index 0000000..d9b2501
4--- /dev/null
5+++ b/sql/migrations/20241125_add_content_type_to_analytics.sql
6@@ -0,0 +1,1 @@
7+ALTER TABLE analytics_visits ADD COLUMN content_type varchar(256);
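The new content_type column has no NOT NULL constraint or default, so rows written before this migration simply carry NULL. If the column is read back through database/sql, a nullable text column is typically scanned into sql.NullString; a generic sketch of that read path (an assumption for illustration, not pico's actual db layer):

package example

import "database/sql"

// VisitRow is a hypothetical projection of analytics_visits that includes the
// new column; ContentType.Valid is false for rows predating this migration.
type VisitRow struct {
	Host        string
	Path        string
	ContentType sql.NullString
}

func scanVisit(row *sql.Row) (*VisitRow, error) {
	v := &VisitRow{}
	if err := row.Scan(&v.Host, &v.Path, &v.ContentType); err != nil {
		return nil, err
	}
	return v, nil
}
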