dashboard / pico / reactor(metric-drain): use caddy json format #35 rss

accepted · opened on 2024-11-15T15:03:31Z by erock
Help
# add changes to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add 35
# add review to patch request
git format-patch main --stdout | ssh pr.pico.sh pr add --review 35
# remove patchset
ssh pr.pico.sh ps rm ps-x
# checkout all patches
ssh pr.pico.sh pr print 35 | git am -3
# print a diff between the last two patches in a patch request
ssh pr.pico.sh pr diff 35
# accept PR
ssh pr.pico.sh pr accept 35
# close PR
ssh pr.pico.sh pr close 35

Logs

erock created pr with ps-74 on 2024-11-15T15:03:31Z
erock added ps-77 on 2024-11-23T03:34:44Z
erock added ps-78 on 2024-11-27T20:17:20Z
erock added ps-79 on 2024-11-27T20:18:31Z
erock changed status on 2024-11-28T03:03:53Z {"status":"accepted"}

Patchsets

ps-74 by erock on 2024-11-15T15:03:31Z
Range Diff ↕ rd-77
1: 77aaa29 ! 1: 8d56535 reactor(metric-drain): use caddy json format
-: ------- > 2: a336041 wip
-: ------- > 3: 7ae45b3 chore: wrap
-: ------- > 4: bfa5c4f done
ps-77 by erock on 2024-11-23T03:34:44Z
Range Diff ↕ rd-78
1: 8d56535 ! 1: c7eeb12 reactor(metric-drain): use caddy access logs
2: a336041 < -: ------- wip
3: 7ae45b3 < -: ------- chore: wrap
4: bfa5c4f < -: ------- done
ps-78 by erock on 2024-11-27T20:17:20Z
Range Diff ↕ rd-79
1: c7eeb12 ! 1: 4e0839a reactor(metric-drain): use caddy access logs
ps-79 by erock on 2024-11-27T20:18:31Z

Range-diff rd-78

title
reactor(metric-drain): use caddy access logs
description
Patch changed
old #1
8d56535
new #1
c7eeb12
title
wip
description
Patch removed
old #2
a336041
new #0
(none)
title
chore: wrap
description
Patch removed
old #3
7ae45b3
new #0
(none)
title
done
description
Patch removed
old #4
bfa5c4f
new #0
(none)
Back to top
1: 8d56535 ! 1: c7eeb12 reactor(metric-drain): use caddy access logs
auth/api.go auth/api.go
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent string `json:"User-Agent"`
+		Referer   string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type CaddyAccessLog struct {
+	Request AccessLogReq `json:"request"`
+	Status  int          `json:"status"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:    user.ID,
+		ProjectID: projectID,
+		PostID:    postID,
+		Namespace: space,
+		Host:      host,
+		Path:      path,
+		IpAddress: access.Request.ClientIP,
+		UserAgent: access.Request.Headers.UserAgent,
+		Referer:   access.Request.Headers.Referer, // TODO: I don't see referer in the access log
+		Status:    access.Status,
+	}, nil
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 		-1,
 	)
 
-	for {
-		scanner := bufio.NewScanner(drain)
-		for scanner.Scan() {
-			line := scanner.Text()
-			visit := db.AnalyticsVisits{}
-			err := json.Unmarshal([]byte(line), &visit)
-			if err != nil {
-				logger.Error("json unmarshal", "err", err)
-				continue
-			}
-
-			user := slog.Any("userId", visit.UserID)
-
-			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
-			if err != nil {
-				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
-				}
-			}
+	scanner := bufio.NewScanner(drain)
+	for scanner.Scan() {
+		line := scanner.Text()
+		accessLog := CaddyAccessLog{}
+		err := json.Unmarshal([]byte(line), &accessLog)
+		if err != nil {
+			logger.Error("json unmarshal", "err", err)
+			continue
+		}
 
-			logger.Info("inserting visit", "visit", visit, user)
-			err = dbpool.InsertVisit(&visit)
-			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+		visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
+		if err != nil {
+			logger.Error("cannot deserialize access log", "err", err)
+			continue
+		}
+		err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
+		if err != nil {
+			if !errors.Is(err, shared.ErrAnalyticsDisabled) {
+				logger.Info("could not record analytics visit", "reason", err)
 			}
 		}
 
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
+		logger.Info("inserting visit", "visit", visit)
+		err = dbpool.InsertVisit(visit)
+		if err != nil {
+			logger.Error("could not insert visit record", "err", err)
 		}
 	}
 }
auth/api.go auth/api.go
 	"log/slog"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 
 	"github.com/gorilla/feeds"
 	"github.com/picosh/pico/db/postgres"
 	"github.com/picosh/pico/shared"
 	"github.com/picosh/utils"
+	"github.com/picosh/utils/pipe"
 	"github.com/picosh/utils/pipe/metrics"
 )
 
 	}
 }
 
+type AccessLogReq struct {
+	RemoteIP   string `json:"remote_ip"`
+	RemotePort string `json:"remote_port"`
+	ClientIP   string `json:"client_ip"`
+	Method     string `json:"method"`
+	Host       string `json:"host"`
+	Uri        string `json:"uri"`
+	Headers    struct {
+		UserAgent []string `json:"User-Agent"`
+		Referer   []string `json:"Referer"`
+	} `json:"headers"`
+	Tls struct {
+		ServerName string `json:"server_name"`
+	} `json:"tls"`
+}
+
+type RespHeaders struct {
+	ContentType []string `json:"Content-Type"`
+}
+
+type CaddyAccessLog struct {
+	Request     AccessLogReq `json:"request"`
+	Status      int          `json:"status"`
+	RespHeaders RespHeaders  `json:"resp_headers"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+	spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+	space := spaceRaw[0]
+	host := access.Request.Host
+	path := access.Request.Uri
+	subdomain := ""
+
+	// grab subdomain based on host
+	if strings.HasSuffix(host, "tuns.sh") {
+		subdomain = strings.TrimSuffix(host, ".tuns.sh")
+	} else if strings.HasSuffix(host, "pgs.sh") {
+		subdomain = strings.TrimSuffix(host, ".pgs.sh")
+	} else if strings.HasSuffix(host, "prose.sh") {
+		subdomain = strings.TrimSuffix(host, ".prose.sh")
+	} else {
+		subdomain = shared.GetCustomDomain(host, space)
+	}
+
+	// get user and namespace details from subdomain
+	props, err := shared.GetProjectFromSubdomain(subdomain)
+	if err != nil {
+		return nil, err
+	}
+	// get user ID
+	user, err := dbpool.FindUserForName(props.Username)
+	if err != nil {
+		return nil, err
+	}
+
+	projectID := ""
+	postID := ""
+	if space == "pgs" { // figure out project ID
+		project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+		if err != nil {
+			return nil, err
+		}
+		projectID = project.ID
+	} else if space == "prose" { // figure out post ID
+		if path == "" || path == "/" {
+		} else {
+			post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+			if err != nil {
+				return nil, err
+			}
+			postID = post.ID
+		}
+	}
+
+	return &db.AnalyticsVisits{
+		UserID:      user.ID,
+		ProjectID:   projectID,
+		PostID:      postID,
+		Namespace:   space,
+		Host:        host,
+		Path:        path,
+		IpAddress:   access.Request.ClientIP,
+		UserAgent:   strings.Join(access.Request.Headers.UserAgent, " "),
+		Referer:     strings.Join(access.Request.Headers.Referer, " "),
+		ContentType: strings.Join(access.RespHeaders.ContentType, " "),
+		Status:      access.Status,
+	}, nil
+}
+
+// this feels really stupid because i'm taking containter-drain,
+// filtering it, and then sending it to metric-drain.  The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to use pipe as a debugging tool which means we
+// can manually sub to `metric-drain` and have a nice clean output to view.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+	info := shared.NewPicoPipeClient()
+	drain := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"container drain",
+		"sub container-drain -k",
+		100,
+		-1,
+	)
+
+	send := pipe.NewReconnectReadWriteCloser(
+		ctx,
+		logger,
+		info,
+		"from container drain to metric drain",
+		"pub metric-drain -b=false",
+		100,
+		-1,
+	)
+
+	for {
+		scanner := bufio.NewScanner(drain)
+		for scanner.Scan() {
+			line := scanner.Text()
+			if strings.Contains(line, "http.log.access") {
+				clean := strings.TrimSpace(line)
+				visit, err := accessLogToVisit(dbpool, clean)
+				if err != nil {
+					logger.Debug("could not convert access log to a visit", "err", err)
+					continue
+				}
+				jso, err := json.Marshal(visit)
+				if err != nil {
+					logger.Error("could not marshal json of a visit", "err", err)
+					continue
+				}
+				_, _ = send.Write(jso)
+			}
+		}
+	}
+}
+
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+	accessLog := CaddyAccessLog{}
+	err := json.Unmarshal([]byte(line), &accessLog)
+	if err != nil {
+		return nil, err
+	}
+
+	return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
 	drain := metrics.ReconnectReadMetrics(
 		ctx,
 			visit := db.AnalyticsVisits{}
 			err := json.Unmarshal([]byte(line), &visit)
 			if err != nil {
-				logger.Error("json unmarshal", "err", err)
+				logger.Info("could not unmarshal json", "err", err, "line", line)
 				continue
 			}
-
-			user := slog.Any("userId", visit.UserID)
-
 			err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
 			if err != nil {
 				if !errors.Is(err, shared.ErrAnalyticsDisabled) {
-					logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
-					continue
+					logger.Info("could not record analytics visit", "reason", err)
 				}
 			}
 
-			logger.Info("inserting visit", "visit", visit, user)
+			if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") {
+				continue
+			}
+
+			logger.Info("inserting visit", "visit", visit)
 			err = dbpool.InsertVisit(&visit)
 			if err != nil {
-				logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+				logger.Error("could not insert visit record", "err", err)
 			}
 		}
-
-		if scanner.Err() != nil {
-			logger.Error("scanner error", "err", scanner.Err())
-		}
 	}
 }
 
 
 	// gather metrics in the auth service
 	go metricDrainSub(ctx, db, logger, cfg.Secret)
+	// convert container logs to access logs
+	go containerDrainSub(ctx, db, logger)
+
 	defer ctx.Done()
 
 	apiConfig := &shared.ApiConfig{
pgs/tunnel.go pgs/tunnel.go
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)
pgs/tunnel.go pgs/tunnel.go
 			"pubkey", pubkeyStr,
 		)
 
-		props, err := getProjectFromSubdomain(subdomain)
+		props, err := shared.GetProjectFromSubdomain(subdomain)
 		if err != nil {
 			log.Error(err.Error())
 			return http.HandlerFunc(shared.UnauthorizedHandler)
 			logger,
 			apiConfig.Dbpool,
 			apiConfig.Storage,
-			apiConfig.AnalyticsQueue,
 		)
 		tunnelRouter := TunnelWebRouter{routes}
 		router := http.NewServeMux()
pgs/web.go pgs/web.go
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}
pgs/web.go pgs/web.go
 		return
 	}
 
-	ch := make(chan *db.AnalyticsVisits, 100)
-	go shared.AnalyticsCollect(ch, dbpool, logger)
-
-	routes := NewWebRouter(cfg, logger, dbpool, st, ch)
+	routes := NewWebRouter(cfg, logger, dbpool, st)
 
 	portStr := fmt.Sprintf(":%s", cfg.Port)
 	logger.Info(
 type HasPerm = func(proj *db.Project) bool
 
 type WebRouter struct {
-	Cfg            *shared.ConfigSite
-	Logger         *slog.Logger
-	Dbpool         db.DB
-	Storage        storage.StorageServe
-	AnalyticsQueue chan *db.AnalyticsVisits
-	RootRouter     *http.ServeMux
-	UserRouter     *http.ServeMux
+	Cfg        *shared.ConfigSite
+	Logger     *slog.Logger
+	Dbpool     db.DB
+	Storage    storage.StorageServe
+	RootRouter *http.ServeMux
+	UserRouter *http.ServeMux
 }
 
-func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe, analytics chan *db.AnalyticsVisits) *WebRouter {
+func NewWebRouter(cfg *shared.ConfigSite, logger *slog.Logger, dbpool db.DB, st storage.StorageServe) *WebRouter {
 	router := &WebRouter{
-		Cfg:            cfg,
-		Logger:         logger,
-		Dbpool:         dbpool,
-		Storage:        st,
-		AnalyticsQueue: analytics,
+		Cfg:     cfg,
+		Logger:  logger,
+		Dbpool:  dbpool,
+		Storage: st,
 	}
 	router.initRouters()
 	return router
 
 		if !strings.Contains(hostDomain, appDomain) {
 			subdomain := shared.GetCustomDomain(hostDomain, cfg.Space)
-			props, err := getProjectFromSubdomain(subdomain)
+			props, err := shared.GetProjectFromSubdomain(subdomain)
 			if err != nil {
 				logger.Error(
 					"could not get project from subdomain",
 		"host", r.Host,
 	)
 
-	props, err := getProjectFromSubdomain(subdomain)
+	props, err := shared.GetProjectFromSubdomain(subdomain)
 	if err != nil {
 		logger.Info(
 			"could not determine project from subdomain",
 	ctx = context.WithValue(ctx, shared.CtxSubdomainKey{}, subdomain)
 	router.ServeHTTP(w, r.WithContext(ctx))
 }
-
-type SubdomainProps struct {
-	ProjectName string
-	Username    string
-}
-
-func getProjectFromSubdomain(subdomain string) (*SubdomainProps, error) {
-	props := &SubdomainProps{}
-	strs := strings.SplitN(subdomain, "-", 2)
-	props.Username = strs[0]
-	if len(strs) == 2 {
-		props.ProjectName = strs[1]
-	} else {
-		props.ProjectName = props.Username
-	}
-	return props, nil
-}