auth/api.go
auth/api.go
Host string `json:"host"`
Uri string `json:"uri"`
Headers struct {
- UserAgent string `json:"User-Agent"`
- Referer string `json:"Referer"`
+ UserAgent []string `json:"User-Agent"`
+ Referer string `json:"Referer"`
} `json:"headers"`
Tls struct {
ServerName string `json:"server_name"`
Host: host,
Path: path,
IpAddress: access.Request.ClientIP,
- UserAgent: access.Request.Headers.UserAgent,
+ UserAgent: strings.Join(access.Request.Headers.UserAgent, " "),
Referer: access.Request.Headers.Referer, // TODO: I don't see referer in the access log
Status: access.Status,
}, nil
}
-func containerDrainSub(ctx context.Context, logger *slog.Logger) {
+// this feels really stupid because i'm taking containter-drain,
+// filtering it, and then sending it to metric-drain. The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to have pipe as a debugging tool which means we
+// can sub to `metric-drain` and have a nice clean output to look use.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+ info := shared.NewPicoPipeClient()
drain := pipe.NewReconnectReadWriteCloser(
ctx,
logger,
- shared.NewPicoPipeClient(),
- "container logs",
+ info,
+ "container drain",
"sub container-drain -k",
100,
-1,
)
- fmt.Println("WTFFFFFF")
- scanner := bufio.NewScanner(drain)
- for scanner.Scan() {
- line := scanner.Text()
- fmt.Println("HMMMM", line)
- if strings.Contains(line, "http.log.access") {
- clean := strings.TrimSpace(line)
- fmt.Println("LINE", clean)
- // TODO: send to metric drain
+ send := pipe.NewReconnectReadWriteCloser(
+ ctx,
+ logger,
+ info,
+ "from container drain to metric drain",
+ "pub metric-drain -b=false",
+ 100,
+ -1,
+ )
+
+ for {
+ scanner := bufio.NewScanner(drain)
+ for scanner.Scan() {
+ line := scanner.Text()
+ if strings.Contains(line, "http.log.access") {
+ clean := strings.TrimSpace(line)
+ visit, err := accessLogToVisit(dbpool, clean)
+ if err != nil {
+ logger.Error("could not convert access log to a visit", "err", err)
+ continue
+ }
+ jso, err := json.Marshal(visit)
+ if err != nil {
+ logger.Error("could not marshal json of a visit", "err", err)
+ continue
+ }
+ _, _ = send.Write(jso)
+ }
}
}
}
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+ accessLog := CaddyAccessLog{}
+ err := json.Unmarshal([]byte(line), &accessLog)
+ if err != nil {
+ return nil, err
+ }
+
+ return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
drain := metrics.ReconnectReadMetrics(
ctx,
-1,
)
- scanner := bufio.NewScanner(drain)
- for scanner.Scan() {
- line := scanner.Text()
- accessLog := CaddyAccessLog{}
- err := json.Unmarshal([]byte(line), &accessLog)
- if err != nil {
- logger.Error("json unmarshal", "err", err)
- continue
- }
-
- visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
- if err != nil {
- logger.Error("cannot deserialize access log", "err", err)
- continue
- }
- err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
- if err != nil {
- if !errors.Is(err, shared.ErrAnalyticsDisabled) {
- logger.Info("could not record analytics visit", "reason", err)
+ for {
+ scanner := bufio.NewScanner(drain)
+ for scanner.Scan() {
+ line := scanner.Text()
+ visit, err := accessLogToVisit(dbpool, line)
+ if err != nil {
+ logger.Error("could not convert access log to a visit", "err", err)
+ continue
+ }
+ err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
+ if err != nil {
+ if !errors.Is(err, shared.ErrAnalyticsDisabled) {
+ logger.Info("could not record analytics visit", "reason", err)
+ }
}
- }
- logger.Info("inserting visit", "visit", visit)
- err = dbpool.InsertVisit(visit)
- if err != nil {
- logger.Error("could not insert visit record", "err", err)
+ logger.Info("inserting visit", "visit", visit)
+ err = dbpool.InsertVisit(visit)
+ if err != nil {
+ logger.Error("could not insert visit record", "err", err)
+ }
}
}
- fmt.Println("DROPINNGGGGGGG")
}
func authMux(apiConfig *shared.ApiConfig) *http.ServeMux {
// gather metrics in the auth service
go metricDrainSub(ctx, db, logger, cfg.Secret)
// convert container logs to access logs
- // go containerDrainSub(ctx, logger)
+ go containerDrainSub(ctx, db, logger)
defer ctx.Done()
auth/api.go
auth/api.go
"log/slog"
"net/http"
"net/url"
+ "strings"
"time"
"github.com/gorilla/feeds"
"github.com/picosh/pico/db/postgres"
"github.com/picosh/pico/shared"
"github.com/picosh/utils"
+ "github.com/picosh/utils/pipe"
"github.com/picosh/utils/pipe/metrics"
)
}
}
+type AccessLogReq struct {
+ RemoteIP string `json:"remote_ip"`
+ RemotePort string `json:"remote_port"`
+ ClientIP string `json:"client_ip"`
+ Method string `json:"method"`
+ Host string `json:"host"`
+ Uri string `json:"uri"`
+ Headers struct {
+ UserAgent []string `json:"User-Agent"`
+ Referer []string `json:"Referer"`
+ } `json:"headers"`
+ Tls struct {
+ ServerName string `json:"server_name"`
+ } `json:"tls"`
+}
+
+type RespHeaders struct {
+ ContentType []string `json:"Content-Type"`
+}
+
+type CaddyAccessLog struct {
+ Request AccessLogReq `json:"request"`
+ Status int `json:"status"`
+ RespHeaders RespHeaders `json:"resp_headers"`
+}
+
+func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
+ spaceRaw := strings.SplitN(access.Request.Tls.ServerName, ".", 2)
+ space := spaceRaw[0]
+ host := access.Request.Host
+ path := access.Request.Uri
+ subdomain := ""
+
+ // grab subdomain based on host
+ if strings.HasSuffix(host, "tuns.sh") {
+ subdomain = strings.TrimSuffix(host, ".tuns.sh")
+ } else if strings.HasSuffix(host, "pgs.sh") {
+ subdomain = strings.TrimSuffix(host, ".pgs.sh")
+ } else if strings.HasSuffix(host, "prose.sh") {
+ subdomain = strings.TrimSuffix(host, ".prose.sh")
+ } else {
+ subdomain = shared.GetCustomDomain(host, space)
+ }
+
+ // get user and namespace details from subdomain
+ props, err := shared.GetProjectFromSubdomain(subdomain)
+ if err != nil {
+ return nil, err
+ }
+ // get user ID
+ user, err := dbpool.FindUserForName(props.Username)
+ if err != nil {
+ return nil, err
+ }
+
+ projectID := ""
+ postID := ""
+ if space == "pgs" { // figure out project ID
+ project, err := dbpool.FindProjectByName(user.ID, props.ProjectName)
+ if err != nil {
+ return nil, err
+ }
+ projectID = project.ID
+ } else if space == "prose" { // figure out post ID
+ if path == "" || path == "/" {
+ } else {
+ post, err := dbpool.FindPostWithSlug(path, user.ID, space)
+ if err != nil {
+ return nil, err
+ }
+ postID = post.ID
+ }
+ }
+
+ return &db.AnalyticsVisits{
+ UserID: user.ID,
+ ProjectID: projectID,
+ PostID: postID,
+ Namespace: space,
+ Host: host,
+ Path: path,
+ IpAddress: access.Request.ClientIP,
+ UserAgent: strings.Join(access.Request.Headers.UserAgent, " "),
+ Referer: strings.Join(access.Request.Headers.Referer, " "),
+ ContentType: strings.Join(access.RespHeaders.ContentType, " "),
+ Status: access.Status,
+ }, nil
+}
+
+// this feels really stupid because i'm taking containter-drain,
+// filtering it, and then sending it to metric-drain. The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to use pipe as a debugging tool which means we
+// can manually sub to `metric-drain` and have a nice clean output to view.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+ info := shared.NewPicoPipeClient()
+ drain := pipe.NewReconnectReadWriteCloser(
+ ctx,
+ logger,
+ info,
+ "container drain",
+ "sub container-drain -k",
+ 100,
+ -1,
+ )
+
+ send := pipe.NewReconnectReadWriteCloser(
+ ctx,
+ logger,
+ info,
+ "from container drain to metric drain",
+ "pub metric-drain -b=false",
+ 100,
+ -1,
+ )
+
+ for {
+ scanner := bufio.NewScanner(drain)
+ for scanner.Scan() {
+ line := scanner.Text()
+ if strings.Contains(line, "http.log.access") {
+ clean := strings.TrimSpace(line)
+ visit, err := accessLogToVisit(dbpool, clean)
+ if err != nil {
+ logger.Debug("could not convert access log to a visit", "err", err)
+ continue
+ }
+ jso, err := json.Marshal(visit)
+ if err != nil {
+ logger.Error("could not marshal json of a visit", "err", err)
+ continue
+ }
+ _, _ = send.Write(jso)
+ }
+ }
+ }
+}
+
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+ accessLog := CaddyAccessLog{}
+ err := json.Unmarshal([]byte(line), &accessLog)
+ if err != nil {
+ return nil, err
+ }
+
+ return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
drain := metrics.ReconnectReadMetrics(
ctx,
visit := db.AnalyticsVisits{}
err := json.Unmarshal([]byte(line), &visit)
if err != nil {
- logger.Error("json unmarshal", "err", err)
+ logger.Info("could not unmarshal json", "err", err, "line", line)
continue
}
-
- user := slog.Any("userId", visit.UserID)
-
err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
if err != nil {
if !errors.Is(err, shared.ErrAnalyticsDisabled) {
- logger.Info("could not record analytics visit", "reason", err, "visit", visit, user)
- continue
+ logger.Info("could not record analytics visit", "reason", err)
}
}
- logger.Info("inserting visit", "visit", visit, user)
+ if visit.ContentType != "" && !strings.HasPrefix(visit.ContentType, "text/html") {
+ continue
+ }
+
+ logger.Info("inserting visit", "visit", visit)
err = dbpool.InsertVisit(&visit)
if err != nil {
- logger.Error("could not insert visit record", "err", err, "visit", visit, user)
+ logger.Error("could not insert visit record", "err", err)
}
}
-
- if scanner.Err() != nil {
- logger.Error("scanner error", "err", scanner.Err())
- }
}
}
// gather metrics in the auth service
go metricDrainSub(ctx, db, logger, cfg.Secret)
+ // convert container logs to access logs
+ go containerDrainSub(ctx, db, logger)
+
defer ctx.Done()
apiConfig := &shared.ApiConfig{