dashboard / pico / reactor(metric-drain): use caddy json format #35 rss

accepted · opened on 2024-11-15T15:03:31Z by erock
Help
# add changes to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add 35
# add review to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add --review 35
# remove patchset
ssh pr.pico.sh ps rm ps-x
# checkout all patches
ssh pr.pico.sh pr print 35 | git am -3
# print a diff between the last two patches in a patch request
ssh pr.pico.sh pr diff 35
# accept PR
ssh pr.pico.sh pr accept 35
# close PR
ssh pr.pico.sh pr close 35

Logs

erock created pr with ps-74 on 2024-11-15T15:03:31Z
erock added ps-77 on 2024-11-23T03:34:44Z
erock added ps-78 on 2024-11-27T20:17:20Z
erock added ps-79 on 2024-11-27T20:18:31Z
erock changed status on 2024-11-28T03:03:53Z {"status":"accepted"}

Patchsets

ps-74 by erock on 2024-11-15T15:03:31Z
Range Diff ↕
1: 77aaa29 ! 1: 8d56535 reactor(metric-drain): use caddy json format

@@ auth/api.go
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent string `json:"User-Agent"`
+		Referer   string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type CaddyAccessLog struct {
+	Request AccessLogReq `json:"request"`
+	Status  int          `json:"status"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:    user.ID,
+		ProjectID: projectID,
+		PostID:    postID,
+		Namespace: space,
+		Host:      host,
+		Path:      path,
+		IpAddress: access.Request.ClientIP,
+		UserAgent: access.Request.Headers.UserAgent,
+		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
+		Status:    access.Status,
+	}, nil
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	conn := shared.NewPicoPipeClient()
 	stdoutPipe, err := pubsub.RemoteSub("sub metric-drain -k", ctx, conn)
 	scanner := bufio.NewScanner(stdoutPipe)
 	for scanner.Scan() {
 		line := scanner.Text()
-		visit := db.AnalyticsVisits{}
-		err := json.Unmarshal([]byte(line), &visit)
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 		-1,
 	)
 
-	for {
-		scanner := bufio.NewScanner(drain)
-		for scanner.Scan() {
-			line := scanner.Text()
-			visit := db.AnalyticsVisits{}
-			err := json.Unmarshal([]byte(line), &visit)
-			if err != nil {
-				logger.Error("json unmarshal", "err", err)
-				continue
-			}
-
-			user := slog.Any("userId", visit.UserID)
-
-			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
-			if err != nil {
-				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
-				}
-			}
+	scanner := bufio.NewScanner(drain)
+	for scanner.Scan() {
+		line := scanner.Text()
+		accessLog := CaddyAccessLog{}
+		err := json.Unmarshal([]byte(line), &accessLog)
 		if err != nil {
 			logger.Error("json unmarshal", "err", err)
 			continue
 		}
+		if err != nil {
+			logger.Error("json unmarshal", "err", err)
+			continue
+		}
 
-		err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
-			logger.Info("inserting visit", "visit", visit, user)
-			err = dbpool.InsertVisit(&visit)
-			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+		visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
+		if err != nil {
+			logger.Error("cannot deserialize access log", "err", err)
+			continue
+		}
+		err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
 		if err != nil {
 			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
 				logger.Info("could not record analytics visit", "reason", err)
+		if err != nil {
+			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
+				logger.Info("could not record analytics visit", "reason", err)
 			}
 		}
 
 		logger.Info("inserting visit", "visit", visit)
-		err = dbpool.InsertVisit(&visit)
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
+		logger.Info("inserting visit", "visit", visit)
+		err = dbpool.InsertVisit(visit)
 		if err != nil {
 			logger.Error("could not insert visit record", "err", err)
+		if err != nil {
+			logger.Error("could not insert visit record", "err", err)
 		}
 	}
 }

@@ caddy.json
+{
+  "level": "info",
+  "ts": 1731644477.313701,
+  "logger": "http.log.access",
+  "msg": "handled request",
+  "request": {
+    "remote_ip": "127.0.0.1",
+    "remote_port": "40400",
+    "client_ip": "127.0.0.1",
+    "proto": "HTTP/2.0",
+    "method": "GET",
+    "host": "pgs.sh",
+    "uri": "/",
+    "headers": { "User-Agent": ["Blackbox Exporter/0.24.0"] },
+    "tls": {
+      "resumed": false,
+      "version": 772,
+      "cipher_suite": 4865,
+      "proto": "h2",
+      "server_name": "pgs.sh"
+    }
+  },
+  "bytes_read": 0,
+  "user_id": "",
+  "duration": 0.001207084,
+  "size": 3718,
+  "status": 200,
+  "resp_headers": {
+    "Referrer-Policy": ["no-referrer-when-downgrade"],
+    "Strict-Transport-Security": ["max-age=31536000;"],
+    "X-Content-Type-Options": ["nosniff"],
+    "X-Frame-Options": ["DENY"],
+    "Server": ["Caddy"],
+    "Alt-Svc": ["h3=\":443\"; ma=2592000"],
+    "Date": ["Fri, 15 Nov 2024 04:21:17 GMT"],
+    "Content-Type": ["text/html; charset=utf-8"],
+    "X-Xss-Protection": ["1; mode=block"],
+    "Permissions-Policy": ["interest-cohort=()"]
+  }
+}

@@ pgs/tunnel.go
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)

@@ pgs/web.go
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}

@@ shared/api.go
 	"github.com/picosh/utils"
 )
 
+type SubdomainProps struct {
+	ProjectName string
+	Username    string
+}
+
+func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
+	props := &SubdomainProps{}
+	strs := strings.SplitN(subdomain, "-", 2)
+	props.Username = strs[0]
+	if len(strs) == 2 {
+		props.ProjectName = strs[1]
+	} else {
+		props.ProjectName = props.Username
+	}
+	return props, nil
+}
+
 func CorsHeaders(headers http.Header) {
 	headers.Add("Access-Control-Allow-Origin", "*")
 	headers.Add("Vary", "Origin")
-: ------- > 2: a336041 wip
-: ------- > 3: 7ae45b3 chore: wrap
-: ------- > 4: bfa5c4f done
ps-77 by erock on 2024-11-23T03:34:44Z
Range Diff ↕
2: a336041 < -: ------- wip
3: 7ae45b3 < -: ------- chore: wrap
4: bfa5c4f < -: ------- done
1: 8d56535 ! 1: c7eeb12 reactor(metric-drain): use caddy access logs

@@ auth/api.go
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/utils"
+	"github.com/picosh/utils/pipe"
 	"github.com/picosh/utils/pipe/metrics"
 )
 
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent string `json:"User-Agent"`
+		Referer   string `json:"Referer"`
+		UserAgent []string `json:"User-Agent"`
+		Referer   []string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type RespHeaders struct {
+	ContentType []string `json:"Content-Type"`
+}
+
+type CaddyAccessLog struct {
+	Request AccessLogReq `json:"request"`
+	Status  int          `json:"status"`
+	Request     AccessLogReq `json:"request"`
+	Status      int          `json:"status"`
+	RespHeaders RespHeaders  `json:"resp_headers"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:    user.ID,
+		ProjectID: projectID,
+		PostID:    postID,
+		Namespace: space,
+		Host:      host,
+		Path:      path,
+		IpAddress: access.Request.ClientIP,
+		UserAgent: access.Request.Headers.UserAgent,
+		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
+		Status:    access.Status,
+		UserID:      user.ID,
+		ProjectID:   projectID,
+		PostID:      postID,
+		Namespace:   space,
+		Host:        host,
+		Path:        path,
+		IpAddress:   access.Request.ClientIP,
+		UserAgent:   strings.Join(access.Request.Headers.UserAgent, " "),
+		Referer:     strings.Join(access.Request.Headers.Referer, " "),
+		ContentType: strings.Join(access.RespHeaders.ContentType, " "),
+		Status:      access.Status,
+	}, nil
+}
+
+// this feels really stupid because i'm taking container-drain,
+// filtering it, and then sending it to metric-drain.  The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to use pipe as a debugging tool which means we
+// can manually sub to `metric-drain` and have a nice clean output to view.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+	info := shared.NewPicoPipeClient()
+	drain := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"container drain",
+		"sub container-drain -k",
+		100,
+		-1,
+	)
+
+	send := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"from container drain to metric drain",
+		"pub metric-drain -b=false",
+		100,
+		-1,
+	)
+
+	for {
+		scanner := bufio.NewScanner(drain)
+		for scanner.Scan() {
+			line := scanner.Text()
+			if strings.Contains(line, "http.log.access") {
+				clean := strings.TrimSpace(line)
+				visit, err := accessLogToVisit(dbpool, clean)
+				if err != nil {
+					logger.Debug("could not convert access log to a visit", "err", err)
+					continue
+				}
+				jso, err := json.Marshal(visit)
+				if err != nil {
+					logger.Error("could not marshal json of a visit", "err", err)
+					continue
+				}
+				_, _ = send.Write(jso)
+			}
+		}
+	}
+}
+
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+	accessLog := CaddyAccessLog{}
+	err := json.Unmarshal([]byte(line), &accessLog)
+	if err != nil {
+		return nil, err
+	}
+
+	return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 		-1,
 	)
 
-	for {
-		scanner := bufio.NewScanner(drain)
-		for scanner.Scan() {
-			line := scanner.Text()
-			visit := db.AnalyticsVisits{}
-			err := json.Unmarshal([]byte(line), &visit)
-			if err != nil {
 			visit := db.AnalyticsVisits{}
 			err := json.Unmarshal([]byte(line), &visit)
 			if err != nil {
-				logger.Error("json unmarshal", "err", err)
-				continue
-			}
+				logger.Info("could not unmarshal json", "err", err, "line", line)
 				continue
 			}
-
-			user := slog.Any("userId", visit.UserID)
-
-			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
-			if err != nil {
-				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
 			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
 			if err != nil {
 				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
-				}
-			}
+	scanner := bufio.NewScanner(drain)
+	for scanner.Scan() {
+		line := scanner.Text()
+		accessLog := CaddyAccessLog{}
+		err := json.Unmarshal([]byte(line), &accessLog)
+		if err != nil {
+			logger.Error("json unmarshal", "err", err)
+			continue
+		}
+					logger.Info("could not record analytics visit", "reason", err)
 				}
 			}
 
-			logger.Info("inserting visit", "visit", visit, user)
-			err = dbpool.InsertVisit(&visit)
-			if err != nil {
+			if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") {
+				continue
+			}
+
+			logger.Info("inserting visit", "visit", visit)
 			err = dbpool.InsertVisit(&visit)
 			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+		visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
+		if err != nil {
+			logger.Error("cannot deserialize access log", "err", err)
+			continue
+		}
+		err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
+		if err != nil {
+			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
+				logger.Info("could not record analytics visit", "reason", err)
+				logger.Error("could not insert visit record", "err", err)
 			}
 		}
 
-
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
+		logger.Info("inserting visit", "visit", visit)
+		err = dbpool.InsertVisit(visit)
+		if err != nil {
+			logger.Error("could not insert visit record", "err", err)
 		}
-		}
 	}
 }
 
 
 	// gather metrics in the auth service
 	go metricDrainSub(ctx, db, logger, cfg.Secret)
+	// convert container logs to access logs
+	go containerDrainSub(ctx, db, logger)
+
 	defer ctx.Done()
 
 	apiConfig := &shared.ApiConfig{

@@ caddy.json
+{
+  "level": "info",
+  "ts": 1731644477.313701,
+  "logger": "http.log.access",
+  "msg": "handled request",
+  "request": {
+    "remote_ip": "127.0.0.1",
+    "remote_port": "40400",
+    "client_ip": "127.0.0.1",
+    "proto": "HTTP/2.0",
+    "method": "GET",
+    "host": "pgs.sh",
+    "uri": "/",
+    "headers": { "User-Agent": ["Blackbox Exporter/0.24.0"] },
+    "tls": {
+      "resumed": false,
+      "version": 772,
+      "cipher_suite": 4865,
+      "proto": "h2",
+      "server_name": "pgs.sh"
+    }
+  },
+  "bytes_read": 0,
+  "user_id": "",
+  "duration": 0.001207084,
+  "size": 3718,
+  "status": 200,
+  "resp_headers": {
+    "Referrer-Policy": ["no-referrer-when-downgrade"],
+    "Strict-Transport-Security": ["max-age=31536000;"],
+    "X-Content-Type-Options": ["nosniff"],
+    "X-Frame-Options": ["DENY"],
+    "Server": ["Caddy"],
+    "Alt-Svc": ["h3=\":443\"; ma=2592000"],
+    "Date": ["Fri, 15 Nov 2024 04:21:17 GMT"],
+    "Content-Type": ["text/html; charset=utf-8"],
+    "X-Xss-Protection": ["1; mode=block"],
+    "Permissions-Policy": ["interest-cohort=()"]
+  }
+}

@@ pgs/tunnel.go
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)
 			logger,
 			apiConfig.Dbpool,
 			apiConfig.Storage,
-			apiConfig.AnalyticsQueue,
 		)
 		tunnelRouter := TunnelWebRouter{routes}
 		router := http.NewServeMux()

@@ pgs/web.go
 		return
 	}
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
-
-	routes := NewWebRouter(cfg, logger, dbpool, st, ch)
+	routes := NewWebRouter(cfg, logger, dbpool, st)
 
 	portStr := fmt.Sprintf(":%s", cfg.Port)
 	logger.Info(
 type HasPerm = func(proj *db.Project) bool
 
 type WebRouter struct {
-	Cfg            *shared.ConfigSite
-	Logger         *slog.Logger
-	Dbpool         db.DB
-	Storage        storage.StorageServe
-	AnalyticsQueue chan *db.AnalyticsVisits
-	RootRouter     *http.ServeMux
-	UserRouter     *http.ServeMux
+	Cfg        *shared.ConfigSite
+	Logger     *slog.Logger
+	Dbpool     db.DB
+	Storage    storage.StorageServe
+	RootRouter *http.ServeMux
+	UserRouter *http.ServeMux
 }
 
-func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) *WebRouter {
+func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter {
 	router := &WebRouter{
-		Cfg:            cfg,
-		Logger:         logger,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: analytics,
+		Cfg:     cfg,
+		Logger:  logger,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 	router.initRouters()
 	return router
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}

@@ shared/api.go
 	"github.com/picosh/utils"
 )
 
+type SubdomainProps struct {
+	ProjectName string
+	Username    string
+}
+
+func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
+	props := &SubdomainProps{}
+	strs := strings.SplitN(subdomain, "-", 2)
+	props.Username = strs[0]
+	if len(strs) == 2 {
+		props.ProjectName = strs[1]
+	} else {
+		props.ProjectName = props.Username
+	}
+	return props, nil
+}
+
 func CorsHeaders(headers http.Header) {
 	headers.Add("Access-Control-Allow-Origin", "*")
 	headers.Add("Vary", "Origin")
ps-78 by erock on 2024-11-27T20:17:20Z
Range Diff ↕
1: c7eeb12 ! 1: 4e0839a reactor(metric-drain): use caddy access logs

@@ Makefile
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20240819_add_projects_blocked.sql
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241028_add_analytics_indexes.sql
 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
 .PHONY: migrate
 
 latest:
-	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241125_add_content_type_to_analytics.sql
 .PHONY: latest
 
 psql:

@@ auth/api.go
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/utils"
+	"github.com/picosh/utils/pipe"
 	"github.com/picosh/utils/pipe/metrics"
 )
 
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent []string `json:"User-Agent"`
+		Referer   []string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type RespHeaders struct {
+	ContentType []string `json:"Content-Type"`
+}
+
+type CaddyAccessLog struct {
+	Request     AccessLogReq `json:"request"`
+	Status      int          `json:"status"`
+	RespHeaders RespHeaders  `json:"resp_headers"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:      user.ID,
+		ProjectID:   projectID,
+		PostID:      postID,
+		Namespace:   space,
+		Host:        host,
+		Path:        path,
+		IpAddress:   access.Request.ClientIP,
+		UserAgent:   strings.Join(access.Request.Headers.UserAgent, " "),
+		Referer:     strings.Join(access.Request.Headers.Referer, " "),
+		ContentType: strings.Join(access.RespHeaders.ContentType, " "),
+		Status:      access.Status,
+	}, nil
+}
+
+// this feels really stupid because i'm taking container-drain,
+// filtering it, and then sending it to metric-drain.  The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to use pipe as a debugging tool which means we
+// can manually sub to `metric-drain` and have a nice clean output to view.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+	info := shared.NewPicoPipeClient()
+	drain := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"container drain",
+		"sub container-drain -k",
+		100,
+		-1,
+	)
+
+	send := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"from container drain to metric drain",
+		"pub metric-drain -b=false",
+		100,
+		-1,
+	)
+
+	for {
+		scanner := bufio.NewScanner(drain)
+		for scanner.Scan() {
+			line := scanner.Text()
+			if strings.Contains(line, "http.log.access") {
+				clean := strings.TrimSpace(line)
+				visit, err := accessLogToVisit(dbpool, clean)
+				if err != nil {
+					logger.Debug("could not convert access log to a visit", "err", err)
+					continue
+				}
+				jso, err := json.Marshal(visit)
+				if err != nil {
+					logger.Error("could not marshal json of a visit", "err", err)
+					continue
+				}
+				_, _ = send.Write(jso)
+			}
+		}
+	}
+}
+
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+	accessLog := CaddyAccessLog{}
+	err := json.Unmarshal([]byte(line), &accessLog)
+	if err != nil {
+		return nil, err
+	}
+
+	return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 			visit := db.AnalyticsVisits{}
 			err := json.Unmarshal([]byte(line), &visit)
 			if err != nil {
-				logger.Error("json unmarshal", "err", err)
+				logger.Info("could not unmarshal json", "err", err, "line", line)
 				continue
 			}
-
-			user := slog.Any("userId", visit.UserID)
-
 			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
 			if err != nil {
 				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
+					logger.Info("could not record analytics visit", "reason", err)
 				}
 			}
 
-			logger.Info("inserting visit", "visit", visit, user)
+			if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") {
+				continue
+			}
+
+			logger.Info("inserting visit", "visit", visit)
 			err = dbpool.InsertVisit(&visit)
 			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+				logger.Error("could not insert visit record", "err", err)
 			}
 		}
-
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
-		}
 	}
 }
 
 
 	// gather metrics in the auth service
 	go metricDrainSub(ctx, db, logger, cfg.Secret)
+	// convert container logs to access logs
+	go containerDrainSub(ctx, db, logger)
+
 	defer ctx.Done()
 
 	apiConfig := &shared.ApiConfig{

@@ db/db.go
 }
 
 type AnalyticsVisits struct {
-	ID        string `json:"id"`
-	UserID    string `json:"user_id"`
-	ProjectID string `json:"project_id"`
-	PostID    string `json:"post_id"`
-	Namespace string `json:"namespace"`
-	Host      string `json:"host"`
-	Path      string `json:"path"`
-	IpAddress string `json:"ip_address"`
-	UserAgent string `json:"user_agent"`
-	Referer   string `json:"referer"`
-	Status    int    `json:"status"`
+	ID          string `json:"id"`
+	UserID      string `json:"user_id"`
+	ProjectID   string `json:"project_id"`
+	PostID      string `json:"post_id"`
+	Namespace   string `json:"namespace"`
+	Host        string `json:"host"`
+	Path        string `json:"path"`
+	IpAddress   string `json:"ip_address"`
+	UserAgent   string `json:"user_agent"`
+	Referer     string `json:"referer"`
+	Status      int    `json:"status"`
+	ContentType string `json:"content_type"`
 }
 
 type VisitInterval struct {

@@ db/postgres/storage.go
 
 func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
 	_, err := me.Db.Exec(
-		`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
+		`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
 		visit.UserID,
 		newNullString(visit.ProjectID),
 		newNullString(visit.PostID),
 		visit.UserAgent,
 		visit.Referer,
 		visit.Status,
+		visit.ContentType,
 	)
 	return err
 }

@@ imgs/api.go
 	dbpool := shared.GetDB(r)
 	logger := shared.GetLogger(r)
 	username := shared.GetUsernameFromRequest(r)
-	analytics := shared.GetAnalyticsQueue(r)
 
 	user, err := dbpool.FindUserForName(username)
 	if err != nil {
 		logger,
 		dbpool,
 		st,
-		analytics,
 	)
 	router.ServeAsset(fname, opts, true, anyPerm, w, r)
 }

@@ pastes/api.go
 	Unlisted     bool
 }
 
-type TransparencyPageData struct {
-	Site      shared.SitePageData
-	Analytics *db.Analytics
-}
-
 type Link struct {
 	URL  string
 	Text string

@@ pgs/ssh.go
 	"github.com/charmbracelet/promwish"
 	"github.com/charmbracelet/ssh"
 	"github.com/charmbracelet/wish"
-	"github.com/picosh/pico/db"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/pico/shared/storage"
 		st,
 	)
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
 	apiConfig := &shared.ApiConfig{
-		Cfg:            cfg,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: ch,
+		Cfg:     cfg,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 
 	webTunnel := &tunkit.WebTunnelHandler{

@@ pgs/tunnel.go
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)
 			logger,
 			apiConfig.Dbpool,
 			apiConfig.Storage,
-			apiConfig.AnalyticsQueue,
 		)
 		tunnelRouter := TunnelWebRouter{routes}
 		router := http.NewServeMux()

@@ pgs/web.go
 		return
 	}
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
-
-	routes := NewWebRouter(cfg, logger, dbpool, st, ch)
+	routes := NewWebRouter(cfg, logger, dbpool, st)
 
 	portStr := fmt.Sprintf(":%s", cfg.Port)
 	logger.Info(
 type HasPerm = func(proj *db.Project) bool
 
 type WebRouter struct {
-	Cfg            *shared.ConfigSite
-	Logger         *slog.Logger
-	Dbpool         db.DB
-	Storage        storage.StorageServe
-	AnalyticsQueue chan *db.AnalyticsVisits
-	RootRouter     *http.ServeMux
-	UserRouter     *http.ServeMux
+	Cfg        *shared.ConfigSite
+	Logger     *slog.Logger
+	Dbpool     db.DB
+	Storage    storage.StorageServe
+	RootRouter *http.ServeMux
+	UserRouter *http.ServeMux
 }
 
-func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) *WebRouter {
+func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter {
 	router := &WebRouter{
-		Cfg:            cfg,
-		Logger:         logger,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: analytics,
+		Cfg:     cfg,
+		Logger:  logger,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 	router.initRouters()
 	return router
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}

@@ pgs/web_asset_handler.go
 package pgs
 
 import (
-	"errors"
 	"fmt"
 	"io"
 	"log/slog"
 	"net/http/httputil"
 	_ "net/http/pprof"
 
-	"github.com/picosh/pico/shared"
 	"github.com/picosh/pico/shared/storage"
 	sst "github.com/picosh/pobj/storage"
 )
 			"routes", strings.Join(attempts, ", "),
 			"status", http.StatusNotFound,
 		)
-		// track 404s
-		ch := h.AnalyticsQueue
-		view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
-		if err == nil {
-			view.ProjectID = h.ProjectID
-			view.Status = http.StatusNotFound
-			select {
-			case ch <- view:
-			default:
-				logger.Error("could not send analytics view to channel", "view", view)
-			}
-		} else {
-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-				logger.Error("could not record analytics view", "err", err, "view", view)
-			}
-		}
 		http.Error(w, "404 not found", http.StatusNotFound)
 		return
 	}
 
 	finContentType := w.Header().Get("content-type")
 
-	// only track pages, not individual assets
-	if finContentType == "text/html" {
-		// track visit
-		ch := h.AnalyticsQueue
-		view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
-		if err == nil {
-			view.ProjectID = h.ProjectID
-			select {
-			case ch <- view:
-			default:
-				logger.Error("could not send analytics view to channel", "view", view)
-			}
-		} else {
-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-				logger.Error("could not record analytics view", "err", err, "view", view)
-			}
-		}
-	}
-
 	logger.Info(
 		"serving asset",
 		"asset", assetFilepath,

@@ pgs/web_test.go
 	"net/http/httptest"
 	"strings"
 	"testing"
-	"time"
 
 	"github.com/picosh/pico/db"
 	"github.com/picosh/pico/db/stub"
 			responseRecorder := httptest.NewRecorder()
 
 			st, _ := storage.NewStorageMemory(tc.storage)
-			ch := make(chan *db.AnalyticsVisits, 100)
-			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
+			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st)
 			router.ServeHTTP(responseRecorder, request)
 
 			if responseRecorder.Code != tc.status {
 	}
 }
 
-func TestAnalytics(t *testing.T) {
-	bucketName := shared.GetAssetBucketName(testUserID)
-	cfg := NewConfigSite()
-	cfg.Domain = "pgs.test"
-	expectedPath := "/app"
-	request := httptest.NewRequest("GET", mkpath(expectedPath), strings.NewReader(""))
-	responseRecorder := httptest.NewRecorder()
-
-	sto := map[string]map[string]string{
-		bucketName: {
-			"test/app.html": "hello world!",
-		},
-	}
-	st, _ := storage.NewStorageMemory(sto)
-	ch := make(chan *db.AnalyticsVisits, 100)
-	dbpool := NewPgsAnalticsDb(cfg.Logger)
-	router := NewWebRouter(cfg, cfg.Logger, dbpool, st, ch)
-
-	go func() {
-		for analytics := range ch {
-			if analytics.Path != expectedPath {
-				t.Errorf("Want path '%s', got '%s'", expectedPath, analytics.Path)
-			}
-			close(ch)
-		}
-	}()
-
-	router.ServeHTTP(responseRecorder, request)
-
-	select {
-	case <-ch:
-		return
-	case <-time.After(time.Second * 1):
-		t.Error("didnt receive analytics event within time limit")
-	}
-}
-
 type ImageStorageMemory struct {
 	*storage.StorageMemory
 	Opts  *storage.ImgProcessOpts
 					Ratio: &storage.Ratio{},
 				},
 			}
-			ch := make(chan *db.AnalyticsVisits, 100)
-			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
+			router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st)
 			router.ServeHTTP(responseRecorder, request)
 
 			if responseRecorder.Code != tc.status {

@@ prose/api.go
 
 import (
 	"bytes"
-	"errors"
 	"fmt"
 	"html/template"
 	"net/http"
 	Diff         template.HTML
 }
 
-type TransparencyPageData struct {
-	Site      shared.SitePageData
-	Analytics *db.Analytics
-}
-
 type HeaderTxt struct {
 	Title      string
 	Bio        string
 		postCollection = append(postCollection, p)
 	}
 
-	// track visit
-	ch := shared.GetAnalyticsQueue(r)
-	view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
-	if err == nil {
-		select {
-		case ch <- view:
-		default:
-			logger.Error("could not send analytics view to channel", "view", view)
-		}
-	} else {
-		if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-			logger.Error("could not record analytics view", "err", err, "view", view)
-		}
-	}
-
 	data := BlogPageData{
 		Site:       *cfg.GetSiteData(),
 		PageTitle:  headerTxt.Title,
 	username := shared.GetUsernameFromRequest(r)
 	subdomain := shared.GetSubdomain(r)
 	cfg := shared.GetCfg(r)
-	ch := shared.GetAnalyticsQueue(r)
 
 	var slug string
 	if !cfg.IsSubdomains() || subdomain == "" {
 			ogImageCard = parsedText.ImageCard
 		}
 
-		// track visit
-		view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
-		if err == nil {
-			view.PostID = post.ID
-			select {
-			case ch <- view:
-			default:
-				logger.Error("could not send analytics view to channel", "view", view)
-			}
-		} else {
-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-				logger.Error("could not record analytics view", "err", err, "view", view)
-			}
-		}
-
 		unlisted := false
 		if post.Hidden || post.PublishAt.After(time.Now()) {
 			unlisted = true
 	mainRoutes := createMainRoutes(staticRoutes)
 	subdomainRoutes := createSubdomainRoutes(staticRoutes)
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
 	apiConfig := &shared.ApiConfig{
-		Cfg:            cfg,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: ch,
+		Cfg:     cfg,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 	handler := shared.CreateServe(mainRoutes, subdomainRoutes, apiConfig)
 	router := http.HandlerFunc(handler)

@@ shared/api.go
 	"github.com/picosh/utils"
 )
 
+type SubdomainProps struct {
+	ProjectName string
+	Username    string
+}
+
+func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
+	props := &SubdomainProps{}
+	strs := strings.SplitN(subdomain, "-", 2)
+	props.Username = strs[0]
+	if len(strs) == 2 {
+		props.ProjectName = strs[1]
+	} else {
+		props.ProjectName = props.Username
+	}
+	return props, nil
+}
+
 func CorsHeaders(headers http.Header) {
 	headers.Add("Access-Control-Allow-Origin", "*")
 	headers.Add("Vary", "Origin")

@@ shared/router.go
 }
 
 type ApiConfig struct {
-	Cfg            *ConfigSite
-	Dbpool         db.DB
-	Storage        storage.StorageServe
-	AnalyticsQueue chan *db.AnalyticsVisits
+	Cfg     *ConfigSite
+	Dbpool  db.DB
+	Storage storage.StorageServe
 }
 
 func (hc *ApiConfig) HasPrivilegedAccess(apiToken string) bool {
 	ctx = context.WithValue(ctx, ctxDBKey{}, hc.Dbpool)
 	ctx = context.WithValue(ctx, ctxStorageKey{}, hc.Storage)
 	ctx = context.WithValue(ctx, ctxCfg{}, hc.Cfg)
-	ctx = context.WithValue(ctx, ctxAnalyticsQueue{}, hc.AnalyticsQueue)
 	return ctx
 }
 
 type ctxStorageKey struct{}
 type ctxLoggerKey struct{}
 type ctxCfg struct{}
-type ctxAnalyticsQueue struct{}
 
 type CtxSubdomainKey struct{}
 type ctxKey struct{}
 	return ""
 }
 
-func GetAnalyticsQueue(r *http.Request) chan *db.AnalyticsVisits {
-	return r.Context().Value(ctxAnalyticsQueue{}).(chan *db.AnalyticsVisits)
-}
-
 func GetApiToken(r *http.Request) string {
 	authHeader := r.Header.Get("authorization")
 	if authHeader == "" {

@@ sql/migrations/20241125_add_content_type_to_analytics.sql
+ALTER TABLE analytics_visits ADD COLUMN content_type varchar(256);
ps-79 by erock on 2024-11-27T20:18:31Z

Patchset ps-77

reactor(metric-drain): use caddy json format

Eric Bower
2024-11-15T15:02:24Z
auth/api.go
+106 -26
caddy.json
+40 -0
pgs/web.go
+2 -19
shared/api.go
+17 -0

wip

Eric Bower
2024-11-23T02:06:47Z
auth/api.go
+29 -0
test.txt
+0 -0

chore: wrap

Eric Bower
2024-11-23T02:41:15Z
auth/api.go
+71 -41

done

Eric Bower
2024-11-23T03:32:31Z
auth/api.go
+8 -7
Back to top

reactor(metric-drain): use caddy json format

auth/api.go link
+106 -26
  1diff --git a/auth/api.go b/auth/api.go
  2index 9c38bdc..8840af7 100644
  3--- a/auth/api.go
  4+++ b/auth/api.go
  5@@ -14,6 +14,7 @@ import (
  6 	"log/slog"
  7 	"net/http"
  8 	"net/url"
  9+	"strings"
 10 	"time"
 11 
 12 	"github.com/gorilla/feeds"
 13@@ -578,6 +579,89 @@ func checkoutHandler() http.HandlerFunc {
 14 	}
 15 }
 16 
 17+type AccessLogReq struct {
 18+	RemoteIP   string `json:"remote_ip"`
 19+	RemotePort string `json:"remote_port"`
 20+	ClientIP   string `json:"client_ip"`
 21+	Method     string `json:"method"`
 22+	Host       string `json:"host"`
 23+	Uri        string `json:"uri"`
 24+	Headers    struct {
 25+		UserAgent string `json:"User-Agent"`
 26+		Referer   string `json:"Referer"`
 27+	} `json:"headers"`
 28+	Tls struct {
 29+		ServerName string `json:"server_name"`
 30+	} `json:"tls"`
 31+}
 32+
 33+type CaddyAccessLog struct {
 34+	Request AccessLogReq `json:"request"`
 35+	Status  int          `json:"status"`
 36+}
 37+
 38+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
 39+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
 40+	space := spaceRaw[0]
 41+	host := access.Request.Host
 42+	path := access.Request.Uri
 43+	subdomain := ""
 44+
 45+	// grab subdomain based on host
 46+	if strings.HasSuffix(host, "tuns.sh") {
 47+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
 48+	} else if strings.HasSuffix(host, "pgs.sh") {
 49+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
 50+	} else if strings.HasSuffix(host, "prose.sh") {
 51+		subdomain = strings.TrimSuffix(host, ".prose.sh")
 52+	} else {
 53+		subdomain = shared.GetCustomDomain(host, space)
 54+	}
 55+
 56+	// get user and namespace details from subdomain
 57+	props, err := shared.GetProjectFromSubdomain(subdomain)
 58+	if err != nil {
 59+		return nil, err
 60+	}
 61+	// get user ID
 62+	user, err := dbpool.FindUserForName(props.Username)
 63+	if err != nil {
 64+		return nil, err
 65+	}
 66+
 67+	projectID := ""
 68+	postID := ""
 69+	if space == "pgs" { // figure out project ID
 70+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
 71+		if err != nil {
 72+			return nil, err
 73+		}
 74+		projectID = project.ID
 75+	} else if space == "prose" { // figure out post ID
 76+		if path == "" || path == "/" {
 77+		} else {
 78+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
 79+			if err != nil {
 80+				return nil, err
 81+			}
 82+			postID = post.ID
 83+		}
 84+	}
 85+
 86+	return &db.AnalyticsVisits{
 87+		UserID:    user.ID,
 88+		ProjectID: projectID,
 89+		PostID:    postID,
 90+		Namespace: space,
 91+		Host:      host,
 92+		Path:      path,
 93+		IpAddress: access.Request.ClientIP,
 94+		UserAgent: access.Request.Headers.UserAgent,
 95+		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
 96+		Status:    access.Status,
 97+	}, nil
 98+}
 99+
100 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
101 	drain := metrics.ReconnectReadMetrics(
102 		ctx,
103@@ -587,36 +671,32 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
104 		-1,
105 	)
106 
107-	for {
108-		scanner := bufio.NewScanner(drain)
109-		for scanner.Scan() {
110-			line := scanner.Text()
111-			visit := db.AnalyticsVisits{}
112-			err := json.Unmarshal([]byte(line), &visit)
113-			if err != nil {
114-				logger.Error("json unmarshal", "err", err)
115-				continue
116-			}
117-
118-			user := slog.Any("userId", visit.UserID)
119-
120-			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
121-			if err != nil {
122-				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
123-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
124-					continue
125-				}
126-			}
127+	scanner := bufio.NewScanner(drain)
128+	for scanner.Scan() {
129+		line := scanner.Text()
130+		accessLog := CaddyAccessLog{}
131+		err := json.Unmarshal([]byte(line), &accessLog)
132+		if err != nil {
133+			logger.Error("json unmarshal", "err", err)
134+			continue
135+		}
136 
137-			logger.Info("inserting visit", "visit", visit, user)
138-			err = dbpool.InsertVisit(&visit)
139-			if err != nil {
140-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
141+		visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
142+		if err != nil {
143+			logger.Error("cannot deserialize access log", "err", err)
144+			continue
145+		}
146+		err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
147+		if err != nil {
148+			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
149+				logger.Info("could not record analytics visit", "reason", err)
150 			}
151 		}
152 
153-		if scanner.Err() != nil {
154-			logger.Error("scanner error", "err", scanner.Err())
155+		logger.Info("inserting visit", "visit", visit)
156+		err = dbpool.InsertVisit(visit)
157+		if err != nil {
158+			logger.Error("could not insert visit record", "err", err)
159 		}
160 	}
161 }
caddy.json link
+40 -0
 1diff --git a/caddy.json b/caddy.json
 2new file mode 100644
 3index 0000000..49e80ec
 4--- /dev/null
 5+++ b/caddy.json
 6@@ -0,0 +1,40 @@
 7+{
 8+  "level": "info",
 9+  "ts": 1731644477.313701,
10+  "logger": "http.log.access",
11+  "msg": "handled request",
12+  "request": {
13+    "remote_ip": "127.0.0.1",
14+    "remote_port": "40400",
15+    "client_ip": "127.0.0.1",
16+    "proto": "HTTP/2.0",
17+    "method": "GET",
18+    "host": "pgs.sh",
19+    "uri": "/",
20+    "headers": { "User-Agent": ["Blackbox Exporter/0.24.0"] },
21+    "tls": {
22+      "resumed": false,
23+      "version": 772,
24+      "cipher_suite": 4865,
25+      "proto": "h2",
26+      "server_name": "pgs.sh"
27+    }
28+  },
29+  "bytes_read": 0,
30+  "user_id": "",
31+  "duration": 0.001207084,
32+  "size": 3718,
33+  "status": 200,
34+  "resp_headers": {
35+    "Referrer-Policy": ["no-referrer-when-downgrade"],
36+    "Strict-Transport-Security": ["max-age=31536000;"],
37+    "X-Content-Type-Options": ["nosniff"],
38+    "X-Frame-Options": ["DENY"],
39+    "Server": ["Caddy"],
40+    "Alt-Svc": ["h3=\":443\"; ma=2592000"],
41+    "Date": ["Fri, 15 Nov 2024 04:21:17 GMT"],
42+    "Content-Type": ["text/html; charset=utf-8"],
43+    "X-Xss-Protection": ["1; mode=block"],
44+    "Permissions-Policy": ["interest-cohort=()"]
45+  }
46+}
pgs/tunnel.go link
+1 -1
 1diff --git a/pgs/tunnel.go b/pgs/tunnel.go
 2index b635c8e..accacc5 100644
 3--- a/pgs/tunnel.go
 4+++ b/pgs/tunnel.go
 5@@ -51,7 +51,7 @@ func createHttpHandler(apiConfig *shared.ApiConfig) CtxHttpBridge {
 6 			"pubkey", pubkeyStr,
 7 		)
 8 
 9-		props, err := getProjectFromSubdomain(subdomain)
10+		props, err := shared.GetProjectFromSubdomain(subdomain)
11 		if err != nil {
12 			log.Error(err.Error())
13 			return http.HandlerFunc(shared.UnauthorizedHandler)
pgs/web.go link
+2 -19
 1diff --git a/pgs/web.go b/pgs/web.go
 2index 685f6ef..0903c1a 100644
 3--- a/pgs/web.go
 4+++ b/pgs/web.go
 5@@ -177,7 +177,7 @@ func (web *WebRouter) checkHandler(w http.ResponseWriter, r *http.Request) {
 6 
 7 		if !strings.Contains(hostDomain, appDomain) {
 8 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
 9-			props, err := getProjectFromSubdomain(subdomain)
10+			props, err := shared.GetProjectFromSubdomain(subdomain)
11 			if err != nil {
12 				logger.Error(
13 					"could not get project from subdomain",
14@@ -333,7 +333,7 @@ func (web *WebRouter) ServeAsset(fname string, opts *storage.ImgProcessOpts, fro
15 		"host", r.Host,
16 	)
17 
18-	props, err := getProjectFromSubdomain(subdomain)
19+	props, err := shared.GetProjectFromSubdomain(subdomain)
20 	if err != nil {
21 		logger.Info(
22 			"could not determine project from subdomain",
23@@ -450,20 +450,3 @@ func (web *WebRouter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
24 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
25 	router.ServeHTTP(w, r.WithContext(ctx))
26 }
27-
28-type SubdomainProps struct {
29-	ProjectName string
30-	Username    string
31-}
32-
33-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
34-	props := &SubdomainProps{}
35-	strs := strings.SplitN(subdomain, "-", 2)
36-	props.Username = strs[0]
37-	if len(strs) == 2 {
38-		props.ProjectName = strs[1]
39-	} else {
40-		props.ProjectName = props.Username
41-	}
42-	return props, nil
43-}
shared/api.go link
+17 -0
 1diff --git a/shared/api.go b/shared/api.go
 2index 96a4e68..a33ad59 100644
 3--- a/shared/api.go
 4+++ b/shared/api.go
 5@@ -13,6 +13,23 @@ import (
 6 	"github.com/picosh/utils"
 7 )
 8 
 9+type SubdomainProps struct {
10+	ProjectName string
11+	Username    string
12+}
13+
14+func GetProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
15+	props := &SubdomainProps{}
16+	strs := strings.SplitN(subdomain, "-", 2)
17+	props.Username = strs[0]
18+	if len(strs) == 2 {
19+		props.ProjectName = strs[1]
20+	} else {
21+		props.ProjectName = props.Username
22+	}
23+	return props, nil
24+}
25+
26 func CorsHeaders(headers http.Header) {
27 	headers.Add("Access-Control-Allow-Origin", "*")
28 	headers.Add("Vary", "Origin")

wip

auth/api.go link
+29 -0
 1diff --git a/auth/api.go b/auth/api.go
 2index 8840af7..6d997ab 100644
 3--- a/auth/api.go
 4+++ b/auth/api.go
 5@@ -22,6 +22,7 @@ import (
 6 	"github.com/picosh/pico/db/postgres"
 7 	"github.com/picosh/pico/shared"
 8 	"github.com/picosh/utils"
 9+	"github.com/picosh/utils/pipe"
10 	"github.com/picosh/utils/pipe/metrics"
11 )
12 
13@@ -662,6 +663,30 @@ func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.Analyt
14 	}, nil
15 }
16 
17+func containerDrainSub(ctx context.Context, logger *slog.Logger) {
18+	drain := pipe.NewReconnectReadWriteCloser(
19+		ctx,
20+		logger,
21+		shared.NewPicoPipeClient(),
22+		"container logs",
23+		"sub container-drain -k",
24+		100,
25+		-1,
26+	)
27+
28+	fmt.Println("WTFFFFFF")
29+	scanner := bufio.NewScanner(drain)
30+	for scanner.Scan() {
31+		line := scanner.Text()
32+		fmt.Println("HMMMM", line)
33+		if strings.Contains(line, "http.log.access") {
34+			clean := strings.TrimSpace(line)
35+			fmt.Println("LINE", clean)
36+			// TODO: send to metric drain
37+		}
38+	}
39+}
40+
41 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
42 	drain := metrics.ReconnectReadMetrics(
43 		ctx,
44@@ -699,6 +724,7 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
45 			logger.Error("could not insert visit record", "err", err)
46 		}
47 	}
48+	fmt.Println("DROPINNGGGGGGG")
49 }
50 
51 func authMux(apiConfig *shared.ApiConfig) *http.ServeMux {
52@@ -769,6 +795,9 @@ func StartApiServer() {
53 
54 	// gather metrics in the auth service
55 	go metricDrainSub(ctx, db, logger, cfg.Secret)
56+	// convert container logs to access logs
57+	// go containerDrainSub(ctx, logger)
58+
59 	defer ctx.Done()
60 
61 	apiConfig := &shared.ApiConfig{
test.txt link
+0 -0
 1diff --git a/test.txt b/test.txt
 2new file mode 100644
 3index 0000000000000000000000000000000000000000..9670dcf3682d2391c3391099ac7c64000ffa22ef
 4GIT binary patch
 5literal 1060
 6zc$|Dw!EWO=5G{(H`VBo0bLuW6nv`r=k<ddbq}!m$BG{;#0!a`s<X9#^lPXEuNP_%(
 7zK{>&u?Y24*Z{8ce;XHl&P5Qq3;Ry`x&_Dy)t{h|#12lYD7A32T`GT^l=108Dz_?*R
 8z4-Kqi9I0^w6;fgdkijMl2^UrwRK(+-TMQ90cs(^w;Bn(3-suq<;8j8SqNbdcw5liG
 9zR2-d;&_GemiwZK3Mx%}YAsM}k4jTBi?=NPY^5g>J+9_z@!$}VrtX;Yp_WL~*JM?0}
10zFd7{Lm2LwWY`umCX3}qF6zwhvP$={vx&0&m#reJP3ROBma}CRy@<mDcoYA5v;5LI}
11zDRGSXiIq-iJ0#NZsK-?5R(a>FMH#gn^3(C_4Z2nSNj#)ljn%#6RdFO#fvY2)Umag1
12z##<j3hl#JJ_y2N#Lf+5bzw|pDrQ6!*r>-A<enCI$pyOeTryB@zjT^jy=4NVj1J4uf
13z#jmr8_8a1u9~-^Hsitgx=G-)fG;XU_MtE$Aac&Gh@kIGOpwEv7w3DH8_o$5#vpHqt
14zN9i`Bi2B}OOU^G(Pe@H!ORlt4ZXd|Uo>Y{#r1x5(jY-my|JPUl&@Yny@-u%&T5@^T
15zbwx|)$CI9PXP-zx8C|etv3!G-bU{nDEROz$u~+OIl@_E!8_|=C?r>cq&ME6!a%xxZ
16zLc}#KThfbrgE5f`%HF~4^K%CG4-vziN1>o$8EgakJDp+)8K|unhbLx~F~<0^o@WsB
17zBTn29Mcf8xN>-<zB8+!$GI{2tcwJB0TWDspuZH_erVne4Xk*?#j0ny3Z3dU<cQ|FC
18V6G}b&d-u|~fN7TR{|W#A|NpuqJn;Yk
19
20literal 0
21Kc$@(M0RR6000031
22

chore: wrap

auth/api.go link
+71 -41
  1diff --git a/auth/api.go b/auth/api.go
  2index 6d997ab..c25e2e7 100644
  3--- a/auth/api.go
  4+++ b/auth/api.go
  5@@ -588,8 +588,8 @@ type AccessLogReq struct {
  6 	Host       string `json:"host"`
  7 	Uri        string `json:"uri"`
  8 	Headers    struct {
  9-		UserAgent string `json:"User-Agent"`
 10-		Referer   string `json:"Referer"`
 11+		UserAgent []string `json:"User-Agent"`
 12+		Referer   string   `json:"Referer"`
 13 	} `json:"headers"`
 14 	Tls struct {
 15 		ServerName string `json:"server_name"`
 16@@ -657,36 +657,72 @@ func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.Analyt
 17 		Host:      host,
 18 		Path:      path,
 19 		IpAddress: access.Request.ClientIP,
 20-		UserAgent: access.Request.Headers.UserAgent,
 21+		UserAgent: strings.Join(access.Request.Headers.UserAgent, " "),
 22 		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
 23 		Status:    access.Status,
 24 	}, nil
 25 }
 26 
 27-func containerDrainSub(ctx context.Context, logger *slog.Logger) {
 28+// this feels really stupid because i'm taking container-drain,
 29+// filtering it, and then sending it to metric-drain.  The
 30+// metricDrainSub function listens on the metric-drain and saves it.
 31+// So why not just call the necessary functions to save the visit?
 32+// We want to be able to have pipe as a debugging tool which means we
 33+// can sub to `metric-drain` and have a nice clean output to look at.
 34+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
 35+	info := shared.NewPicoPipeClient()
 36 	drain := pipe.NewReconnectReadWriteCloser(
 37 		ctx,
 38 		logger,
 39-		shared.NewPicoPipeClient(),
 40-		"container logs",
 41+		info,
 42+		"container drain",
 43 		"sub container-drain -k",
 44 		100,
 45 		-1,
 46 	)
 47 
 48-	fmt.Println("WTFFFFFF")
 49-	scanner := bufio.NewScanner(drain)
 50-	for scanner.Scan() {
 51-		line := scanner.Text()
 52-		fmt.Println("HMMMM", line)
 53-		if strings.Contains(line, "http.log.access") {
 54-			clean := strings.TrimSpace(line)
 55-			fmt.Println("LINE", clean)
 56-			// TODO: send to metric drain
 57+	send := pipe.NewReconnectReadWriteCloser(
 58+		ctx,
 59+		logger,
 60+		info,
 61+		"from container drain to metric drain",
 62+		"pub metric-drain -b=false",
 63+		100,
 64+		-1,
 65+	)
 66+
 67+	for {
 68+		scanner := bufio.NewScanner(drain)
 69+		for scanner.Scan() {
 70+			line := scanner.Text()
 71+			if strings.Contains(line, "http.log.access") {
 72+				clean := strings.TrimSpace(line)
 73+				visit, err := accessLogToVisit(dbpool, clean)
 74+				if err != nil {
 75+					logger.Error("could not convert access log to a visit", "err", err)
 76+					continue
 77+				}
 78+				jso, err := json.Marshal(visit)
 79+				if err != nil {
 80+					logger.Error("could not marshal json of a visit", "err", err)
 81+					continue
 82+				}
 83+				_, _ = send.Write(jso)
 84+			}
 85 		}
 86 	}
 87 }
 88 
 89+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
 90+	accessLog := CaddyAccessLog{}
 91+	err := json.Unmarshal([]byte(line), &accessLog)
 92+	if err != nil {
 93+		return nil, err
 94+	}
 95+
 96+	return deserializeCaddyAccessLog(dbpool, &accessLog)
 97+}
 98+
 99 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
100 	drain := metrics.ReconnectReadMetrics(
101 		ctx,
102@@ -696,35 +732,29 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
103 		-1,
104 	)
105 
106-	scanner := bufio.NewScanner(drain)
107-	for scanner.Scan() {
108-		line := scanner.Text()
109-		accessLog := CaddyAccessLog{}
110-		err := json.Unmarshal([]byte(line), &accessLog)
111-		if err != nil {
112-			logger.Error("json unmarshal", "err", err)
113-			continue
114-		}
115-
116-		visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
117-		if err != nil {
118-			logger.Error("cannot deserialize access log", "err", err)
119-			continue
120-		}
121-		err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
122-		if err != nil {
123-			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
124-				logger.Info("could not record analytics visit", "reason", err)
125+	for {
126+		scanner := bufio.NewScanner(drain)
127+		for scanner.Scan() {
128+			line := scanner.Text()
129+			visit, err := accessLogToVisit(dbpool, line)
130+			if err != nil {
131+				logger.Error("could not convert access log to a visit", "err", err)
132+				continue
133+			}
134+			err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
135+			if err != nil {
136+				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
137+					logger.Info("could not record analytics visit", "reason", err)
138+				}
139 			}
140-		}
141 
142-		logger.Info("inserting visit", "visit", visit)
143-		err = dbpool.InsertVisit(visit)
144-		if err != nil {
145-			logger.Error("could not insert visit record", "err", err)
146+			logger.Info("inserting visit", "visit", visit)
147+			err = dbpool.InsertVisit(visit)
148+			if err != nil {
149+				logger.Error("could not insert visit record", "err", err)
150+			}
151 		}
152 	}
153-	fmt.Println("DROPINNGGGGGGG")
154 }
155 
156 func authMux(apiConfig *shared.ApiConfig) *http.ServeMux {
157@@ -796,7 +826,7 @@ func StartApiServer() {
158 	// gather metrics in the auth service
159 	go metricDrainSub(ctx, db, logger, cfg.Secret)
160 	// convert container logs to access logs
161-	// go containerDrainSub(ctx, logger)
162+	go containerDrainSub(ctx, db, logger)
163 
164 	defer ctx.Done()
165 

done

auth/api.go link
+8 -7
 1diff --git a/auth/api.go b/auth/api.go
 2index c25e2e7..09d7661 100644
 3--- a/auth/api.go
 4+++ b/auth/api.go
 5@@ -589,7 +589,7 @@ type AccessLogReq struct {
 6 	Uri        string `json:"uri"`
 7 	Headers    struct {
 8 		UserAgent []string `json:"User-Agent"`
 9-		Referer   string   `json:"Referer"`
10+		Referer   []string `json:"Referer"`
11 	} `json:"headers"`
12 	Tls struct {
13 		ServerName string `json:"server_name"`
14@@ -658,7 +658,7 @@ func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.Analyt
15 		Path:      path,
16 		IpAddress: access.Request.ClientIP,
17 		UserAgent: strings.Join(access.Request.Headers.UserAgent, " "),
18-		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
19+		Referer:   strings.Join(access.Request.Headers.Referer, " "),
20 		Status:    access.Status,
21 	}, nil
22 }
23@@ -699,7 +699,7 @@ func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
24 				clean := strings.TrimSpace(line)
25 				visit, err := accessLogToVisit(dbpool, clean)
26 				if err != nil {
27-					logger.Error("could not convert access log to a visit", "err", err)
28+					logger.Debug("could not convert access log to a visit", "err", err)
29 					continue
30 				}
31 				jso, err := json.Marshal(visit)
32@@ -736,12 +736,13 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
33 		scanner := bufio.NewScanner(drain)
34 		for scanner.Scan() {
35 			line := scanner.Text()
36-			visit, err := accessLogToVisit(dbpool, line)
37+			visit := db.AnalyticsVisits{}
38+			err := json.Unmarshal([]byte(line), &visit)
39 			if err != nil {
40-				logger.Error("could not convert access log to a visit", "err", err)
41+				logger.Info("could not unmarshal json", "err", err, "line", line)
42 				continue
43 			}
44-			err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
45+			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
46 			if err != nil {
47 				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
48 					logger.Info("could not record analytics visit", "reason", err)
49@@ -749,7 +750,7 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
50 			}
51 
52 			logger.Info("inserting visit", "visit", visit)
53-			err = dbpool.InsertVisit(visit)
54+			err = dbpool.InsertVisit(&visit)
55 			if err != nil {
56 				logger.Error("could not insert visit record", "err", err)
57 			}