erock/pico / fix(pgs): memory leaks #92

closed · opened on 2025-12-16T04:16:16Z by erock
Help
checkout latest patchset:
ssh pr.pico.sh print pr-92 | git am -3
checkout any patchset in a patch request:
ssh pr.pico.sh print ps-X | git am -3
add changes to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add 92
add review to patch request:
git format-patch main --stdout | ssh pr.pico.sh pr add --review 92
accept PR:
ssh pr.pico.sh pr accept 92
close PR:
ssh pr.pico.sh pr close 92
Patchsets

Patchset ps-169

fix(pgs): memory leaks

Eric Bower
2025-12-16T04:01:27Z

fix: tunnel leaks

Eric Bower
2025-12-16T04:10:10Z
.gitignore
+2 -0

fix: tunkit memory leaks

Eric Bower
2025-12-16T04:13:11Z
All changes replace deferred Close() calls with immediate ones so resources are released as soon as they have been consumed, instead of accumulating until the enclosing function returns.
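For context, the pattern being removed looks roughly like the sketch below (hypothetical helper names, not code from this repo): deferring Close() ties each object's lifetime to the enclosing function, so in a long-lived handler or a loop the open handles pile up until the function finally returns.

// Hypothetical sketch of the leak pattern; openObject stands in for
// Storage.GetObject-style calls that return an io.ReadCloser.
package leaksketch

import (
	"io"
	"os"
)

func openObject(path string) (io.ReadCloser, error) { return os.Open(path) }

// leaky defers Close inside a loop, so every object stays open until the
// function returns; in a long-running handler the handles accumulate.
func leaky(paths []string) {
	for _, p := range paths {
		obj, err := openObject(p)
		if err != nil {
			continue
		}
		defer obj.Close() // not released until leaky returns
		_, _ = io.Copy(io.Discard, obj)
	}
}

// fixed closes each object as soon as it has been consumed.
func fixed(paths []string) {
	for _, p := range paths {
		obj, err := openObject(p)
		if err != nil {
			continue
		}
		_, _ = io.Copy(io.Discard, obj)
		_ = obj.Close() // released immediately
	}
}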
+5 -9 pkg/apps/pgs/web_asset_handler.go
diff --git a/pkg/apps/pgs/web_asset_handler.go b/pkg/apps/pgs/web_asset_handler.go
index aba0ce2..abee4bf 100644
--- a/pkg/apps/pgs/web_asset_handler.go
+++ b/pkg/apps/pgs/web_asset_handler.go
@@ -51,10 +51,8 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		logger.Info("_redirects not found in lru cache", "key", redirectsCacheKey)
 		redirectFp, redirectInfo, err := h.Cfg.Storage.GetObject(h.Bucket, filepath.Join(h.ProjectDir, "_redirects"))
 		if err == nil {
-			defer func() {
-				_ = redirectFp.Close()
-			}()
 			if redirectInfo != nil && redirectInfo.Size > h.Cfg.MaxSpecialFileSize {
+				_ = redirectFp.Close()
 				errMsg := fmt.Sprintf("_redirects file is too large (%d > %d)", redirectInfo.Size, h.Cfg.MaxSpecialFileSize)
 				logger.Error(errMsg)
 				http.Error(w, errMsg, http.StatusInternalServerError)
@@ -63,6 +61,7 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			buf := new(strings.Builder)
 			lr := io.LimitReader(redirectFp, h.Cfg.MaxSpecialFileSize)
 			_, err := io.Copy(buf, lr)
+			_ = redirectFp.Close()
 			if err != nil {
 				logger.Error("io copy", "err", err.Error())
 				http.Error(w, "cannot read _redirects file", http.StatusInternalServerError)
@@ -109,9 +108,7 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 				if err != nil {
 					continue
 				}
-				defer func() {
-					_ = obj.Close()
-				}()
+				_ = obj.Close()
 			}
 			logger.Info(
 				"redirecting request",
@@ -219,10 +216,8 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		logger.Info("_headers not found in lru cache", "key", headersCacheKey)
 		headersFp, headersInfo, err := h.Cfg.Storage.GetObject(h.Bucket, filepath.Join(h.ProjectDir, "_headers"))
 		if err == nil {
-			defer func() {
-				_ = headersFp.Close()
-			}()
 			if headersInfo != nil && headersInfo.Size > h.Cfg.MaxSpecialFileSize {
+				_ = headersFp.Close()
 				errMsg := fmt.Sprintf("_headers file is too large (%d > %d)", headersInfo.Size, h.Cfg.MaxSpecialFileSize)
 				logger.Error(errMsg)
 				http.Error(w, errMsg, http.StatusInternalServerError)
@@ -231,6 +226,7 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			buf := new(strings.Builder)
 			lr := io.LimitReader(headersFp, h.Cfg.MaxSpecialFileSize)
 			_, err := io.Copy(buf, lr)
+			_ = headersFp.Close()
 			if err != nil {
 				logger.Error("io copy", "err", err.Error())
 				http.Error(w, "cannot read _headers file", http.StatusInternalServerError)

+6 -2 pkg/apps/pgs/gen_dir_listing.go
diff --git a/pkg/apps/pgs/gen_dir_listing.go b/pkg/apps/pgs/gen_dir_listing.go
index fc1889e..a5e612a 100644
--- a/pkg/apps/pgs/gen_dir_listing.go
+++ b/pkg/apps/pgs/gen_dir_listing.go
@@ -100,8 +100,12 @@ func shouldGenerateListing(st sst.ObjectStorage, bucket sst.Bucket, projectDir s
 	}
 
 	indexPath := dirPath + "index.html"
-	_, _, err = st.GetObject(bucket, indexPath)
-	return err != nil
+	obj, _, err := st.GetObject(bucket, indexPath)
+	if err != nil {
+		return true
+	}
+	_ = obj.Close()
+	return false
 }
 
 func generateDirectoryHTML(path string, entries []os.FileInfo) string {

+2 -0 .gitignore
diff --git a/.gitignore b/.gitignore
index 5c8df21..eb84888 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,5 @@ __debug_bin
 .bin
 /public/
 .aider*
+ssh
+web
+1 -1 pkg/apps/pgs/tunnel.go
diff --git a/pkg/apps/pgs/tunnel.go b/pkg/apps/pgs/tunnel.go
index a7b5ba4..97e6ce1 100644
--- a/pkg/apps/pgs/tunnel.go
+++ b/pkg/apps/pgs/tunnel.go
@@ -126,7 +126,7 @@ func CreateHttpHandler(cfg *PgsConfig) CtxHttpBridge {
 
 		log.Info("user has access to site")
 
-		routes := NewWebRouter(cfg)
+		routes := newWebRouter(cfg)
 		tunnelRouter := TunnelWebRouter{routes, subdomain}
 		tunnelRouter.InitRouter()
 		return &tunnelRouter
+6 -1 pkg/apps/pgs/web.go
diff --git a/pkg/apps/pgs/web.go b/pkg/apps/pgs/web.go
index 64dbb25..2ddc6db 100644
--- a/pkg/apps/pgs/web.go
+++ b/pkg/apps/pgs/web.go
@@ -107,13 +107,18 @@ type WebRouter struct {
 }
 
 func NewWebRouter(cfg *PgsConfig) *WebRouter {
+	router := newWebRouter(cfg)
+	go router.WatchCacheClear()
+	return router
+}
+
+func newWebRouter(cfg *PgsConfig) *WebRouter {
 	router := &WebRouter{
 		Cfg:            cfg,
 		RedirectsCache: expirable.NewLRU[string, []*RedirectRule](2048, nil, cache.CacheTimeout),
 		HeadersCache:   expirable.NewLRU[string, []*HeaderRule](2048, nil, cache.CacheTimeout),
 	}
 	router.initRouters()
-	go router.WatchCacheClear()
 	return router
 }
 

- Context leak: Added defer cancel() immediately after creating the context to ensure it's always cancelled when the function returns
- Goroutine coordination: Removed the outer goroutine wrapper - the handler now runs synchronously, waiting for io.Copy goroutines to complete via wg.Wait() before calling handler.Close()
- Simplified cleanup: Moved connection closing to after wg.Wait() to ensure both copy operations complete before closing resources
- Removed dead code: the <-ctx.Done() wait blocked forever since nothing called cancel(); the function now completes naturally when the io.Copy operations finish (when either side closes the connection), as sketched below
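A minimal sketch of the resulting shape, not the exact tunkit code (upstream and downstream are placeholder names for the SSH channel and the dialed connection): two io.Copy goroutines coordinated by a sync.WaitGroup, with the handler blocking on wg.Wait() and closing both ends afterwards.

// Minimal sketch of the copy/close ordering described above; hypothetical
// names, assuming both ends satisfy io.ReadWriteCloser.
package tunnelsketch

import (
	"errors"
	"io"
	"log"
	"net"
	"sync"
)

func forward(upstream, downstream io.ReadWriteCloser) {
	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		if _, err := io.Copy(upstream, downstream); err != nil && !errors.Is(err, net.ErrClosed) {
			log.Println("io copy", err)
		}
	}()

	go func() {
		defer wg.Done()
		if _, err := io.Copy(downstream, upstream); err != nil && !errors.Is(err, net.ErrClosed) {
			log.Println("io copy", err)
		}
		_ = downstream.Close() // unblocks the other copy once the client side ends
	}()

	// Block until both directions finish, then release both ends; no extra
	// wrapper goroutine and no <-ctx.Done() wait are needed.
	wg.Wait()
	_ = upstream.Close()
	_ = downstream.Close()
}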
+30 -41 pkg/tunkit/ptun.go
diff --git a/pkg/tunkit/ptun.go b/pkg/tunkit/ptun.go
index 9faa754..df256b9 100644
--- a/pkg/tunkit/ptun.go
+++ b/pkg/tunkit/ptun.go
@@ -53,6 +53,8 @@ func LocalForwardHandler(handler Tunnel) pssh.SSHServerChannelMiddleware {
 		}
 
 		origCtx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
 		ctx := &pssh.SSHServerConnSession{
 			Channel:       ch,
 			SSHServerConn: sc,
@@ -62,51 +64,38 @@ func LocalForwardHandler(handler Tunnel) pssh.SSHServerChannelMiddleware {
 
 		go ssh.DiscardRequests(reqs)
 
+		downConn, err := handler.CreateConn(ctx)
+		if err != nil {
+			log.Error("unable to connect to conn", "err", err)
+			_ = ch.Close()
+			return err
+		}
+
+		var wg sync.WaitGroup
+		wg.Add(2)
+
 		go func() {
-			downConn, err := handler.CreateConn(ctx)
-			if err != nil {
-				log.Error("unable to connect to conn", "err", err)
-				_ = ch.Close()
-				return
+			defer wg.Done()
+			_, err := io.Copy(ch, downConn)
+			if err != nil && !errors.Is(err, net.ErrClosed) {
+				log.Error("io copy", "err", err)
 			}
-			defer func() {
-				_ = downConn.Close()
-			}()
-
-			var wg sync.WaitGroup
-			wg.Add(2)
-
-			go func() {
-				defer wg.Done()
-				defer func() {
-					_ = ch.CloseWrite()
-					_ = downConn.Close()
-				}()
-				_, err := io.Copy(ch, downConn)
-				if err != nil {
-					if !errors.Is(err, net.ErrClosed) {
-						log.Error("io copy", "err", err)
-					}
-				}
-			}()
-			go func() {
-				defer wg.Done()
-				defer func() {
-					_ = ch.Close()
-					_ = downConn.Close()
-				}()
-				_, err := io.Copy(downConn, ch)
-				if err != nil {
-					if !errors.Is(err, net.ErrClosed) {
-						log.Error("io copy", "err", err)
-					}
-				}
-			}()
-
-			wg.Wait()
+			_ = ch.CloseWrite()
 		}()
 
-		<-ctx.Done()
+		go func() {
+			defer wg.Done()
+			_, err := io.Copy(downConn, ch)
+			if err != nil && !errors.Is(err, net.ErrClosed) {
+				log.Error("io copy", "err", err)
+			}
+			_ = downConn.Close()
+		}()
+
+		wg.Wait()
+		_ = ch.Close()
+		_ = downConn.Close()
+
 		err = handler.Close(ctx)
 		if err != nil {
 			log.Error("tunnel handler error", "err", err)