diff --git a/apps/agent/internal/pipeline/runner.go b/apps/agent/internal/pipeline/runner.go index da5c38f..86c1615 100644 --- a/apps/agent/internal/pipeline/runner.go +++ b/apps/agent/internal/pipeline/runner.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "log/slog" + "os" "strings" "time" @@ -302,14 +303,30 @@ func (r *Runner) Run(ctx context.Context, req *backupv1.RunBackup) (completed *b errs <- nil }() - // Stage 4 — upload (blocking call on the calling goroutine). On - // failure we still need to wait for the three upstream goroutines - // to unwind so the function does not leak them; closing the pipe - // readers below makes their pending Write calls return promptly. - sha256hex, uploaded, uploadErr := r.uploader.Put(ctx, req.UploadCreds.PresignedPutUrl, encryptedPR, -1) + // Stage 4 — drain the encrypted pipe into a temp file, then PUT it + // with a known Content-Length. MinIO (and stricter S3 endpoints) + // reject chunked PUTs against presigned URLs with HTTP 411. + // Buffering on disk keeps memory flat while still allowing the + // dump → compress → encrypt goroutines to overlap with the drain. + stagedSize, stagedPath, stageErr := stageEncryptedBody(encryptedPR) + if stagedPath != "" { + defer func() { _ = os.Remove(stagedPath) }() + } + var sha256hex string + var uploaded int64 + var uploadErr error + if stageErr != nil { + uploadErr = stageErr + } else { + stagedFile, openErr := os.Open(stagedPath) + if openErr != nil { + uploadErr = fmt.Errorf("open staged body: %w", openErr) + } else { + sha256hex, uploaded, uploadErr = r.uploader.Put(ctx, req.UploadCreds.PresignedPutUrl, stagedFile, stagedSize) + _ = stagedFile.Close() + } + } if uploadErr != nil { - // Closing the readers signals every upstream Write to fail - // with io.ErrClosedPipe so the producer goroutines exit. _ = encryptedPR.CloseWithError(uploadErr) _ = compressedPR.CloseWithError(uploadErr) _ = dumpPR.CloseWithError(uploadErr) @@ -469,6 +486,29 @@ func (passthroughDEK) Unwrap(_ context.Context, in []byte) ([]byte, error) { return out, nil } +// stageEncryptedBody drains src into a fresh temp file and returns the +// path + total size so the caller can issue a PUT with an explicit +// Content-Length. MinIO and stricter S3 endpoints reject chunked +// transfer-encoding against presigned URLs (HTTP 411). On error, the +// caller is responsible for removing the (possibly partial) file at +// the returned path. +func stageEncryptedBody(src io.Reader) (int64, string, error) { + f, err := os.CreateTemp(os.TempDir(), "backupy-upload-*.bin") + if err != nil { + return 0, "", fmt.Errorf("stage upload: create temp: %w", err) + } + path := f.Name() + n, copyErr := io.Copy(f, src) + closeErr := f.Close() + if copyErr != nil { + return n, path, fmt.Errorf("stage upload: copy: %w", copyErr) + } + if closeErr != nil { + return n, path, fmt.Errorf("stage upload: close: %w", closeErr) + } + return n, path, nil +} + // wipe zeroes a byte slice. Best-effort — the Go runtime makes no // guarantee that the underlying memory pages aren't already swapped // out, but this still raises the bar for casual memory inspection.