mirror of
https://github.com/TronoSfera/backupy-agent.git
synced 2026-05-18 18:13:30 +03:00
Source ports from the TronoSfera/backupy-cloud monorepo:
- apps/agent/ — Go agent (WSS client, persistent queue, Docker
discovery, 5 DB drivers: PG/MySQL/Mongo/Redis/SQLite,
pre/post hooks, Prometheus metrics)
- apps/backupy-decrypt/ — standalone CLI for client-side decryption
- packages/proto/ — protobuf wire format (generated .pb.go committed
so the repo builds without protoc)
- docs/ — agent spec + wire-protocol contract
Apache-2.0 license. Image published to ghcr.io/tronosfera/backupy-agent
on every v* tag via .github/workflows/release.yml (multi-arch amd64+arm64).
219 lines
7.7 KiB
Go
219 lines
7.7 KiB
Go
// B14: MongoDB driver.
|
|
//
|
|
// Spawns `mongodump --archive --gzip` and streams the resulting archive
|
|
// to the pipeline writer. The archive is a single binary blob containing
|
|
// every database (or one when target.Connection.Database is set), so
|
|
// downstream stages (zstd compress, AES-GCM encrypt, upload) can treat
|
|
// it the same way they treat pg_dump's custom-format archive.
|
|
//
|
|
// We deliberately shell out instead of importing the official Mongo Go
|
|
// driver: the binary handles oplog tailing, BSON encoding and resume
|
|
// semantics already, and keeping the agent's go.mod minimal matters.
|
|
package pipeline
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"io"
	"os/exec"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	backupv1 "github.com/backupy/backupy/packages/proto/gen/go/backupv1"
)
|
|
|
|
// mongoDump is the MongoDB driver implementation.
|
|
type mongoDump struct {
|
|
binary string
|
|
runner cmdRunner
|
|
}
|
|
|
|
// NewMongoDump constructs the default driver wired to the bundled
|
|
// mongodump binary on $PATH.
|
|
func NewMongoDump() Driver {
|
|
return &mongoDump{binary: "mongodump", runner: realRunner{}}
|
|
}
|
|
|
|
// Name implements Driver.Name.
|
|
func (m *mongoDump) Name() string { return "mongodump" }
|
|
|
|
// Validate verifies the mongodump binary is installed and that the
|
|
// configured connection answers a quick ping. We use mongodump's own
|
|
// `--version` + a no-op archive write to /dev/null with `--dbpath` skipped
|
|
// — instead the cheapest reachability test is just `--version`. A full
|
|
// connection probe is left to the Dump call to avoid double round-trips
|
|
// on small databases.
|
|
func (m *mongoDump) Validate(ctx context.Context, target *backupv1.Target) error {
|
|
if target == nil || target.Connection == nil {
|
|
return errors.New("pipeline: mongodump: nil target/connection")
|
|
}
|
|
out, err := m.runner.Output(ctx, m.binary, []string{"--version"}, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("pipeline: mongodump version probe failed (is mongodump installed?): %w", err)
|
|
}
|
|
if !strings.Contains(strings.ToLower(string(out)), "mongodump") {
|
|
return fmt.Errorf("pipeline: unexpected mongodump --version output: %q", string(out))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Dump streams a `mongodump --archive --gzip` binary archive to out.
|
|
// We do NOT layer mongodump's own gzip on top of zstd — passing --gzip
|
|
// here keeps the archive self-describing and lets restore reach for the
|
|
// canonical `mongorestore --gzip --archive=…` workflow. The outer zstd
|
|
// stage will still compress the gzip stream (gain is marginal but the
|
|
// uniform pipeline shape outweighs the cost).
|
|
func (m *mongoDump) Dump(ctx context.Context, target *backupv1.Target, out io.Writer) (DumpInfo, error) {
|
|
if target == nil || target.Connection == nil {
|
|
return DumpInfo{}, errors.New("pipeline: mongodump: nil target/connection")
|
|
}
|
|
args := append(m.connArgs(target), "--archive", "--gzip")
|
|
if err := m.runner.RunStream(ctx, m.binary, args, nil, out); err != nil {
|
|
return DumpInfo{}, fmt.Errorf("pipeline: mongodump exec: %w", err)
|
|
}
|
|
versionOut, vErr := m.runner.Output(ctx, m.binary, []string{"--version"}, nil)
|
|
engineVersion := "MongoDB"
|
|
if vErr == nil {
|
|
engineVersion = parseMongodumpVersion(string(versionOut))
|
|
}
|
|
return DumpInfo{EngineVersion: engineVersion}, nil
|
|
}
|
|
|
|
// connArgs builds the host/port/user/uri flag tuple. If a URI is
|
|
// embedded in the host field (mongodb://…) we pass it via --uri,
|
|
// otherwise we use --host/--port/--username and rely on
|
|
// $MONGODB_PASSWORD via env (set by the runner via password_secret_ref).
|
|
func (m *mongoDump) connArgs(t *backupv1.Target) []string {
|
|
c := t.Connection
|
|
args := []string{}
|
|
|
|
if strings.HasPrefix(c.Host, "mongodb://") || strings.HasPrefix(c.Host, "mongodb+srv://") {
|
|
args = append(args, "--uri", c.Host)
|
|
} else {
|
|
if c.Host != "" {
|
|
args = append(args, "--host", c.Host)
|
|
}
|
|
if c.Port != 0 {
|
|
args = append(args, "--port", strconv.FormatUint(uint64(c.Port), 10))
|
|
}
|
|
if c.Username != "" {
|
|
args = append(args, "--username", c.Username)
|
|
}
|
|
if c.PasswordSecretRef != "" {
|
|
// mongodump accepts --password inline. We pass it as an
|
|
// argument because mongodump does not honour environment
|
|
// variables for the password.
|
|
args = append(args, "--password", c.PasswordSecretRef)
|
|
args = append(args, "--authenticationDatabase", "admin")
|
|
}
|
|
}
|
|
|
|
if c.Database != "" {
|
|
args = append(args, "--db", c.Database)
|
|
}
|
|
return args
|
|
}
|
|
|
|
// parseMongodumpVersion turns `mongodump --version` output into a
// human-readable label. It uses the first line that, after trimming,
// starts (case-insensitively) with "mongodump version:" and carries a
// non-empty value:
//
//	"mongodump version: 100.9.5"  ->  "MongoDB Tools 100.9.5"
//
// If no such line exists, including when the value after the colon is
// blank, it returns the generic "MongoDB".
func parseMongodumpVersion(s string) string {
	const marker = "mongodump version:"
	for _, line := range strings.Split(s, "\n") {
		line = strings.TrimSpace(line)
		if !strings.HasPrefix(strings.ToLower(line), marker) {
			continue
		}
		// Cut the original (not lower-cased) line so the version keeps its
		// own spelling.
		if _, version, ok := strings.Cut(line, ":"); ok {
			if version = strings.TrimSpace(version); version != "" {
				return "MongoDB Tools " + version
			}
		}
	}
	return "MongoDB"
}
|
|
|
|
// IsMongodumpArchiveMagic reports whether head begins with a signature that
// mongodump can produce:
//
//   - 0x1f 0x8b: a gzip stream, which is what `--archive --gzip` emits;
//   - 0x6d 0xe2 0x99 0x81: the raw mongo archive magic number 0x8199e26d
//     stored little-endian (an archive written without --gzip).
//
// Any gzip stream matches the first case, so this is a cheap plausibility
// check rather than proof. A shorter or different prefix yields false.
func IsMongodumpArchiveMagic(head []byte) bool {
	switch {
	case len(head) >= 2 && head[0] == 0x1f && head[1] == 0x8b:
		// gzip wrapper
		return true
	case len(head) >= 4 &&
		head[0] == 0x6d && head[1] == 0xe2 && head[2] == 0x99 && head[3] == 0x81:
		// raw mongo archive, little-endian 0x8199e26d
		return true
	default:
		return false
	}
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// streaming runner with stderr ring buffer + ctx-cancel friendliness.
|
|
// Used by the mongo, redis and sqlite drivers — kept here so we do not
|
|
// disturb the existing realRunner that pg_dump / mysqldump rely on.
|
|
// -----------------------------------------------------------------------------
|
|
|
|
// ringBuffer is a fixed-capacity tail buffer for stderr. It always retains
// the LAST `cap` bytes written to it and drops earlier bytes. Write and
// Tail are serialised by mu, so concurrent use is safe.
//
// Memory stays bounded by a small multiple of cap:
//   - a write of at least cap bytes stores only its final cap bytes;
//   - trimming shifts the kept tail to the front of the existing backing
//     array instead of re-slicing it away.
type ringBuffer struct {
	mu  sync.Mutex
	buf []byte
	cap int // maximum number of bytes retained; <= 0 retains nothing
}

// newRingBuffer returns an empty ringBuffer that keeps at most size bytes.
func newRingBuffer(size int) *ringBuffer { return &ringBuffer{cap: size} }

// Write implements io.Writer. It appends p and discards the oldest bytes
// beyond r.cap. It always reports len(p) bytes written and a nil error, so
// a child process writing to it is never told its stderr write failed.
func (r *ringBuffer) Write(p []byte) (int, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	switch {
	case r.cap <= 0:
		// Nothing can be retained; also avoids a negative slice bound.
		r.buf = r.buf[:0]
	case len(p) >= r.cap:
		// Only p's tail can survive: copy just that, without first growing
		// buf to len(buf)+len(p).
		r.buf = append(r.buf[:0], p[len(p)-r.cap:]...)
	default:
		r.buf = append(r.buf, p...)
		if excess := len(r.buf) - r.cap; excess > 0 {
			// copy handles the overlapping move (memmove semantics).
			n := copy(r.buf, r.buf[excess:])
			r.buf = r.buf[:n]
		}
	}
	return len(p), nil
}

// Tail returns the retained bytes as a string with leading and trailing
// whitespace trimmed.
func (r *ringBuffer) Tail() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return strings.TrimSpace(string(r.buf))
}
|
|
|
|
// streamWithCancel runs a child process, copying its stdout to out and
|
|
// capturing the last 4 KB of stderr. On ctx.Done() it sends SIGINT to
|
|
// the child (rather than the default SIGKILL) so mongodump / redis-cli
|
|
// / sqlite3 get a chance to flush partial output and exit cleanly.
|
|
func streamWithCancel(ctx context.Context, name string, args []string, env []string, out io.Writer) error {
|
|
cmd := exec.CommandContext(ctx, name, args...)
|
|
cmd.Env = mergeEnv(env)
|
|
cmd.Stdout = out
|
|
stderrRing := newRingBuffer(4 * 1024)
|
|
cmd.Stderr = stderrRing
|
|
cmd.Cancel = func() error { return cmd.Process.Signal(syscall.SIGINT) }
|
|
|
|
if err := cmd.Run(); err != nil {
|
|
var ee *exec.ExitError
|
|
tail := stderrRing.Tail()
|
|
if errors.As(err, &ee) {
|
|
return fmt.Errorf("%s exited %d: %w (stderr: %s)", name, ee.ExitCode(), err, tail)
|
|
}
|
|
return fmt.Errorf("%s: %w (stderr: %s)", name, err, tail)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// streamingRunner is a cmdRunner that uses streamWithCancel for
|
|
// RunStream. Its Output method behaves like realRunner.Output.
|
|
type streamingRunner struct{}
|
|
|
|
func (streamingRunner) Output(ctx context.Context, name string, args []string, env []string) ([]byte, error) {
|
|
return realRunner{}.Output(ctx, name, args, env)
|
|
}
|
|
|
|
func (streamingRunner) RunStream(ctx context.Context, name string, args []string, env []string, out io.Writer) error {
|
|
return streamWithCancel(ctx, name, args, env, out)
|
|
}
|