diff --git a/.gitignore b/.gitignore index 33dc24a..cb43ff4 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ /bin/ /dist/ /build/ +apps/agent/bin/ # IDE .idea/ @@ -16,7 +17,6 @@ .env.local .env.*.local *.pem -*.key !*.pub.key secrets/ @@ -24,7 +24,9 @@ secrets/ *.log logs/ -# State (BoltDB queue / WAL) +# Runtime state (BoltDB queue files). Anchored to repo root so the +# pattern does NOT shadow the apps/agent/internal/state Go package. +/state/ +/var/ *.db *.bolt -state/ diff --git a/apps/agent/internal/state/crypto.go b/apps/agent/internal/state/crypto.go new file mode 100644 index 0000000..dec77db --- /dev/null +++ b/apps/agent/internal/state/crypto.go @@ -0,0 +1,96 @@ +// State-at-rest encryption helpers. +// +// All bucket *values* are wrapped with AES-256-GCM using a key derived from +// BACKUP_AGENT_KEY via HKDF-SHA256 (per docs/03-agent-spec.md → +// "State encryption is optional (key derived from BACKUP_AGENT_KEY)"). +// +// Wire format on disk: +// +// [1 byte version=0x01] [12 bytes nonce] [ciphertext] [16 bytes GCM tag] +// +// Bucket *keys* (e.g. job ids) are stored in clear because BoltDB relies on +// the byte-ordering of keys for iteration and a deterministic encryption of +// keys would still leak ordering information without buying much. +package state + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "errors" + "fmt" + "io" + + "golang.org/x/crypto/hkdf" +) + +const ( + stateKeyLen = 32 // AES-256 + stateNonceLen = 12 // GCM standard + stateEnvVersion = byte(0x01) // bump on format change + hkdfInfo = "backupy-agent/state-v1" + hkdfSalt = "backupy-agent-state-salt" +) + +// errCipher indicates corrupt or wrong-key state. Surfaced as-is so the +// agent can refuse to start instead of silently overwriting good data. +var errCipher = errors.New("state: cipher open failed (corrupt or wrong key)") + +// deriveStateKey returns a 32-byte AES-256 key derived from the agent key. 
+// HKDF (RFC 5869) is more machinery than one derivation step strictly needs,
+// but it costs nothing; hkdfSalt and hkdfInfo are the fixed salt and info
+// inputs. An empty agentKey is rejected, and a failed HKDF read is wrapped.
+func deriveStateKey(agentKey string) ([]byte, error) {
+	if len(agentKey) == 0 {
+		return nil, errors.New("state: empty agent key — cannot derive encryption key")
+	}
+	derived := make([]byte, stateKeyLen)
+	kdf := hkdf.New(sha256New, []byte(agentKey), []byte(hkdfSalt), []byte(hkdfInfo))
+	if _, readErr := io.ReadFull(kdf, derived); readErr != nil {
+		return nil, fmt.Errorf("state: hkdf read: %w", readErr)
+	}
+	return derived, nil
+}
+
+// newGCM builds an AES-GCM AEAD from key. Errors from the AES or GCM
+// constructors are wrapped and returned.
+func newGCM(key []byte) (cipher.AEAD, error) {
+	blk, blkErr := aes.NewCipher(key)
+	if blkErr != nil {
+		return nil, fmt.Errorf("state: aes init: %w", blkErr)
+	}
+	gcm, gcmErr := cipher.NewGCM(blk)
+	if gcmErr != nil {
+		return nil, fmt.Errorf("state: gcm init: %w", gcmErr)
+	}
+	return gcm, nil
+}
+
+// seal encrypts plaintext under a freshly drawn random nonce and returns
+// version || nonce || ciphertext+tag.
+func seal(aead cipher.AEAD, plaintext []byte) ([]byte, error) {
+	var nonce [stateNonceLen]byte
+	if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
+		return nil, fmt.Errorf("state: rand: %w", err)
+	}
+	// Reserve the whole envelope up front so the appends below never reallocate.
+	envelope := make([]byte, 0, 1+stateNonceLen+len(plaintext)+aead.Overhead())
+	envelope = append(envelope, stateEnvVersion)
+	envelope = append(envelope, nonce[:]...)
+	return aead.Seal(envelope, nonce[:], plaintext, nil), nil
+}
+
+// open reverses seal. It returns errCipher when the envelope is too short or
+// fails authentication, so callers can distinguish "no such record" (bbolt
+// nil) from "tampered/wrong key"; an unknown version byte gets its own error.
+func open(aead cipher.AEAD, sealed []byte) ([]byte, error) {
+	const headerLen = 1 + stateNonceLen // version byte + nonce
+	switch {
+	case len(sealed) < headerLen+aead.Overhead():
+		// Too short to hold even an empty payload; checked before the version.
+		return nil, errCipher
+	case sealed[0] != stateEnvVersion:
+		return nil, fmt.Errorf("state: unsupported envelope version 0x%02x", sealed[0])
+	}
+	plaintext, err := aead.Open(nil, sealed[1:headerLen], sealed[headerLen:], nil)
+	if err != nil {
+		return nil, errCipher
+	}
+	return plaintext, nil
+}
diff --git a/apps/agent/internal/state/sha.go b/apps/agent/internal/state/sha.go
new file mode 100644
index 0000000..28c60a8
--- /dev/null
+++ b/apps/agent/internal/state/sha.go
@@ -0,0 +1,10 @@
+package state
+
+import (
+	"crypto/sha256"
+	"hash"
+)
+
+// sha256New is a function value compatible with hkdf.New's first arg.
+// Wrapping the constructor lets us swap in a test double if ever needed.
+var sha256New = func() hash.Hash { return sha256.New() }
diff --git a/apps/agent/internal/state/state.go b/apps/agent/internal/state/state.go
new file mode 100644
index 0000000..9ef5ce2
--- /dev/null
+++ b/apps/agent/internal/state/state.go
@@ -0,0 +1,365 @@
+// Package state owns the agent's persistent on-disk state — a BoltDB file
+// at $BACKUP_STATE_DIR/state.db.
+//
+// Buckets:
+//
+//	"config"      — last-known AgentConfig (key: "current") and version.
+//	"queue"       — pending RunBackup jobs (key: run_id, value: encoded envelope).
+//	"registry"    — session metadata: last session_id, server_time, heartbeat.
+//	"logs_buffer" — rate-limited LogEvent buffer when server is unreachable.
+//
+// Payload values are sealed with AES-256-GCM keyed by HKDF(BACKUP_AGENT_KEY);
+// version and timestamp counters stay plain. See crypto.go for the wire format.
+//
+// Concurrency: bbolt serialises write transactions itself, so the Store is
+// safe for concurrent use without an internal mutex.
+package state
+
+import (
+	"crypto/cipher"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"path/filepath"
+	"time"
+
+	bolt "go.etcd.io/bbolt"
+)
+
+// Bucket and key names. They are unexported: callers go through Store
+// methods, never raw bbolt buckets.
+var (
+	bktConfig   = []byte("config")
+	bktQueue    = []byte("queue")
+	bktRegistry = []byte("registry")
+	bktLogs     = []byte("logs_buffer")
+
+	keyConfigCurrent = []byte("current")
+	keyConfigVersion = []byte("version")
+	keySessionID     = []byte("session_id")
+	keyServerTime    = []byte("server_time_ms")
+	keyHeartbeat     = []byte("last_heartbeat_ms")
+)
+
+// ErrNotFound is returned when a key is absent. Distinguishing missing data
+// from a cipher error is important — a wrong key must never be silently
+// treated as "no config yet".
+var ErrNotFound = errors.New("state: not found")
+
+// Store is the public handle for the agent's BoltDB-backed state. It pairs
+// the open database with the AEAD used to seal and open stored payloads.
+type Store struct {
+	db   *bolt.DB
+	aead cipher.AEAD
+}
+
+// QueuedJob is a single pending job pulled from the queue bucket.
+type QueuedJob struct {
+	RunID   string
+	Payload []byte // decrypted, opaque to this package
+}
+
+// Options controls Store construction. All fields are optional except
+// AgentKey which is required to derive the AES key.
+type Options struct {
+	AgentKey string
+	// Timeout controls how long Open waits for an exclusive file lock.
+	// Zero defaults to 5 seconds — enough for an old process to die,
+	// short enough to fail fast in CI.
+	Timeout time.Duration
+}
+
+// Open creates or opens the BoltDB file at path, makes sure the four core
+// buckets exist, and prepares the AES-GCM cipher used for value encryption.
+//
+// It returns an error when path or opts.AgentKey is empty, when the cipher
+// cannot be set up, when bbolt cannot open the file, or when bucket creation
+// fails. On bucket-creation failure the database handle is closed first.
+func Open(path string, opts Options) (*Store, error) {
+	if path == "" {
+		return nil, errors.New("state: empty path")
+	}
+	if filepath.Ext(path) == "" {
+		// A final path element without an extension gets ".db" appended.
+		// NOTE(review): if the caller passed a directory by mistake, this
+		// opens "<dir>.db" beside it instead of failing — confirm intended.
+		path += ".db"
+	}
+	timeout := opts.Timeout
+	if timeout == 0 {
+		timeout = 5 * time.Second
+	}
+	// Derive the key and build the cipher before touching the file, so a
+	// missing agent key never creates or locks a database.
+	key, err := deriveStateKey(opts.AgentKey)
+	if err != nil {
+		return nil, err
+	}
+	aead, err := newGCM(key)
+	if err != nil {
+		return nil, err
+	}
+	// 0o600: the state file is readable and writable by its owner only.
+	db, err := bolt.Open(path, 0o600, &bolt.Options{Timeout: timeout})
+	if err != nil {
+		return nil, fmt.Errorf("state: open bbolt %q: %w", path, err)
+	}
+	s := &Store{db: db, aead: aead}
+	if err := s.ensureBuckets(); err != nil {
+		// Best-effort close: the bucket error is the one worth reporting.
+		_ = db.Close()
+		return nil, err
+	}
+	return s, nil
+}
+
+// ensureBuckets creates any of the four core buckets that do not exist yet,
+// all inside a single write transaction.
+func (s *Store) ensureBuckets() error {
+	return s.db.Update(func(tx *bolt.Tx) error {
+		for _, b := range [][]byte{bktConfig, bktQueue, bktRegistry, bktLogs} {
+			if _, err := tx.CreateBucketIfNotExists(b); err != nil {
+				return fmt.Errorf("state: create bucket %s: %w", b, err)
+			}
+		}
+		return nil
+	})
+}
+
+// Close releases the BoltDB file handle. It is a no-op on a nil Store or a
+// Store without a database.
+func (s *Store) Close() error {
+	if s == nil || s.db == nil {
+		return nil
+	}
+	return s.db.Close()
+}
+
+// Path returns the file path of the underlying BoltDB. Like Close, it
+// tolerates a nil Store or a Store without a database, returning "".
+func (s *Store) Path() string {
+	if s == nil || s.db == nil {
+		return ""
+	}
+	return s.db.Path()
+}
+
+// --- config bucket --------------------------------------------------------
+
+// SaveConfig stores the encoded AgentConfig snapshot together with its
+// version. Callers serialise the protobuf themselves so this package stays
+// oblivious to message shapes. The snapshot is sealed; the version is written
+// as a plain 8-byte big-endian integer. Both land in the same transaction.
+// NOTE(review): nothing here enforces that version only increases.
+func (s *Store) SaveConfig(version uint64, raw []byte) error {
+	enc, err := seal(s.aead, raw)
+	if err != nil {
+		return err
+	}
+	return s.db.Update(func(tx *bolt.Tx) error {
+		b := tx.Bucket(bktConfig)
+		if err := b.Put(keyConfigCurrent, enc); err != nil {
+			return err
+		}
+		return b.Put(keyConfigVersion, u64Bytes(version))
+	})
+}
+
+// LoadConfig returns the last saved config plus its version. Returns
+// ErrNotFound when no config has ever been saved.
+func (s *Store) LoadConfig() (uint64, []byte, error) {
+	var version uint64
+	var raw []byte
+	err := s.db.View(func(tx *bolt.Tx) error {
+		b := tx.Bucket(bktConfig)
+		v := b.Get(keyConfigCurrent)
+		if v == nil {
+			return ErrNotFound
+		}
+		// open decrypts into a freshly allocated slice, so raw stays valid
+		// after the transaction ends.
+		pt, err := open(s.aead, v)
+		if err != nil {
+			return err
+		}
+		raw = pt
+		// A missing version entry leaves version at 0; bytesToU64 also
+		// yields 0 for an entry that is not exactly 8 bytes long.
+		if vb := b.Get(keyConfigVersion); vb != nil {
+			version = bytesToU64(vb)
+		}
+		return nil
+	})
+	if err != nil {
+		return 0, nil, err
+	}
+	return version, raw, nil
+}
+
+// --- queue bucket --------------------------------------------------------
+
+// EnqueueJob persists a pending job keyed by run_id. Idempotent: re-enqueuing
+// the same run_id overwrites the previous payload (matches the spec — jobs
+// dedupe by run_id). An empty runID is rejected with an error.
+func (s *Store) EnqueueJob(runID string, payload []byte) error {
+	if runID == "" {
+		return errors.New("state: empty run id")
+	}
+	enc, err := seal(s.aead, payload)
+	if err != nil {
+		return err
+	}
+	return s.db.Update(func(tx *bolt.Tx) error {
+		return tx.Bucket(bktQueue).Put([]byte(runID), enc)
+	})
+}
+
+// DequeueJobs returns up to n jobs in key order without removing them.
+// Use AckJob to drop a job once its delivery is confirmed.
+//
+// A non-positive n yields (nil, nil). Any payload that fails to decrypt
+// aborts the call with that error and no jobs are returned.
+func (s *Store) DequeueJobs(n int) ([]QueuedJob, error) {
+	if n <= 0 {
+		return nil, nil
+	}
+	// n is caller-controlled; cap the up-front allocation so a "give me
+	// everything" call with a huge n cannot panic or exhaust memory in make.
+	// The slice still grows to hold up to n jobs if the bucket has them.
+	const maxPrealloc = 256
+	capHint := n
+	if capHint > maxPrealloc {
+		capHint = maxPrealloc
+	}
+	out := make([]QueuedJob, 0, capHint)
+	err := s.db.View(func(tx *bolt.Tx) error {
+		c := tx.Bucket(bktQueue).Cursor()
+		for k, v := c.First(); k != nil && len(out) < n; k, v = c.Next() {
+			pt, err := open(s.aead, v)
+			if err != nil {
+				return err
+			}
+			// string(k) copies the key bytes, which matters because bbolt
+			// slices are only valid for the txn lifetime.
+			out = append(out, QueuedJob{RunID: string(k), Payload: pt})
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// AckJob removes a job from the queue. Safe to call on an unknown run_id.
+func (s *Store) AckJob(runID string) error {
+	key := []byte(runID)
+	return s.db.Update(func(tx *bolt.Tx) error {
+		return tx.Bucket(bktQueue).Delete(key)
+	})
+}
+
+// QueueDepth reports how many keys the queue bucket currently holds, taken
+// from the bucket's Stats().KeyN inside a read-only transaction.
+// NOTE(review): bbolt appears to compute Stats by walking the bucket's pages,
+// so the cost presumably grows with queue size — confirm before relying on
+// this being cheap on a hot path.
+func (s *Store) QueueDepth() (int, error) {
+	depth := 0
+	countKeys := func(tx *bolt.Tx) error {
+		depth = tx.Bucket(bktQueue).Stats().KeyN
+		return nil
+	}
+	err := s.db.View(countKeys)
+	return depth, err
+}
+
+// --- registry bucket -----------------------------------------------------
+
+// SaveSession records the session id handed out by the server in RegisterAck
+// together with the server time. The session id is sealed; the server time
+// is written as a plain 8-byte big-endian integer in the same transaction.
+func (s *Store) SaveSession(sessionID string, serverTimeMs int64) error {
+	sealedID, err := seal(s.aead, []byte(sessionID))
+	if err != nil {
+		return err
+	}
+	stamp := u64Bytes(uint64(serverTimeMs))
+	return s.db.Update(func(tx *bolt.Tx) error {
+		reg := tx.Bucket(bktRegistry)
+		if putErr := reg.Put(keySessionID, sealedID); putErr != nil {
+			return putErr
+		}
+		return reg.Put(keyServerTime, stamp)
+	})
+}
+
+// LoadSession yields the most recently saved session id. It returns
+// ErrNotFound when none was stored, or the error from open when the stored
+// value cannot be decrypted.
+func (s *Store) LoadSession() (string, error) {
+	var sessionID string
+	if err := s.db.View(func(tx *bolt.Tx) error {
+		sealedID := tx.Bucket(bktRegistry).Get(keySessionID)
+		if sealedID == nil {
+			return ErrNotFound
+		}
+		plain, openErr := open(s.aead, sealedID)
+		if openErr == nil {
+			sessionID = string(plain)
+		}
+		return openErr
+	}); err != nil {
+		return "", err
+	}
+	return sessionID, nil
+}
+
+// RecordHeartbeat stores tsMs, the time of the last successful heartbeat, as
+// a plain 8-byte big-endian integer in the registry bucket.
+func (s *Store) RecordHeartbeat(tsMs int64) error {
+	stamp := u64Bytes(uint64(tsMs))
+	return s.db.Update(func(tx *bolt.Tx) error {
+		return tx.Bucket(bktRegistry).Put(keyHeartbeat, stamp)
+	})
+}
+
+// LastHeartbeat returns the timestamp written by RecordHeartbeat, or 0.
+func (s *Store) LastHeartbeat() (int64, error) {
+	var ts int64
+	err := s.db.View(func(tx *bolt.Tx) error {
+		v := tx.Bucket(bktRegistry).Get(keyHeartbeat)
+		// A missing entry leaves ts at 0; bytesToU64 also yields 0 for an
+		// entry that is not exactly 8 bytes long.
+		if v != nil {
+			ts = int64(bytesToU64(v))
+		}
+		return nil
+	})
+	return ts, err
+}
+
+// --- logs buffer ---------------------------------------------------------
+
+// BufferLog appends a log payload keyed by timestamp+ordinal so iteration
+// returns chronological order. The key encodes ts_ms (big-endian) so bbolt
+// sorts naturally, followed by the bucket's next sequence number.
+//
+// The payload is sealed before the write transaction starts. A failure to
+// obtain the sequence number aborts the write instead of reusing ordinal 0,
+// which could overwrite an earlier record carrying the same timestamp.
+// NOTE(review): tsMs is converted to uint64, so a negative timestamp would
+// sort after every non-negative one — confirm callers never pass one.
+func (s *Store) BufferLog(tsMs int64, payload []byte) error {
+	enc, err := seal(s.aead, payload)
+	if err != nil {
+		return err
+	}
+	return s.db.Update(func(tx *bolt.Tx) error {
+		b := tx.Bucket(bktLogs)
+		// Sequence number ensures uniqueness when ts collides.
+		seq, err := b.NextSequence()
+		if err != nil {
+			return fmt.Errorf("state: log sequence: %w", err)
+		}
+		key := make([]byte, 8+8)
+		binary.BigEndian.PutUint64(key[:8], uint64(tsMs))
+		binary.BigEndian.PutUint64(key[8:], seq)
+		return b.Put(key, enc)
+	})
+}
+
+// DrainLogs returns up to n buffered log payloads in chronological order
+// and removes them from the buffer in the same transaction.
+func (s *Store) DrainLogs(n int) ([][]byte, error) {
+	if n <= 0 {
+		return nil, nil
+	}
+	// n is caller-controlled; cap the up-front allocation so a "drain
+	// everything" call with a huge n cannot panic or exhaust memory in make.
+	// The slice still grows to hold up to n payloads if the bucket has them.
+	const maxPrealloc = 256
+	capHint := n
+	if capHint > maxPrealloc {
+		capHint = maxPrealloc
+	}
+	out := make([][]byte, 0, capHint)
+	err := s.db.Update(func(tx *bolt.Tx) error {
+		b := tx.Bucket(bktLogs)
+		c := b.Cursor()
+		// Collect the keys during the scan and delete afterwards, rather
+		// than mutating the bucket while the cursor is walking it.
+		var keys [][]byte
+		for k, v := c.First(); k != nil && len(out) < n; k, v = c.Next() {
+			// Returning an error from the Update callback abandons the
+			// transaction, so an undecryptable entry removes nothing.
+			pt, err := open(s.aead, v)
+			if err != nil {
+				return err
+			}
+			out = append(out, pt)
+			kc := make([]byte, len(k))
+			copy(kc, k)
+			keys = append(keys, kc)
+		}
+		for _, k := range keys {
+			if err := b.Delete(k); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// --- helpers --------------------------------------------------------------
+
+// u64Bytes encodes n as 8 big-endian bytes.
+func u64Bytes(n uint64) []byte {
+	b := make([]byte, 8)
+	binary.BigEndian.PutUint64(b, n)
+	return b
+}
+
+// bytesToU64 decodes 8 big-endian bytes. Any other length yields 0, so a
+// malformed counter is indistinguishable from a stored zero.
+func bytesToU64(b []byte) uint64 {
+	if len(b) != 8 {
+		return 0
+	}
+	return binary.BigEndian.Uint64(b)
+}
diff --git a/apps/agent/internal/state/state_test.go b/apps/agent/internal/state/state_test.go
new file mode 100644
index 0000000..89012d2
--- /dev/null
+++ b/apps/agent/internal/state/state_test.go
@@ -0,0 +1,173 @@
+package state
+
+import (
+	"bytes"
+	"errors"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+const testAgentKey = "bkpy_test_abcdefghijklmnopqrstuvwxyz012345"
+
+// newStore opens a Store on state.db inside a per-test temp dir, registers a
+// cleanup that closes it, and returns the store together with the file path.
+func newStore(t *testing.T) (*Store, string) {
+	t.Helper()
+	dir := t.TempDir()
+	path := filepath.Join(dir, "state.db")
+	s, err := Open(path, Options{AgentKey: testAgentKey})
+	require.NoError(t, err)
+	t.Cleanup(func() { _ = s.Close() })
+	return s, path
+}
+
+func TestOpen_Empty(t *testing.T) {
+	s, _ := newStore(t)
+	_, _, err := s.LoadConfig()
+	require.ErrorIs(t, err, ErrNotFound)
+
+	_, err = s.LoadSession()
+	require.ErrorIs(t, err, ErrNotFound)
+
+	n, err := s.QueueDepth()
+	require.NoError(t, err)
+	require.Equal(t, 0, n)
+}
+
+func TestConfig_Roundtrip(t *testing.T) {
+	s, _ := newStore(t)
+	payload := []byte("hello protobuf bytes")
+	require.NoError(t, s.SaveConfig(42, payload))
+
+	v, got, err := s.LoadConfig()
+	require.NoError(t, err)
+	require.Equal(t, uint64(42), v)
+	require.Equal(t, payload, got)
+}
+
+func TestQueue_EnqueueDequeueAck(t *testing.T) {
+	s, _ := newStore(t)
+
+	require.NoError(t, s.EnqueueJob("run-a", []byte("aaa")))
+	require.NoError(t, s.EnqueueJob("run-b", []byte("bbb")))
+	require.NoError(t, s.EnqueueJob("run-c", []byte("ccc")))
+
+	n, err := s.QueueDepth()
+	require.NoError(t, err)
+	require.Equal(t, 3, n)
+
+	jobs, err := s.DequeueJobs(2)
+	require.NoError(t, err)
+	require.Len(t, jobs, 2)
+	require.Equal(t, "run-a", jobs[0].RunID)
+	require.Equal(t, []byte("aaa"), jobs[0].Payload)
+	require.Equal(t, "run-b", jobs[1].RunID)
+
+	// Ack pops the head; depth drops.
+	require.NoError(t, s.AckJob("run-a"))
+	n, err = s.QueueDepth()
+	require.NoError(t, err)
+	require.Equal(t, 2, n)
+
+	// Idempotent re-enqueue overwrites payload.
+	require.NoError(t, s.EnqueueJob("run-b", []byte("BBB")))
+	jobs, err = s.DequeueJobs(10)
+	require.NoError(t, err)
+	require.NotEmpty(t, jobs)
+	require.Equal(t, []byte("BBB"), jobs[0].Payload)
+}
+
+func TestSession(t *testing.T) {
+	s, _ := newStore(t)
+	require.NoError(t, s.SaveSession("sess-123", 1700000000000))
+	got, err := s.LoadSession()
+	require.NoError(t, err)
+	require.Equal(t, "sess-123", got)
+}
+
+func TestHeartbeat(t *testing.T) {
+	s, _ := newStore(t)
+	ts, err := s.LastHeartbeat()
+	require.NoError(t, err)
+	require.Equal(t, int64(0), ts)
+
+	require.NoError(t, s.RecordHeartbeat(1700000000000))
+	ts, err = s.LastHeartbeat()
+	require.NoError(t, err)
+	require.Equal(t, int64(1700000000000), ts)
+}
+
+func TestLogs_BufferAndDrain(t *testing.T) {
+	s, _ := newStore(t)
+	require.NoError(t, s.BufferLog(100, []byte("first")))
+	require.NoError(t, s.BufferLog(200, []byte("second")))
+	require.NoError(t, s.BufferLog(150, []byte("middle")))
+
+	drained, err := s.DrainLogs(10)
+	require.NoError(t, err)
+	require.Len(t, drained, 3)
+	// Chronological order — ts_ms wins, so: 100, 150, 200.
+	require.Equal(t, []byte("first"), drained[0])
+	require.Equal(t, []byte("middle"), drained[1])
+	require.Equal(t, []byte("second"), drained[2])
+
+	// Second drain returns nothing.
+	drained, err = s.DrainLogs(10)
+	require.NoError(t, err)
+	require.Empty(t, drained)
+}
+
+// TestEncryption_AtRest ensures values written to disk are NOT plaintext.
+// This is the headline guarantee of the state package and the basis for
+// task D-17 ("local state encryption").
+func TestEncryption_AtRest(t *testing.T) {
+	s, path := newStore(t)
+	plaintext := []byte("super-secret-config-payload")
+	require.NoError(t, s.SaveConfig(1, plaintext))
+	require.NoError(t, s.EnqueueJob("run-x", []byte("queued-secret-x")))
+
+	// Close to flush before reading raw.
+	require.NoError(t, s.Close())
+
+	raw, err := os.ReadFile(path)
+	require.NoError(t, err)
+	require.False(t, bytes.Contains(raw, plaintext), "plaintext config leaked to disk")
+	require.False(t, bytes.Contains(raw, []byte("queued-secret-x")), "plaintext queue payload leaked to disk")
+}
+
+// TestEncryption_WrongKey ensures a different agent key cannot decrypt
+// existing state — corruption / key rotation surfaces as a hard error
+// rather than silently overwriting good data.
+func TestEncryption_WrongKey(t *testing.T) {
+	dir := t.TempDir()
+	path := filepath.Join(dir, "state.db")
+
+	s, err := Open(path, Options{AgentKey: testAgentKey})
+	require.NoError(t, err)
+	require.NoError(t, s.SaveConfig(7, []byte("payload")))
+	require.NoError(t, s.Close())
+
+	// Reopen with a different key — must fail to decrypt.
+	wrong := "bkpy_test_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ"
+	s2, err := Open(path, Options{AgentKey: wrong})
+	require.NoError(t, err)
+	t.Cleanup(func() { _ = s2.Close() })
+
+	_, _, err = s2.LoadConfig()
+	require.Error(t, err)
+	require.True(t, errors.Is(err, errCipher), "expected cipher error, got: %v", err)
+}
+
+func TestOpen_EmptyKey(t *testing.T) {
+	dir := t.TempDir()
+	_, err := Open(filepath.Join(dir, "state.db"), Options{AgentKey: ""})
+	require.Error(t, err)
+}
+
+func TestEnqueue_EmptyRunID(t *testing.T) {
+	s, _ := newStore(t)
+	require.Error(t, s.EnqueueJob("", []byte("x")))
+}