1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
diff --git a/auth/api.go b/auth/api.go
index 6d997ab..c25e2e7 100644
--- a/auth/api.go
+++ b/auth/api.go
@@ -588,8 +588,8 @@ type AccessLogReq struct {
Host string `json:"host"`
Uri string `json:"uri"`
Headers struct {
- UserAgent string `json:"User-Agent"`
- Referer string `json:"Referer"`
+ UserAgent []string `json:"User-Agent"`
+ Referer string `json:"Referer"`
} `json:"headers"`
Tls struct {
ServerName string `json:"server_name"`
@@ -657,36 +657,72 @@ func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.Analyt
Host: host,
Path: path,
IpAddress: access.Request.ClientIP,
- UserAgent: access.Request.Headers.UserAgent,
+ UserAgent: strings.Join(access.Request.Headers.UserAgent, " "),
Referer: access.Request.Headers.Referer, // TODO: I don't see referer in the access log
Status: access.Status,
}, nil
}
-func containerDrainSub(ctx context.Context, logger *slog.Logger) {
+// this feels really stupid because i'm taking container-drain,
+// filtering it, and then sending it to metric-drain. The
+// metricDrainSub function listens on the metric-drain and saves it.
+// So why not just call the necessary functions to save the visit?
+// We want to be able to have pipe as a debugging tool which means we
+// can sub to `metric-drain` and have a nice clean output to look at.
+func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
+ info := shared.NewPicoPipeClient()
drain := pipe.NewReconnectReadWriteCloser(
ctx,
logger,
- shared.NewPicoPipeClient(),
- "container logs",
+ info,
+ "container drain",
"sub container-drain -k",
100,
-1,
)
- fmt.Println("WTFFFFFF")
- scanner := bufio.NewScanner(drain)
- for scanner.Scan() {
- line := scanner.Text()
- fmt.Println("HMMMM", line)
- if strings.Contains(line, "http.log.access") {
- clean := strings.TrimSpace(line)
- fmt.Println("LINE", clean)
- // TODO: send to metric drain
+ send := pipe.NewReconnectReadWriteCloser(
+ ctx,
+ logger,
+ info,
+ "from container drain to metric drain",
+ "pub metric-drain -b=false",
+ 100,
+ -1,
+ )
+
+ for {
+ scanner := bufio.NewScanner(drain)
+ for scanner.Scan() {
+ line := scanner.Text()
+ if strings.Contains(line, "http.log.access") {
+ clean := strings.TrimSpace(line)
+ visit, err := accessLogToVisit(dbpool, clean)
+ if err != nil {
+ logger.Error("could not convert access log to a visit", "err", err)
+ continue
+ }
+ jso, err := json.Marshal(visit)
+ if err != nil {
+ logger.Error("could not marshal json of a visit", "err", err)
+ continue
+ }
+ _, _ = send.Write(jso)
+ }
}
}
}
+func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
+ accessLog := CaddyAccessLog{}
+ err := json.Unmarshal([]byte(line), &accessLog)
+ if err != nil {
+ return nil, err
+ }
+
+ return deserializeCaddyAccessLog(dbpool, &accessLog)
+}
+
func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
drain := metrics.ReconnectReadMetrics(
ctx,
@@ -696,35 +732,29 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
-1,
)
- scanner := bufio.NewScanner(drain)
- for scanner.Scan() {
- line := scanner.Text()
- accessLog := CaddyAccessLog{}
- err := json.Unmarshal([]byte(line), &accessLog)
- if err != nil {
- logger.Error("json unmarshal", "err", err)
- continue
- }
-
- visit, err := deserializeCaddyAccessLog(dbpool, &accessLog)
- if err != nil {
- logger.Error("cannot deserialize access log", "err", err)
- continue
- }
- err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
- if err != nil {
- if !errors.Is(err, shared.ErrAnalyticsDisabled) {
- logger.Info("could not record analytics visit", "reason", err)
+ for {
+ scanner := bufio.NewScanner(drain)
+ for scanner.Scan() {
+ line := scanner.Text()
+ visit, err := accessLogToVisit(dbpool, line)
+ if err != nil {
+ logger.Error("could not convert access log to a visit", "err", err)
+ continue
+ }
+ err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
+ if err != nil {
+ if !errors.Is(err, shared.ErrAnalyticsDisabled) {
+ logger.Info("could not record analytics visit", "reason", err)
+ }
}
- }
- logger.Info("inserting visit", "visit", visit)
- err = dbpool.InsertVisit(visit)
- if err != nil {
- logger.Error("could not insert visit record", "err", err)
+ logger.Info("inserting visit", "visit", visit)
+ err = dbpool.InsertVisit(visit)
+ if err != nil {
+ logger.Error("could not insert visit record", "err", err)
+ }
}
}
- fmt.Println("DROPINNGGGGGGG")
}
func authMux(apiConfig *shared.ApiConfig) *http.ServeMux {
@@ -796,7 +826,7 @@ func StartApiServer() {
// gather metrics in the auth service
go metricDrainSub(ctx, db, logger, cfg.Secret)
// convert container logs to access logs
- // go containerDrainSub(ctx, logger)
+ go containerDrainSub(ctx, db, logger)
defer ctx.Done()
|