From a6a8955014b299f921d41907311f95c0360f5c94 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 6 May 2026 15:57:48 +0100 Subject: [PATCH 1/4] CBG-5291: CAS-safe Update on dual MetadataStore wrapper Replaces the placeholder MetadataStore.Update (which delegated the entire read-modify-write to the fallback datastore) with a wrapper-level CAS retry loop that always lands writes in primary. When the doc is in primary or the doc is absent from both stores, it delegates to primary.Update; when the doc lives only in fallback, it feeds the fallback value to the caller's callback and inserts the result into primary via WriteCas, retrying on CAS race. A delete returned by the callback for a fallback-only doc is a no-op in primary - the metadata migration sweep tombstones the fallback copy. WriteUpdateWithXattrs gets the same dual-read pattern for symmetry (callers today are application-data only, but a future metadata caller would otherwise silently skip fallback data). Co-Authored-By: Claude Opus 4.7 (1M context) --- base/bucket_gocb_test.go | 203 +++++++++++++++++++++++++++++++++++- base/dual_metadata_store.go | 105 ++++++++++++++++++- 2 files changed, 303 insertions(+), 5 deletions(-) diff --git a/base/bucket_gocb_test.go b/base/bucket_gocb_test.go index f355c9141a..b8fc758488 100644 --- a/base/bucket_gocb_test.go +++ b/base/bucket_gocb_test.go @@ -2843,10 +2843,12 @@ func TestMetadataStoreKVStoreWriteOperations(t *testing.T) { }) require.NoError(t, err) require.NotZero(t, cas) - // CBG-5291: update currently writes to fallback - readRawBody, _, err = metaStore.Fallback().GetRaw(updateDocID) + // verify in primary, not in fallback + readRawBody, _, err = metaStore.Primary().GetRaw(updateDocID) require.NoError(t, err) assert.Equal(t, updateBody, readRawBody) + _, _, err = metaStore.Fallback().GetRaw(updateDocID) + require.True(t, IsDocNotFoundError(err)) // fallback expects error // Test Incr incrDocID := t.Name() + "_incr" @@ -2919,6 +2921,203 @@ func 
TestMetadataStoreKVStoreWriteOperations(t *testing.T) { assert.False(t, exists) } +// TestMetadataStoreUpdateMigratesFromFallback verifies that an Update against a key that lives only +// in the fallback store invokes the caller's callback with the fallback value, and writes the result +// to the primary store (never back to the fallback). This is the core CBG-5291 case. +func TestMetadataStoreUpdateMigratesFromFallback(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + originalBody := []byte(`{"counter":1}`) + ok, err := metaStore.Fallback().Add(docID, 0, originalBody) + require.NoError(t, err) + require.True(t, ok) + + updatedBody := []byte(`{"counter":2}`) + var seenByCallback []byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + seenByCallback = append([]byte(nil), current...) + return updatedBody, nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + assert.Equal(t, originalBody, seenByCallback, "callback must observe the fallback value, not nil") + + // New value lives in primary at the returned CAS. + primaryRaw, primaryCas, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, updatedBody, primaryRaw) + assert.Equal(t, cas, primaryCas) + + // Fallback copy is left untouched - the migration sweep handles cleanup. + fallbackRaw, _, err := metaStore.Fallback().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, originalBody, fallbackRaw) +} + +// TestMetadataStoreUpdateExistingPrimary verifies that when the doc already exists in primary, +// Update goes through the standard CAS path against primary (callback sees primary value, write +// replaces in primary, fallback is ignored even if it has stale data). 
+func TestMetadataStoreUpdateExistingPrimary(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + primaryBody := []byte(`{"src":"primary"}`) + fallbackBody := []byte(`{"src":"fallback"}`) + ok, err := metaStore.Primary().Add(docID, 0, primaryBody) + require.NoError(t, err) + require.True(t, ok) + ok, err = metaStore.Fallback().Add(docID, 0, fallbackBody) + require.NoError(t, err) + require.True(t, ok) + + updatedBody := []byte(`{"src":"updated"}`) + var seenByCallback []byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + seenByCallback = append([]byte(nil), current...) + return updatedBody, nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + assert.Equal(t, primaryBody, seenByCallback, "callback must observe primary value, not fallback") + + primaryRaw, _, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, updatedBody, primaryRaw) + + fallbackRaw, _, err := metaStore.Fallback().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, fallbackBody, fallbackRaw, "fallback is never written by Update") +} + +// TestMetadataStoreUpdateDeleteFallbackOnly verifies the agreed CBG-5291 behaviour: when the +// callback requests a delete and the doc exists only in fallback, the wrapper is a no-op against +// primary (no tombstone written) and leaves the fallback copy alone for the migration sweep. 
+func TestMetadataStoreUpdateDeleteFallbackOnly(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + originalBody := []byte(`{"counter":1}`) + ok, err := metaStore.Fallback().Add(docID, 0, originalBody) + require.NoError(t, err) + require.True(t, ok) + + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + return nil, nil, true, nil + }) + require.NoError(t, err) + assert.Zero(t, cas) + + // Primary stays absent. + exists, err := metaStore.Primary().Exists(docID) + require.NoError(t, err) + assert.False(t, exists) + + // Fallback is untouched. + fallbackRaw, _, err := metaStore.Fallback().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, originalBody, fallbackRaw) +} + +// TestMetadataStoreUpdateAfterMigrationComplete verifies that once SetMigrationComplete has been +// called, fallback contents are ignored entirely - even by Update - and the wrapper behaves as a +// thin pass-through to the primary's Update. +func TestMetadataStoreUpdateAfterMigrationComplete(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + metaStore.SetMigrationComplete() + + docID := t.Name() + ok, err := metaStore.Fallback().Add(docID, 0, []byte(`{"counter":99}`)) + require.NoError(t, err) + require.True(t, ok) + + updatedBody := []byte(`{"counter":1}`) + var seenByCallback []byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + seenByCallback = append([]byte(nil), current...) 
+ return updatedBody, nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + assert.Nil(t, seenByCallback, "fallback must not be consulted after migration complete") + + primaryRaw, _, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, updatedBody, primaryRaw) +} + +// TestMetadataStoreWriteUpdateWithXattrsMigratesFromFallback is the WriteUpdateWithXattrs analogue +// of TestMetadataStoreUpdateMigratesFromFallback: the wrapper must surface the fallback body and +// xattrs to the callback and write the result to primary. +func TestMetadataStoreWriteUpdateWithXattrsMigratesFromFallback(t *testing.T) { + SkipXattrTestsIfNotEnabled(t) + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + xattrKey := SyncXattrName + originalBody := []byte(`{"counter":1}`) + originalXattr := []byte(`{"seq":1}`) + + // Seed the fallback store using its own WriteUpdateWithXattrs as an insert. + _, err := metaStore.Fallback().WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Doc: originalBody, + Xattrs: map[string][]byte{xattrKey: originalXattr}, + }, nil + }) + require.NoError(t, err) + + updatedBody := []byte(`{"counter":2}`) + updatedXattr := []byte(`{"seq":2}`) + var seenBody []byte + var seenXattrs map[string][]byte + var seenCas uint64 + cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cb_cas uint64) (sgbucket.UpdatedDoc, error) { + seenBody = append([]byte(nil), doc...) 
+ seenXattrs = xattrs + seenCas = cb_cas + return sgbucket.UpdatedDoc{ + Doc: updatedBody, + Xattrs: map[string][]byte{xattrKey: updatedXattr}, + }, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + assert.Equal(t, originalBody, seenBody, "callback must observe fallback body") + require.Contains(t, seenXattrs, xattrKey) + assert.Equal(t, originalXattr, seenXattrs[xattrKey], "callback must observe fallback xattr") + assert.Zero(t, seenCas, "callback must see cas=0 (fallback CAS is never propagated)") + + // New value lives in primary with the expected body and xattr. + primaryBody, primaryXattrs, primaryCas, err := metaStore.Primary().GetWithXattrs(ctx, docID, []string{xattrKey}) + require.NoError(t, err) + assert.Equal(t, updatedBody, primaryBody) + assert.Equal(t, updatedXattr, primaryXattrs[xattrKey]) + assert.Equal(t, cas, primaryCas) +} + func TestReadDoesNotGoToFallbackWhenMigrationComplete(t *testing.T) { ctx := TestCtx(t) bucket := GetTestBucket(t) diff --git a/base/dual_metadata_store.go b/base/dual_metadata_store.go index 4e5037b5b0..fa675d41e6 100644 --- a/base/dual_metadata_store.go +++ b/base/dual_metadata_store.go @@ -10,6 +10,7 @@ package base import ( "context" + "errors" "sync/atomic" sgbucket "github.com/couchbase/sg-bucket" @@ -168,9 +169,53 @@ func (ms *MetadataStore) Remove(k string, cas uint64) (casOut uint64, err error) return ms.primary.Remove(k, cas) } +// Update implements a CAS-safe read-modify-write that always lands in primary, even when the +// document only exists in the fallback store. When the doc is in primary (or fallback is no +// longer consulted) the call delegates to the primary's own Update — its CAS retry loop is +// already correct. When the doc is only in fallback, the wrapper feeds the fallback value to +// the caller's callback and inserts the result into primary; if a concurrent writer wins the +// race, the loop retries against primary. 
+// +// A delete returned by the callback for a doc that exists only in fallback is a no-op in +// primary; the metadata migration sweep will tombstone the fallback copy. func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFunc) (casOut uint64, err error) { - // CBG-5291: turn into insert operation for primary datastore. - return ms.fallback.Update(k, exp, callback) + for { + primaryExists, existsErr := ms.primary.Exists(k) + if existsErr != nil { + return 0, existsErr + } + if primaryExists || ms.migrationComplete.Load() { + return ms.primary.Update(k, exp, callback) + } + + fallbackValue, _, fallbackErr := ms.fallback.GetRaw(k) + if IsDocNotFoundError(fallbackErr) { + return ms.primary.Update(k, exp, callback) + } + if fallbackErr != nil { + return 0, fallbackErr + } + + newValue, cbExpiry, isDelete, cbErr := callback(fallbackValue) + if cbErr != nil { + return 0, cbErr + } + writeExp := exp + if cbExpiry != nil { + writeExp = *cbExpiry + } + if isDelete { + return 0, nil + } + casOut, err = ms.primary.WriteCas(k, writeExp, 0, newValue, 0) + if err == nil { + return casOut, nil + } + if IsCasMismatch(err) { + continue + } + return 0, err + } } func (ms *MetadataStore) Incr(k string, amt, def uint64, exp uint32) (casOut uint64, err error) { @@ -225,8 +270,62 @@ func (ms *MetadataStore) DeleteWithXattrs(ctx context.Context, k string, xattrKe return ms.primary.DeleteWithXattrs(ctx, k, xattrKeys) } +// WriteUpdateWithXattrs is the xattr-aware analogue of Update and follows the same +// "read-from-both, write-to-primary" pattern. If the caller has supplied a previous CAS from +// the primary (or migration is complete), the call delegates straight to the primary's own +// retry loop. Otherwise the wrapper probes primary first; on a primary miss it reads +// body+xattrs from fallback, hands them to the callback with cas=0, and inserts the result +// into primary via WriteWithXattrs, retrying on CAS race. 
+// +// As with Update, a tombstone result for a doc that exists only in fallback is a no-op in +// primary — the metadata migration sweep will tombstone the fallback copy. func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xattrKeys []string, exp uint32, previous *sgbucket.BucketDocument, opts *sgbucket.MutateInOptions, callback sgbucket.WriteUpdateWithXattrsFunc) (casOut uint64, err error) { - return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback) + if (previous != nil && previous.Cas != 0) || ms.migrationComplete.Load() { + return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback) + } + + for { + primaryExists, existsErr := ms.primary.Exists(k) + if existsErr != nil { + return 0, existsErr + } + if primaryExists { + return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) + } + + fallbackBody, fallbackXattrs, _, fallbackErr := ms.fallback.GetWithXattrs(ctx, k, xattrKeys) + if IsDocNotFoundError(fallbackErr) { + return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) + } + // A doc with no/partial xattrs is still a fallback hit — surface what we have to the + // callback rather than treat it as not-found. 
+ if fallbackErr != nil && !IsXattrNotFoundError(fallbackErr) && !errors.Is(fallbackErr, ErrXattrPartialFound) { + return 0, fallbackErr + } + + updatedDoc, cbErr := callback(fallbackBody, fallbackXattrs, 0) + if cbErr == ErrCasFailureShouldRetry { + continue + } + if cbErr != nil { + return 0, cbErr + } + writeExp := exp + if updatedDoc.Expiry != nil { + writeExp = *updatedDoc.Expiry + } + if updatedDoc.IsTombstone { + return 0, nil + } + casOut, err = ms.primary.WriteWithXattrs(ctx, k, writeExp, 0, updatedDoc.Doc, updatedDoc.Xattrs, nil, opts) + if err == nil { + return casOut, nil + } + if IsCasMismatch(err) || IsDocNotFoundError(err) { + continue + } + return 0, err + } } func (ms *MetadataStore) UpdateXattrs(ctx context.Context, k string, exp uint32, cas uint64, xv map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) { From a31f8331d9f325c9fe157d22365722fe45c66ec1 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Thu, 7 May 2026 12:57:53 +0100 Subject: [PATCH 2/4] CBG-5291: Apply review fixes to dual MetadataStore wrapper Address review feedback on the CAS-safe Update wrapper: - Update / WriteUpdateWithXattrs delete on a fallback-only doc now lands in the fallback (Remove / WriteTombstoneWithXattrs with the observed CAS, retrying on race) so subsequent reads stop falling through to a stale doc instead of relying on the migration sweep. - Document the previous.Cas != 0 shortcut: the wrapper's read methods discard fallback CAS, so a non-zero CAS from the wrapper unambiguously identifies a primary revision. - Note in code that xattrsToDelete is intentionally nil on the migration-insert branch (cas=0 inserts reject xattrsToDelete with ErrDeleteXattrOnDocumentInsert). 
New / rewritten tests cover each behaviour and were verified TDD-style to catch a regression of the corresponding fix: - TestMetadataStoreUpdateDeleteFallbackOnly (rewritten for new contract) - TestMetadataStoreUpdateRetriesOnCASMismatch - TestMetadataStoreWriteUpdateWithXattrsCASShortcut - TestMetadataStoreWriteUpdateWithXattrsHonorsXattrsToDelete - TestMetadataStoreWriteUpdateWithXattrsTombstoneFallbackOnly Co-Authored-By: Claude Opus 4.7 (1M context) --- base/bucket_gocb_test.go | 256 ++++++++++++++++++++++++++++++++++-- base/dual_metadata_store.go | 50 +++++-- 2 files changed, 289 insertions(+), 17 deletions(-) diff --git a/base/bucket_gocb_test.go b/base/bucket_gocb_test.go index b8fc758488..515622fa36 100644 --- a/base/bucket_gocb_test.go +++ b/base/bucket_gocb_test.go @@ -2998,9 +2998,11 @@ func TestMetadataStoreUpdateExistingPrimary(t *testing.T) { assert.Equal(t, fallbackBody, fallbackRaw, "fallback is never written by Update") } -// TestMetadataStoreUpdateDeleteFallbackOnly verifies the agreed CBG-5291 behaviour: when the -// callback requests a delete and the doc exists only in fallback, the wrapper is a no-op against -// primary (no tombstone written) and leaves the fallback copy alone for the migration sweep. +// TestMetadataStoreUpdateDeleteFallbackOnly verifies the CBG-5291 delete behaviour for a doc +// that exists only in fallback: the wrapper writes the delete to fallback (the only store +// that ever held the doc) and does not create a primary tombstone. Without this, callers +// reading after delete would still observe the doc via the fallback read path until the +// migration sweep ran. 
func TestMetadataStoreUpdateDeleteFallbackOnly(t *testing.T) { ctx := TestCtx(t) bucket := GetTestBucket(t) @@ -3015,20 +3017,71 @@ func TestMetadataStoreUpdateDeleteFallbackOnly(t *testing.T) { require.True(t, ok) cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + assert.Equal(t, originalBody, current, "callback must observe fallback value") return nil, nil, true, nil }) require.NoError(t, err) assert.Zero(t, cas) - // Primary stays absent. + // Primary stays absent - we did not write a primary tombstone. exists, err := metaStore.Primary().Exists(docID) require.NoError(t, err) assert.False(t, exists) - // Fallback is untouched. - fallbackRaw, _, err := metaStore.Fallback().GetRaw(docID) + // Fallback delete landed - subsequent reads must surface not-found. + _, _, err = metaStore.Fallback().GetRaw(docID) + require.True(t, IsDocNotFoundError(err), "expected fallback delete, got err=%v", err) + + // Wrapper-level read also reports not-found (no read fallthrough to a stale doc). + _, _, err = metaStore.GetRaw(docID) + require.True(t, IsDocNotFoundError(err), "wrapper read must report not-found after delete, got err=%v", err) +} + +// TestMetadataStoreUpdateRetriesOnCASMismatch exercises the wrapper's CAS retry: the doc starts +// life in fallback only; from inside the callback (between the wrapper's primary.Exists probe +// and its primary.WriteCas insert) a concurrent writer lands a doc in primary. The first +// WriteCas(cas=0) must hit a CAS mismatch and the loop must retry — on retry primary now holds +// the doc, so the wrapper delegates to primary.Update, the callback fires again against the +// primary value, and the second result is what finally lands in primary. 
+func TestMetadataStoreUpdateRetriesOnCASMismatch(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + originalBody := []byte(`{"src":"fallback"}`) + racerBody := []byte(`{"src":"racer"}`) + + ok, err := metaStore.Fallback().Add(docID, 0, originalBody) require.NoError(t, err) - assert.Equal(t, originalBody, fallbackRaw) + require.True(t, ok) + + var calls int + var callbackInputs [][]byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + calls++ + callbackInputs = append(callbackInputs, append([]byte(nil), current...)) + if calls == 1 { + // Simulate a concurrent writer beating us to the punch on primary. + added, addErr := metaStore.Primary().Add(docID, 0, racerBody) + require.NoError(t, addErr) + require.True(t, added) + return []byte(`{"src":"call1"}`), nil, false, nil + } + return []byte(`{"src":"call2"}`), nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + require.Equal(t, 2, calls, "wrapper must retry through primary.Update after CAS race") + assert.Equal(t, originalBody, callbackInputs[0], "first callback observes the fallback value") + assert.Equal(t, racerBody, callbackInputs[1], "second callback observes the racer's value from primary") + + primaryRaw, primaryCas, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, []byte(`{"src":"call2"}`), primaryRaw, "the second callback's result wins") + assert.Equal(t, cas, primaryCas) } // TestMetadataStoreUpdateAfterMigrationComplete verifies that once SetMigrationComplete has been @@ -3062,6 +3115,191 @@ func TestMetadataStoreUpdateAfterMigrationComplete(t *testing.T) { assert.Equal(t, updatedBody, primaryRaw) } +// TestMetadataStoreWriteUpdateWithXattrsCASShortcut verifies the previous.Cas != 0 early-out: +// when the caller 
hands in a non-zero CAS, the wrapper must delegate straight to +// primary.WriteUpdateWithXattrs with the caller's previous, and must NOT consult fallback. +// +// We distinguish the path by handing in a previous whose Body differs from what's actually in +// primary (CAS still matches). With the shortcut: callback observes the caller-supplied body. +// Without the shortcut: callback would observe the actual primary body (because the wrapper +// would pass nil for previous and primary.WriteUpdateWithXattrs would re-read). +func TestMetadataStoreWriteUpdateWithXattrsCASShortcut(t *testing.T) { + SkipXattrTestsIfNotEnabled(t) + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + xattrKey := SyncXattrName + primaryBody := []byte(`{"src":"primary"}`) + primaryXattr := []byte(`{"seq":1}`) + fallbackBody := []byte(`{"src":"fallback"}`) + + // Seed primary; capture its actual CAS. + primaryCas, err := metaStore.Primary().WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Doc: primaryBody, + Xattrs: map[string][]byte{xattrKey: primaryXattr}, + }, nil + }) + require.NoError(t, err) + require.NotZero(t, primaryCas) + + // Seed fallback with a DIFFERENT body — if the wrapper were to consult fallback in this + // path the callback would surface it. The shortcut must skip this read entirely. + ok, err := metaStore.Fallback().Add(docID, 0, fallbackBody) + require.NoError(t, err) + require.True(t, ok) + + // Hand in a previous whose Body and Xattrs disagree with primary, but whose CAS matches. + // If the shortcut runs, primary.WriteUpdateWithXattrs uses this previous directly and the + // callback sees `bogusBody`. 
+ bogusBody := []byte(`{"src":"caller-supplied"}`) + bogusXattr := []byte(`{"seq":99}`) + previous := &sgbucket.BucketDocument{ + Cas: primaryCas, + Body: bogusBody, + Xattrs: map[string][]byte{xattrKey: bogusXattr}, + } + + var callbackObservations [][]byte + updatedBody := []byte(`{"src":"updated"}`) + cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, previous, nil, + func(doc []byte, xattrs map[string][]byte, cbCas uint64) (sgbucket.UpdatedDoc, error) { + callbackObservations = append(callbackObservations, append([]byte(nil), doc...)) + return sgbucket.UpdatedDoc{ + Doc: updatedBody, + Xattrs: map[string][]byte{xattrKey: primaryXattr}, + }, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + require.NotEmpty(t, callbackObservations) + assert.Equal(t, bogusBody, callbackObservations[0], + "shortcut must pass caller's previous straight through to primary.WriteUpdateWithXattrs; callback should see caller-supplied body, not primary's actual body") + assert.NotEqual(t, fallbackBody, callbackObservations[0], "fallback must never be consulted on the shortcut path") +} + +// TestMetadataStoreWriteUpdateWithXattrsHonorsXattrsToDelete verifies the migration-insert path +// when the callback returns XattrsToDelete: the underlying primary insert (cas=0) cannot accept +// xattrsToDelete (rosmar/CBS both reject it as ErrDeleteXattrOnDocumentInsert), so the wrapper +// must drop them silently. The user-visible contract: only the xattrs the callback explicitly +// returned in UpdatedDoc.Xattrs land in primary; xattrs the callback marked for deletion are +// simply not migrated (which is equivalent to "deleted" since primary never held them). 
+func TestMetadataStoreWriteUpdateWithXattrsHonorsXattrsToDelete(t *testing.T) { + SkipXattrTestsIfNotEnabled(t) + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + keepXattr := SyncXattrName + dropXattr := "_drop" + xattrKeys := []string{keepXattr, dropXattr} + + // Seed fallback with body + two xattrs. + _, err := metaStore.Fallback().WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Doc: []byte(`{"v":1}`), + Xattrs: map[string][]byte{ + keepXattr: []byte(`{"keep":1}`), + dropXattr: []byte(`{"drop":1}`), + }, + }, nil + }) + require.NoError(t, err) + + // Migrate via the wrapper. Callback returns updated body, retains keepXattr, requests + // dropXattr be dropped via XattrsToDelete. + updatedBody := []byte(`{"v":2}`) + cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cbCas uint64) (sgbucket.UpdatedDoc, error) { + require.Contains(t, xattrs, keepXattr, "callback should observe both fallback xattrs") + require.Contains(t, xattrs, dropXattr) + return sgbucket.UpdatedDoc{ + Doc: updatedBody, + Xattrs: map[string][]byte{keepXattr: []byte(`{"keep":2}`)}, + XattrsToDelete: []string{dropXattr}, + }, nil + }) + require.NoError(t, err, "migration with XattrsToDelete must not surface ErrDeleteXattrOnDocumentInsert") + require.NotZero(t, cas) + + // Primary holds only the kept xattr; the dropped one was never migrated. 
+ primaryBody, primaryXattrs, _, err := metaStore.Primary().GetWithXattrs(ctx, docID, xattrKeys) + require.NoError(t, err) + assert.Equal(t, updatedBody, primaryBody) + require.Contains(t, primaryXattrs, keepXattr) + assert.JSONEq(t, `{"keep":2}`, string(primaryXattrs[keepXattr])) + assert.NotContains(t, primaryXattrs, dropXattr, "dropped xattr must not be migrated to primary") +} + +// TestMetadataStoreWriteUpdateWithXattrsTombstoneFallbackOnly is the xattr analogue of +// TestMetadataStoreUpdateDeleteFallbackOnly: when the callback returns a tombstone for a doc +// that exists only in fallback, the wrapper writes the tombstone to fallback (the only store +// that ever held the doc). Subsequent reads through the wrapper must surface not-found. +func TestMetadataStoreWriteUpdateWithXattrsTombstoneFallbackOnly(t *testing.T) { + SkipXattrTestsIfNotEnabled(t) + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + xattrKey := SyncXattrName + originalBody := []byte(`{"counter":1}`) + originalXattr := []byte(`{"seq":1}`) + + // Seed fallback only. + _, err := metaStore.Fallback().WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Doc: originalBody, + Xattrs: map[string][]byte{xattrKey: originalXattr}, + }, nil + }) + require.NoError(t, err) + + // Tombstone via the wrapper. 
+ tombstoneXattr := []byte(`{"seq":2,"deleted":true}`) + cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cbCas uint64) (sgbucket.UpdatedDoc, error) { + assert.Equal(t, originalBody, doc, "callback must observe fallback body") + return sgbucket.UpdatedDoc{ + Xattrs: map[string][]byte{xattrKey: tombstoneXattr}, + IsTombstone: true, + }, nil + }) + require.NoError(t, err) + assert.Zero(t, cas, "tombstone path returns 0 CAS — primary was never written") + + // Primary stays absent: we did not create a primary tombstone. + exists, err := metaStore.Primary().Exists(docID) + require.NoError(t, err) + assert.False(t, exists) + + // Fallback body is gone — GetRaw is the body-only read path. + _, _, err = metaStore.Fallback().GetRaw(docID) + require.True(t, IsDocNotFoundError(err), "fallback body must be removed by tombstone, got %v", err) + + // Wrapper-level read also reports not-found (no read fallthrough to a stale doc body). + _, _, err = metaStore.GetRaw(docID) + require.True(t, IsDocNotFoundError(err), "wrapper read must report not-found after tombstone, got %v", err) + + // Xattr is still readable on fallback (it's a tombstone with retained xattr). + fallbackXattrs, _, xerr := metaStore.Fallback().GetXattrs(ctx, docID, []string{xattrKey}) + require.NoError(t, xerr) + require.Contains(t, fallbackXattrs, xattrKey) + assert.JSONEq(t, string(tombstoneXattr), string(fallbackXattrs[xattrKey]), "fallback xattr must reflect the tombstone xattr") +} + // TestMetadataStoreWriteUpdateWithXattrsMigratesFromFallback is the WriteUpdateWithXattrs analogue // of TestMetadataStoreUpdateMigratesFromFallback: the wrapper must surface the fallback body and // xattrs to the callback and write the result to primary. 
@@ -3094,10 +3332,10 @@ func TestMetadataStoreWriteUpdateWithXattrsMigratesFromFallback(t *testing.T) { var seenXattrs map[string][]byte var seenCas uint64 cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, - func(doc []byte, xattrs map[string][]byte, cb_cas uint64) (sgbucket.UpdatedDoc, error) { + func(doc []byte, xattrs map[string][]byte, cbCas uint64) (sgbucket.UpdatedDoc, error) { seenBody = append([]byte(nil), doc...) seenXattrs = xattrs - seenCas = cb_cas + seenCas = cbCas return sgbucket.UpdatedDoc{ Doc: updatedBody, Xattrs: map[string][]byte{xattrKey: updatedXattr}, diff --git a/base/dual_metadata_store.go b/base/dual_metadata_store.go index fa675d41e6..3a0c9ef336 100644 --- a/base/dual_metadata_store.go +++ b/base/dual_metadata_store.go @@ -176,8 +176,10 @@ func (ms *MetadataStore) Remove(k string, cas uint64) (casOut uint64, err error) // the caller's callback and inserts the result into primary; if a concurrent writer wins the // race, the loop retries against primary. // -// A delete returned by the callback for a doc that exists only in fallback is a no-op in -// primary; the metadata migration sweep will tombstone the fallback copy. +// When the callback requests a delete on a fallback-only doc, the wrapper applies the delete +// directly to the fallback store (the only store that ever held the doc). Without this, +// subsequent reads would still surface the doc via the read-fallback path until the migration +// sweep ran. 
func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFunc) (casOut uint64, err error) { for { primaryExists, existsErr := ms.primary.Exists(k) @@ -188,7 +190,7 @@ func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFu return ms.primary.Update(k, exp, callback) } - fallbackValue, _, fallbackErr := ms.fallback.GetRaw(k) + fallbackValue, fallbackCas, fallbackErr := ms.fallback.GetRaw(k) if IsDocNotFoundError(fallbackErr) { return ms.primary.Update(k, exp, callback) } @@ -205,7 +207,17 @@ func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFu writeExp = *cbExpiry } if isDelete { - return 0, nil + // Delete the fallback copy — primary never held it, so a primary tombstone is + // pointless and would shadow the fallback for nothing. Remove with the CAS we + // just observed; on a concurrent fallback mutation we retry the loop. + _, removeErr := ms.fallback.Remove(k, fallbackCas) + if removeErr == nil || IsDocNotFoundError(removeErr) { + return 0, nil + } + if IsCasMismatch(removeErr) { + continue + } + return 0, removeErr } casOut, err = ms.primary.WriteCas(k, writeExp, 0, newValue, 0) if err == nil { @@ -277,8 +289,17 @@ func (ms *MetadataStore) DeleteWithXattrs(ctx context.Context, k string, xattrKe // body+xattrs from fallback, hands them to the callback with cas=0, and inserts the result // into primary via WriteWithXattrs, retrying on CAS race. // -// As with Update, a tombstone result for a doc that exists only in fallback is a no-op in -// primary — the metadata migration sweep will tombstone the fallback copy. +// previous.Cas != 0 short-circuits to the primary's own loop. The wrapper's read methods +// (GetWithXattrs, GetXattrs, Get, GetRaw) only ever return a non-zero CAS for a primary hit — +// fallback CAS is discarded on the read path — so any non-zero previous.Cas a caller obtained +// through this wrapper unambiguously identifies a primary revision. 
Callers that bypass the +// wrapper (e.g. by calling ms.Fallback() directly) and then feed a fallback CAS back through +// previous.Cas would be misusing the API; that contract is documented but not enforced. +// +// When the callback returns IsTombstone for a fallback-only doc, the wrapper writes the +// tombstone directly to fallback (the only store that ever held the doc); primary is left +// untouched. Without this, callers reading after the tombstone would still see the live doc +// via the read-fallback path until the migration sweep ran. func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xattrKeys []string, exp uint32, previous *sgbucket.BucketDocument, opts *sgbucket.MutateInOptions, callback sgbucket.WriteUpdateWithXattrsFunc) (casOut uint64, err error) { if (previous != nil && previous.Cas != 0) || ms.migrationComplete.Load() { return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback) @@ -293,7 +314,7 @@ func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xa return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) } - fallbackBody, fallbackXattrs, _, fallbackErr := ms.fallback.GetWithXattrs(ctx, k, xattrKeys) + fallbackBody, fallbackXattrs, fallbackCas, fallbackErr := ms.fallback.GetWithXattrs(ctx, k, xattrKeys) if IsDocNotFoundError(fallbackErr) { return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) } @@ -315,8 +336,21 @@ func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xa writeExp = *updatedDoc.Expiry } if updatedDoc.IsTombstone { - return 0, nil + // Tombstone the fallback copy directly. deleteBody=true clears the body while + // preserving the requested xattrs (if any) on the fallback record. 
+ _, tombstoneErr := ms.fallback.WriteTombstoneWithXattrs(ctx, k, writeExp, fallbackCas, updatedDoc.Xattrs, updatedDoc.XattrsToDelete, true, opts) + if tombstoneErr == nil || IsDocNotFoundError(tombstoneErr) { + return 0, nil + } + if IsCasMismatch(tombstoneErr) { + continue + } + return 0, tombstoneErr } + // xattrsToDelete is intentionally nil on this branch: this is an insert (cas=0) and the + // underlying primary store rejects xattrsToDelete on insert (sgbucket.ErrDeleteXattrOnDocumentInsert). + // Anything the callback wanted dropped from the migrated doc is already absent from + // updatedDoc.Xattrs, so it never lands in primary. casOut, err = ms.primary.WriteWithXattrs(ctx, k, writeExp, 0, updatedDoc.Doc, updatedDoc.Xattrs, nil, opts) if err == nil { return casOut, nil From 0b4b59fd186079bcdf991d0a385a9ace81242e06 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Thu, 7 May 2026 14:46:42 +0100 Subject: [PATCH 3/4] CBG-5291: Factor out shared dual-store update-loop scaffolding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update and WriteUpdateWithXattrs both ran the same outer loop — probe primary, delegate when primary holds the doc or migration is sealed, otherwise read fallback / invoke the caller's callback / write to primary (or, on a fallback-only delete or tombstone, write to fallback). Lift the shared scaffolding into runFallbackUpdateLoop. The helper drives a worker through RetryLoopCas with DefaultRetrySleeper, matching the CAS-retry bound (10 attempts, doubling sleep from 5ms) used elsewhere in this package; the loop returns NewRetryTimeoutError if exceeded. Per-method specifics — fallback-read shape, callback signature, delete vs. insert classification, xattr-specific NotFound retry — stay in each method's step closure, with the small CAS/NotFound/terminal switch inlined at each branch. 
WriteUpdateWithXattrs threads its caller-supplied ctx through; Update has no ctx parameter (DataStore.Update doesn't carry one) and passes context.TODO(), matching the existing readFromFallback idiom in the same file. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) --- base/dual_metadata_store.go | 134 +++++++++++++++++++++--------------- 1 file changed, 80 insertions(+), 54 deletions(-) diff --git a/base/dual_metadata_store.go b/base/dual_metadata_store.go index 3a0c9ef336..b125e561ed 100644 --- a/base/dual_metadata_store.go +++ b/base/dual_metadata_store.go @@ -169,6 +169,39 @@ func (ms *MetadataStore) Remove(k string, cas uint64) (casOut uint64, err error) return ms.primary.Remove(k, cas) } +// errRetryUpdateLoop is a sentinel returned by step functions inside runFallbackUpdateLoop to +// drive another loop iteration (CAS race, callback-requested retry). Never escapes this file. +var errRetryUpdateLoop = errors.New("metadata store: retry dual-store update loop") + +// runFallbackUpdateLoop is the shared scaffolding for Update and WriteUpdateWithXattrs. Each +// iteration probes primary and either delegates to primaryDelegate (primary holds the doc, or +// migration is sealed) or hands off to step. step owns the per-method specifics — reading +// fallback, invoking the caller's callback, and applying the chosen write/delete. step +// returns errRetryUpdateLoop to drive a new iteration; any other non-nil error is terminal. +// +// Retries are bounded by DefaultRetrySleeper (matching the established CAS-retry convention +// for bucket ops elsewhere in this package); the helper returns NewRetryTimeoutError if the +// loop exceeds that bound. 
+func (ms *MetadataStore) runFallbackUpdateLoop(ctx context.Context, description, k string, primaryDelegate func() (uint64, error), step func() (uint64, error)) (uint64, error) { + worker := func() (shouldRetry bool, err error, casOut uint64) { + primaryExists, existsErr := ms.primary.Exists(k) + if existsErr != nil { + return false, existsErr, 0 + } + if primaryExists || ms.migrationComplete.Load() { + casOut, err := primaryDelegate() + return false, err, casOut + } + casOut, stepErr := step() + if errors.Is(stepErr, errRetryUpdateLoop) { + return true, nil, 0 + } + return false, stepErr, casOut + } + err, casOut := RetryLoopCas(ctx, description, worker, DefaultRetrySleeper()) + return casOut, err +} + // Update implements a CAS-safe read-modify-write that always lands in primary, even when the // document only exists in the fallback store. When the doc is in primary (or fallback is no // longer consulted) the call delegates to the primary's own Update — its CAS retry loop is @@ -180,22 +213,17 @@ func (ms *MetadataStore) Remove(k string, cas uint64) (casOut uint64, err error) // directly to the fallback store (the only store that ever held the doc). Without this, // subsequent reads would still surface the doc via the read-fallback path until the migration // sweep ran. 
-func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFunc) (casOut uint64, err error) { - for { - primaryExists, existsErr := ms.primary.Exists(k) - if existsErr != nil { - return 0, existsErr - } - if primaryExists || ms.migrationComplete.Load() { - return ms.primary.Update(k, exp, callback) - } - - fallbackValue, fallbackCas, fallbackErr := ms.fallback.GetRaw(k) - if IsDocNotFoundError(fallbackErr) { - return ms.primary.Update(k, exp, callback) +func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFunc) (uint64, error) { + primaryUpdateFn := func() (uint64, error) { + return ms.primary.Update(k, exp, callback) + } + step := func() (uint64, error) { + fallbackValue, fallbackCas, err := ms.fallback.GetRaw(k) + if IsDocNotFoundError(err) { + return primaryUpdateFn() } - if fallbackErr != nil { - return 0, fallbackErr + if err != nil { + return 0, err } newValue, cbExpiry, isDelete, cbErr := callback(fallbackValue) @@ -206,28 +234,30 @@ func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFu if cbExpiry != nil { writeExp = *cbExpiry } + if isDelete { // Delete the fallback copy — primary never held it, so a primary tombstone is // pointless and would shadow the fallback for nothing. Remove with the CAS we // just observed; on a concurrent fallback mutation we retry the loop. 
_, removeErr := ms.fallback.Remove(k, fallbackCas) - if removeErr == nil || IsDocNotFoundError(removeErr) { + switch { + case removeErr == nil, IsDocNotFoundError(removeErr): return 0, nil - } - if IsCasMismatch(removeErr) { - continue + case IsCasMismatch(removeErr): + return 0, errRetryUpdateLoop } return 0, removeErr } - casOut, err = ms.primary.WriteCas(k, writeExp, 0, newValue, 0) - if err == nil { + casOut, writeErr := ms.primary.WriteCas(k, writeExp, 0, newValue, 0) + if writeErr == nil { return casOut, nil } - if IsCasMismatch(err) { - continue + if IsCasMismatch(writeErr) { + return 0, errRetryUpdateLoop } - return 0, err + return 0, writeErr } + return ms.runFallbackUpdateLoop(context.TODO(), "MetadataStore.Update", k, primaryUpdateFn, step) } func (ms *MetadataStore) Incr(k string, amt, def uint64, exp uint32) (casOut uint64, err error) { @@ -300,33 +330,27 @@ func (ms *MetadataStore) DeleteWithXattrs(ctx context.Context, k string, xattrKe // tombstone directly to fallback (the only store that ever held the doc); primary is left // untouched. Without this, callers reading after the tombstone would still see the live doc // via the read-fallback path until the migration sweep ran. 
-func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xattrKeys []string, exp uint32, previous *sgbucket.BucketDocument, opts *sgbucket.MutateInOptions, callback sgbucket.WriteUpdateWithXattrsFunc) (casOut uint64, err error) { +func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xattrKeys []string, exp uint32, previous *sgbucket.BucketDocument, opts *sgbucket.MutateInOptions, callback sgbucket.WriteUpdateWithXattrsFunc) (uint64, error) { if (previous != nil && previous.Cas != 0) || ms.migrationComplete.Load() { return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback) } - - for { - primaryExists, existsErr := ms.primary.Exists(k) - if existsErr != nil { - return 0, existsErr - } - if primaryExists { - return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) - } - - fallbackBody, fallbackXattrs, fallbackCas, fallbackErr := ms.fallback.GetWithXattrs(ctx, k, xattrKeys) - if IsDocNotFoundError(fallbackErr) { - return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) + primaryDelegate := func() (uint64, error) { + return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) + } + step := func() (uint64, error) { + fallbackBody, fallbackXattrs, fallbackCas, err := ms.fallback.GetWithXattrs(ctx, k, xattrKeys) + if IsDocNotFoundError(err) { + return primaryDelegate() } // A doc with no/partial xattrs is still a fallback hit — surface what we have to the // callback rather than treat it as not-found. 
- if fallbackErr != nil && !IsXattrNotFoundError(fallbackErr) && !errors.Is(fallbackErr, ErrXattrPartialFound) { - return 0, fallbackErr + if err != nil && !IsXattrNotFoundError(err) && !errors.Is(err, ErrXattrPartialFound) { + return 0, err } updatedDoc, cbErr := callback(fallbackBody, fallbackXattrs, 0) if cbErr == ErrCasFailureShouldRetry { - continue + return 0, errRetryUpdateLoop } if cbErr != nil { return 0, cbErr @@ -335,31 +359,33 @@ func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xa if updatedDoc.Expiry != nil { writeExp = *updatedDoc.Expiry } + if updatedDoc.IsTombstone { // Tombstone the fallback copy directly. deleteBody=true clears the body while // preserving the requested xattrs (if any) on the fallback record. _, tombstoneErr := ms.fallback.WriteTombstoneWithXattrs(ctx, k, writeExp, fallbackCas, updatedDoc.Xattrs, updatedDoc.XattrsToDelete, true, opts) - if tombstoneErr == nil || IsDocNotFoundError(tombstoneErr) { + switch { + case tombstoneErr == nil, IsDocNotFoundError(tombstoneErr): return 0, nil - } - if IsCasMismatch(tombstoneErr) { - continue + case IsCasMismatch(tombstoneErr): + return 0, errRetryUpdateLoop } return 0, tombstoneErr } - // xattrsToDelete is intentionally nil on this branch: this is an insert (cas=0) and the - // underlying primary store rejects xattrsToDelete on insert (sgbucket.ErrDeleteXattrOnDocumentInsert). - // Anything the callback wanted dropped from the migrated doc is already absent from - // updatedDoc.Xattrs, so it never lands in primary. - casOut, err = ms.primary.WriteWithXattrs(ctx, k, writeExp, 0, updatedDoc.Doc, updatedDoc.Xattrs, nil, opts) - if err == nil { + // xattrsToDelete is intentionally nil on this branch: this is an insert (cas=0) and + // the underlying primary store rejects xattrsToDelete on insert + // (sgbucket.ErrDeleteXattrOnDocumentInsert). Anything the callback wanted dropped is + // already absent from updatedDoc.Xattrs, so it never lands in primary. 
+ casOut, writeErr := ms.primary.WriteWithXattrs(ctx, k, writeExp, 0, updatedDoc.Doc, updatedDoc.Xattrs, nil, opts) + if writeErr == nil { return casOut, nil } - if IsCasMismatch(err) || IsDocNotFoundError(err) { - continue + if IsCasMismatch(writeErr) || IsDocNotFoundError(writeErr) { + return 0, errRetryUpdateLoop } - return 0, err + return 0, writeErr } + return ms.runFallbackUpdateLoop(ctx, "MetadataStore.WriteUpdateWithXattrs", k, primaryDelegate, step) } func (ms *MetadataStore) UpdateXattrs(ctx context.Context, k string, exp uint32, cas uint64, xv map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) { From df9ab22dab01466cae9413180bcf9c8d578dabd8 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 8 May 2026 19:56:09 +0100 Subject: [PATCH 4/4] CBG-5291: Apply second-round review fixes and inline dual-store update loop Address Tor's PR review and tighten the wrapper: - Use errors.Is for ErrCasFailureShouldRetry (was ==). - Add fallback-tombstone resurrection coverage for Update and WriteUpdateWithXattrs, plus a concurrent-fallback-writer test to document the primary-shadows-fallback contract. - Inline runFallbackUpdateLoop into Update and WriteUpdateWithXattrs; drop the errRetryUpdateLoop sentinel. - Document the deliberate divergence from upstream Update's return-cas-on-error behaviour, and explain why no previousLoopCas defensive check is needed here. - Simplify godoc and rename closure-locals (primaryDelegate -> primaryUpdateFn, fallbackCas -> fallbackCasForDelete/Tombstone) for clarity. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- base/bucket_gocb_test.go | 174 +++++++++++++++++++++++++++++++ base/dual_metadata_store.go | 203 +++++++++++++++++++----------------- 2 files changed, 279 insertions(+), 98 deletions(-) diff --git a/base/bucket_gocb_test.go b/base/bucket_gocb_test.go index 515622fa36..a113236b47 100644 --- a/base/bucket_gocb_test.go +++ b/base/bucket_gocb_test.go @@ -3356,6 +3356,180 @@ func TestMetadataStoreWriteUpdateWithXattrsMigratesFromFallback(t *testing.T) { assert.Equal(t, cas, primaryCas) } +// TestMetadataStoreUpdateFallbackTombstoneResurrects covers the lifecycle that follows +// TestMetadataStoreUpdateDeleteFallbackOnly: once Update has hard-deleted the fallback copy, +// a subsequent Update on the same key must succeed by inserting into primary. (A fallback +// tombstone is read as DocNotFound by GetRaw, so the wrapper hands the call off to +// primary.Update, whose own loop performs a cas=0 Insert — same end-state as upstream Update +// on a tombstone.) +func TestMetadataStoreUpdateFallbackTombstoneResurrects(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + ok, err := metaStore.Fallback().Add(docID, 0, []byte(`{"counter":1}`)) + require.NoError(t, err) + require.True(t, ok) + + // Step 1: delete the fallback-only doc through the wrapper. + _, err = metaStore.Update(docID, 0, func(_ []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + return nil, nil, true, nil + }) + require.NoError(t, err) + + // Step 2: resurrect via Update. Callback must observe nil (primary empty, fallback gone). + resurrectBody := []byte(`{"counter":2}`) + var seenByCallback []byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + seenByCallback = append([]byte(nil), current...) 
+ return resurrectBody, nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas, "resurrection must land in primary with a real CAS") + assert.Nil(t, seenByCallback, "callback should observe nil — neither store holds the doc") + + // Resurrection lives in primary; fallback stays absent. + primaryRaw, primaryCas, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, resurrectBody, primaryRaw) + assert.Equal(t, cas, primaryCas) + + _, _, err = metaStore.Fallback().GetRaw(docID) + require.True(t, IsDocNotFoundError(err), "fallback must remain empty after resurrection, got %v", err) +} + +// TestMetadataStoreUpdateConcurrentFallbackWriterShadowed documents what happens when a +// concurrent writer mutates the fallback store *while* the wrapper's Update is mid-flight +// against a fallback-only doc. Writing to fallback is a misuse of the wrapper (writes are +// supposed to land in primary), but the wrapper still has to behave sanely: +// - the callback sees the snapshot the wrapper read at the start of the iteration, +// - the wrapper inserts that snapshot's update into primary (cas=0 Insert), and +// - subsequent reads through the wrapper return the primary value, shadowing the +// concurrent fallback write. +// +// Net effect: primary becomes authoritative; the concurrent fallback mutation is "lost" +// from the caller's perspective, which is the correct outcome under this PR's design. 
+func TestMetadataStoreUpdateConcurrentFallbackWriterShadowed(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + originalBody := []byte(`{"src":"fallback-original"}`) + concurrentBody := []byte(`{"src":"fallback-concurrent"}`) + wrapperBody := []byte(`{"src":"wrapper-update"}`) + + ok, err := metaStore.Fallback().Add(docID, 0, originalBody) + require.NoError(t, err) + require.True(t, ok) + + var calls int + var seenByCallback []byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + calls++ + seenByCallback = append([]byte(nil), current...) + // Concurrent writer races into fallback after we've already snapshotted it. + setErr := metaStore.Fallback().Set(docID, 0, nil, concurrentBody) + require.NoError(t, setErr) + return wrapperBody, nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + assert.Equal(t, 1, calls, "primary insert should succeed first try; no retry expected when only fallback raced") + assert.Equal(t, originalBody, seenByCallback, "callback must see the snapshot, not the concurrent write") + + // Primary holds the wrapper's update. + primaryRaw, primaryCas, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, wrapperBody, primaryRaw) + assert.Equal(t, cas, primaryCas) + + // Fallback holds the racer's value — we never wrote it. + fallbackRaw, _, err := metaStore.Fallback().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, concurrentBody, fallbackRaw) + + // Wrapper-level read returns primary's value; the racer's fallback write is shadowed. 
+ wrapperRaw, _, err := metaStore.GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, wrapperBody, wrapperRaw, "wrapper read must surface primary, shadowing concurrent fallback writer") +} + +// TestMetadataStoreWriteUpdateWithXattrsResurrectsAfterFallbackTombstone covers the +// dbconfig-style lifecycle Tor flagged in review: a doc with xattrs gets tombstoned (in this +// case the tombstone lives in fallback because the doc never made it to primary), and a +// subsequent WriteUpdateWithXattrs must resurrect it — landing the live doc in primary. +func TestMetadataStoreWriteUpdateWithXattrsResurrectsAfterFallbackTombstone(t *testing.T) { + SkipXattrTestsIfNotEnabled(t) + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + xattrKey := SyncXattrName + xattrKeys := []string{xattrKey} + originalBody := []byte(`{"v":1}`) + originalXattr := []byte(`{"seq":1}`) + + // Seed fallback only. + _, err := metaStore.Fallback().WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Doc: originalBody, + Xattrs: map[string][]byte{xattrKey: originalXattr}, + }, nil + }) + require.NoError(t, err) + + // Step 1: tombstone via the wrapper. Tombstone lands on fallback (only store that + // ever held the doc). + tombstoneXattr := []byte(`{"seq":2,"deleted":true}`) + _, err = metaStore.WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil, + func(_ []byte, _ map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Xattrs: map[string][]byte{xattrKey: tombstoneXattr}, + IsTombstone: true, + }, nil + }) + require.NoError(t, err) + + // Step 2: resurrect via the wrapper. Callback observes the tombstone (body=nil, + // xattr retained); resurrection writes to primary as a fresh insert (cas=0). 
+ resurrectBody := []byte(`{"v":3}`) + resurrectXattr := []byte(`{"seq":3}`) + var seenBody []byte + var seenXattrs map[string][]byte + cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) { + seenBody = append([]byte(nil), doc...) + seenXattrs = xattrs + return sgbucket.UpdatedDoc{ + Doc: resurrectBody, + Xattrs: map[string][]byte{xattrKey: resurrectXattr}, + }, nil + }) + require.NoError(t, err) + require.NotZero(t, cas, "resurrection must land in primary with a real CAS") + assert.Nil(t, seenBody, "callback observes nil body on a fallback tombstone") + require.Contains(t, seenXattrs, xattrKey, "callback observes the retained tombstone xattr") + assert.JSONEq(t, string(tombstoneXattr), string(seenXattrs[xattrKey])) + + // Resurrected doc lives in primary. + primaryBody, primaryXattrs, primaryCas, err := metaStore.Primary().GetWithXattrs(ctx, docID, xattrKeys) + require.NoError(t, err) + assert.Equal(t, resurrectBody, primaryBody) + require.Contains(t, primaryXattrs, xattrKey) + assert.Equal(t, resurrectXattr, primaryXattrs[xattrKey]) + assert.Equal(t, cas, primaryCas) +} + func TestReadDoesNotGoToFallbackWhenMigrationComplete(t *testing.T) { ctx := TestCtx(t) bucket := GetTestBucket(t) diff --git a/base/dual_metadata_store.go b/base/dual_metadata_store.go index b125e561ed..e503936c5b 100644 --- a/base/dual_metadata_store.go +++ b/base/dual_metadata_store.go @@ -169,66 +169,42 @@ func (ms *MetadataStore) Remove(k string, cas uint64) (casOut uint64, err error) return ms.primary.Remove(k, cas) } -// errRetryUpdateLoop is a sentinel returned by step functions inside runFallbackUpdateLoop to -// drive another loop iteration (CAS race, callback-requested retry). Never escapes this file. -var errRetryUpdateLoop = errors.New("metadata store: retry dual-store update loop") - -// runFallbackUpdateLoop is the shared scaffolding for Update and WriteUpdateWithXattrs. 
Each -// iteration probes primary and either delegates to primaryDelegate (primary holds the doc, or -// migration is sealed) or hands off to step. step owns the per-method specifics — reading -// fallback, invoking the caller's callback, and applying the chosen write/delete. step -// returns errRetryUpdateLoop to drive a new iteration; any other non-nil error is terminal. +// Update implements a CAS-safe read-modify-write that always lands in primary, even when the document only exists in the fallback DataStore. // -// Retries are bounded by DefaultRetrySleeper (matching the established CAS-retry convention -// for bucket ops elsewhere in this package); the helper returns NewRetryTimeoutError if the -// loop exceeds that bound. -func (ms *MetadataStore) runFallbackUpdateLoop(ctx context.Context, description, k string, primaryDelegate func() (uint64, error), step func() (uint64, error)) (uint64, error) { +// Covers two cases: +// 1. When the doc is already in primary the call simply delegates to the primary datastore's own Update. Happy-path. +// 2. When the doc is only in fallback, the wrapper feeds the fallback value to the callback and performs a CAS-safe insert of the result into primary. +// If a concurrent writer wins the race inserting into Primary, the loop retries the update operation against the primary version of the document. +// +// When the callback requests a delete on a fallback-only doc, the wrapper applies the delete +// directly to the fallback store and ignores the primary (since the only DataStore that held the doc to delete was the fallback). 
+func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFunc) (uint64, error) { worker := func() (shouldRetry bool, err error, casOut uint64) { primaryExists, existsErr := ms.primary.Exists(k) if existsErr != nil { return false, existsErr, 0 } + + // If we know something exists in Primary, do a direct Update there if primaryExists || ms.migrationComplete.Load() { - casOut, err := primaryDelegate() - return false, err, casOut - } - casOut, stepErr := step() - if errors.Is(stepErr, errRetryUpdateLoop) { - return true, nil, 0 + casOut, updateErr := ms.primary.Update(k, exp, callback) + return false, updateErr, casOut } - return false, stepErr, casOut - } - err, casOut := RetryLoopCas(ctx, description, worker, DefaultRetrySleeper()) - return casOut, err -} -// Update implements a CAS-safe read-modify-write that always lands in primary, even when the -// document only exists in the fallback store. When the doc is in primary (or fallback is no -// longer consulted) the call delegates to the primary's own Update — its CAS retry loop is -// already correct. When the doc is only in fallback, the wrapper feeds the fallback value to -// the caller's callback and inserts the result into primary; if a concurrent writer wins the -// race, the loop retries against primary. -// -// When the callback requests a delete on a fallback-only doc, the wrapper applies the delete -// directly to the fallback store (the only store that ever held the doc). Without this, -// subsequent reads would still surface the doc via the read-fallback path until the migration -// sweep ran. 
-func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFunc) (uint64, error) { - primaryUpdateFn := func() (uint64, error) { - return ms.primary.Update(k, exp, callback) - } - step := func() (uint64, error) { - fallbackValue, fallbackCas, err := ms.fallback.GetRaw(k) + // Begin fallback DataStore Get to perform Update + fallbackValue, fallbackCasForDelete, err := ms.fallback.GetRaw(k) if IsDocNotFoundError(err) { - return primaryUpdateFn() + // Neither store has the doc — let primary's own Update insert it. + casOut, updateErr := ms.primary.Update(k, exp, callback) + return false, updateErr, casOut } if err != nil { - return 0, err + return false, err, 0 } newValue, cbExpiry, isDelete, cbErr := callback(fallbackValue) if cbErr != nil { - return 0, cbErr + return false, cbErr, 0 } writeExp := exp if cbExpiry != nil { @@ -237,27 +213,38 @@ func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFu if isDelete { // Delete the fallback copy — primary never held it, so a primary tombstone is - // pointless and would shadow the fallback for nothing. Remove with the CAS we - // just observed; on a concurrent fallback mutation we retry the loop. - _, removeErr := ms.fallback.Remove(k, fallbackCas) + // pointless and would shadow the fallback for nothing. + // Remove with the CAS we just observed; on a concurrent fallback mutation we retry the loop. 
+ _, removeErr := ms.fallback.Remove(k, fallbackCasForDelete) switch { case removeErr == nil, IsDocNotFoundError(removeErr): - return 0, nil + return false, nil, 0 case IsCasMismatch(removeErr): - return 0, errRetryUpdateLoop + // retry: concurrent fallback mutation underneath this Delete + // This shouldn't happen with the MetadataStore wrapper only routing writes into Primary, + // but it could be two concurrent Deletes that we should at least have one more attempt for + return true, nil, 0 } - return 0, removeErr + return false, removeErr, 0 } + + // Write to primary with CAS=0 to force safe insertion casOut, writeErr := ms.primary.WriteCas(k, writeExp, 0, newValue, 0) - if writeErr == nil { - return casOut, nil - } if IsCasMismatch(writeErr) { - return 0, errRetryUpdateLoop + // retry: concurrent writer beat us to primary + // retry loop will run again and be routed directly to the primary DataStore Update + return true, nil, 0 + } + if writeErr != nil { + return false, writeErr, 0 } - return 0, writeErr + + // success: Only case where we can actually return a non-zero CAS - successful Primary DataStore Insert. + return false, nil, casOut } - return ms.runFallbackUpdateLoop(context.TODO(), "MetadataStore.Update", k, primaryUpdateFn, step) + + err, casOut := RetryLoopCas(context.TODO(), "MetadataStore.Update", worker, DefaultRetrySleeper()) + return casOut, err } func (ms *MetadataStore) Incr(k string, amt, def uint64, exp uint32) (casOut uint64, err error) { @@ -312,48 +299,57 @@ func (ms *MetadataStore) DeleteWithXattrs(ctx context.Context, k string, xattrKe return ms.primary.DeleteWithXattrs(ctx, k, xattrKeys) } -// WriteUpdateWithXattrs is the xattr-aware analogue of Update and follows the same -// "read-from-both, write-to-primary" pattern. If the caller has supplied a previous CAS from -// the primary (or migration is complete), the call delegates straight to the primary's own -// retry loop. 
Otherwise the wrapper probes primary first; on a primary miss it reads -// body+xattrs from fallback, hands them to the callback with cas=0, and inserts the result -// into primary via WriteWithXattrs, retrying on CAS race. +// WriteUpdateWithXattrs is the xattr-aware analogue of Update — a CAS-safe read-modify-write that always lands in primary, even when the document only exists in the fallback DataStore. // -// previous.Cas != 0 short-circuits to the primary's own loop. The wrapper's read methods -// (GetWithXattrs, GetXattrs, Get, GetRaw) only ever return a non-zero CAS for a primary hit — -// fallback CAS is discarded on the read path — so any non-zero previous.Cas a caller obtained -// through this wrapper unambiguously identifies a primary revision. Callers that bypass the -// wrapper (e.g. by calling ms.Fallback() directly) and then feed a fallback CAS back through -// previous.Cas would be misusing the API; that contract is documented but not enforced. +// Covers two cases: +// 1. When the doc is already in primary (or the caller passes a non-zero previous.Cas, which by contract is a primary CAS) the call simply delegates to the primary datastore's own WriteUpdateWithXattrs. This is the happy path. +// 2. When the doc is only in fallback, the wrapper feeds the fallback body+xattrs to the callback with cas=0 and performs a CAS-safe insert of the result into primary via WriteWithXattrs. +// If a concurrent writer wins the race inserting into primary, the loop retries the update operation against the primary version of the document. // -// When the callback returns IsTombstone for a fallback-only doc, the wrapper writes the -// tombstone directly to fallback (the only store that ever held the doc); primary is left -// untouched. Without this, callers reading after the tombstone would still see the live doc -// via the read-fallback path until the migration sweep ran.
+// When the callback returns IsTombstone for a fallback-only doc, the wrapper writes the tombstone +// directly to fallback and ignores the primary (since the only DataStore that has the doc to tombstone was the fallback). +// +// Callers must not feed a fallback CAS back through previous.Cas — the wrapper's read methods only surface primary CAS, so any non-zero previous.Cas is treated as primary; misuse is documented but not enforced. func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xattrKeys []string, exp uint32, previous *sgbucket.BucketDocument, opts *sgbucket.MutateInOptions, callback sgbucket.WriteUpdateWithXattrsFunc) (uint64, error) { + // If we're updating with a previous.Cas - we'll be updating on top of a prior fetch from the primary datastore, not the fallback data store. + // Or if we've tagged MetadataStore with 'migrationComplete' - avoid the fallback effort... if (previous != nil && previous.Cas != 0) || ms.migrationComplete.Load() { return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback) } - primaryDelegate := func() (uint64, error) { - return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) - } - step := func() (uint64, error) { - fallbackBody, fallbackXattrs, fallbackCas, err := ms.fallback.GetWithXattrs(ctx, k, xattrKeys) + + // Retry loop for Primary/Fallback + worker := func() (shouldRetry bool, err error, casOut uint64) { + primaryExists, existsErr := ms.primary.Exists(k) + if existsErr != nil { + return false, existsErr, 0 + } + + // If we know something exists in Primary, do a direct WriteUpdateWithXattrs there + if primaryExists || ms.migrationComplete.Load() { + casOut, updateErr := ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) + return false, updateErr, casOut + } + + // Begin fallback DataStore Get to perform WriteUpdateWithXattrs + fallbackBody, fallbackXattrs, fallbackCasForTombstone, err := 
ms.fallback.GetWithXattrs(ctx, k, xattrKeys) if IsDocNotFoundError(err) { - return primaryDelegate() + // Neither store has the doc — let primary's own WriteUpdateWithXattrs insert it. + casOut, updateErr := ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) + return false, updateErr, casOut } // A doc with no/partial xattrs is still a fallback hit — surface what we have to the // callback rather than treat it as not-found. if err != nil && !IsXattrNotFoundError(err) && !errors.Is(err, ErrXattrPartialFound) { - return 0, err + return false, err, 0 } updatedDoc, cbErr := callback(fallbackBody, fallbackXattrs, 0) - if cbErr == ErrCasFailureShouldRetry { - return 0, errRetryUpdateLoop + if errors.Is(cbErr, ErrCasFailureShouldRetry) { + // retry: callback explicitly asked us to retry + return true, nil, 0 } if cbErr != nil { - return 0, cbErr + return false, cbErr, 0 } writeExp := exp if updatedDoc.Expiry != nil { @@ -361,31 +357,42 @@ func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xa } if updatedDoc.IsTombstone { - // Tombstone the fallback copy directly. deleteBody=true clears the body while - // preserving the requested xattrs (if any) on the fallback record. - _, tombstoneErr := ms.fallback.WriteTombstoneWithXattrs(ctx, k, writeExp, fallbackCas, updatedDoc.Xattrs, updatedDoc.XattrsToDelete, true, opts) + // Tombstone the fallback copy directly — primary never held it, so a primary tombstone is + // pointless and would shadow the fallback for nothing. + // deleteBody=true clears the body while preserving the requested xattrs (if any) on the fallback record. 
+ _, tombstoneErr := ms.fallback.WriteTombstoneWithXattrs(ctx, k, writeExp, fallbackCasForTombstone, updatedDoc.Xattrs, updatedDoc.XattrsToDelete, true, opts) switch { case tombstoneErr == nil, IsDocNotFoundError(tombstoneErr): - return 0, nil + return false, nil, 0 case IsCasMismatch(tombstoneErr): - return 0, errRetryUpdateLoop + // retry: concurrent fallback mutation underneath this Tombstone + // This shouldn't happen with the MetadataStore wrapper only routing writes into Primary, + // but it could be two concurrent Tombstones that we should at least have one more attempt for + return true, nil, 0 } - return 0, tombstoneErr + return false, tombstoneErr, 0 } - // xattrsToDelete is intentionally nil on this branch: this is an insert (cas=0) and - // the underlying primary store rejects xattrsToDelete on insert - // (sgbucket.ErrDeleteXattrOnDocumentInsert). Anything the callback wanted dropped is - // already absent from updatedDoc.Xattrs, so it never lands in primary. + + // Write to primary with CAS=0 to force safe insertion. + // xattrsToDelete is intentionally nil here: cas=0 is an insert and the underlying primary + // store rejects xattrsToDelete on insert (sgbucket.ErrDeleteXattrOnDocumentInsert). + // Anything the callback wanted dropped is already absent from updatedDoc.Xattrs. casOut, writeErr := ms.primary.WriteWithXattrs(ctx, k, writeExp, 0, updatedDoc.Doc, updatedDoc.Xattrs, nil, opts) - if writeErr == nil { - return casOut, nil - } if IsCasMismatch(writeErr) || IsDocNotFoundError(writeErr) { - return 0, errRetryUpdateLoop + // retry: concurrent writer beat us to primary + // retry loop will run again and be routed directly to the primary DataStore WriteUpdateWithXattrs. + return true, nil, 0 + } + if writeErr != nil { + return false, writeErr, 0 } - return 0, writeErr + + // success: Only case where we can actually return a non-zero CAS - successful Primary DataStore Insert. 
+ return false, nil, casOut } - return ms.runFallbackUpdateLoop(ctx, "MetadataStore.WriteUpdateWithXattrs", k, primaryDelegate, step) + + err, casOut := RetryLoopCas(ctx, "MetadataStore.WriteUpdateWithXattrs", worker, DefaultRetrySleeper()) + return casOut, err } func (ms *MetadataStore) UpdateXattrs(ctx context.Context, k string, exp uint32, cas uint64, xv map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {