diff --git a/base/bucket_gocb_test.go b/base/bucket_gocb_test.go index f355c9141a..a113236b47 100644 --- a/base/bucket_gocb_test.go +++ b/base/bucket_gocb_test.go @@ -2843,10 +2843,12 @@ func TestMetadataStoreKVStoreWriteOperations(t *testing.T) { }) require.NoError(t, err) require.NotZero(t, cas) - // CBG-5291: update currently writes to fallback - readRawBody, _, err = metaStore.Fallback().GetRaw(updateDocID) + // verify in primary, not in fallback + readRawBody, _, err = metaStore.Primary().GetRaw(updateDocID) require.NoError(t, err) assert.Equal(t, updateBody, readRawBody) + _, _, err = metaStore.Fallback().GetRaw(updateDocID) + require.True(t, IsDocNotFoundError(err)) // fallback expects error // Test Incr incrDocID := t.Name() + "_incr" @@ -2919,6 +2921,615 @@ func TestMetadataStoreKVStoreWriteOperations(t *testing.T) { assert.False(t, exists) } +// TestMetadataStoreUpdateMigratesFromFallback verifies that an Update against a key that lives only +// in the fallback store invokes the caller's callback with the fallback value, and writes the result +// to the primary store (never back to the fallback). This is the core CBG-5291 case. +func TestMetadataStoreUpdateMigratesFromFallback(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + originalBody := []byte(`{"counter":1}`) + ok, err := metaStore.Fallback().Add(docID, 0, originalBody) + require.NoError(t, err) + require.True(t, ok) + + updatedBody := []byte(`{"counter":2}`) + var seenByCallback []byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + seenByCallback = append([]byte(nil), current...) 
+ return updatedBody, nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + assert.Equal(t, originalBody, seenByCallback, "callback must observe the fallback value, not nil") + + // New value lives in primary at the returned CAS. + primaryRaw, primaryCas, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, updatedBody, primaryRaw) + assert.Equal(t, cas, primaryCas) + + // Fallback copy is left untouched - the migration sweep handles cleanup. + fallbackRaw, _, err := metaStore.Fallback().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, originalBody, fallbackRaw) +} + +// TestMetadataStoreUpdateExistingPrimary verifies that when the doc already exists in primary, +// Update goes through the standard CAS path against primary (callback sees primary value, write +// replaces in primary, fallback is ignored even if it has stale data). +func TestMetadataStoreUpdateExistingPrimary(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + primaryBody := []byte(`{"src":"primary"}`) + fallbackBody := []byte(`{"src":"fallback"}`) + ok, err := metaStore.Primary().Add(docID, 0, primaryBody) + require.NoError(t, err) + require.True(t, ok) + ok, err = metaStore.Fallback().Add(docID, 0, fallbackBody) + require.NoError(t, err) + require.True(t, ok) + + updatedBody := []byte(`{"src":"updated"}`) + var seenByCallback []byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + seenByCallback = append([]byte(nil), current...) 
+ return updatedBody, nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + assert.Equal(t, primaryBody, seenByCallback, "callback must observe primary value, not fallback") + + primaryRaw, _, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, updatedBody, primaryRaw) + + fallbackRaw, _, err := metaStore.Fallback().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, fallbackBody, fallbackRaw, "fallback is never written by Update") +} + +// TestMetadataStoreUpdateDeleteFallbackOnly verifies the CBG-5291 delete behaviour for a doc +// that exists only in fallback: the wrapper writes the delete to fallback (the only store +// that ever held the doc) and does not create a primary tombstone. Without this, callers +// reading after delete would still observe the doc via the fallback read path until the +// migration sweep ran. +func TestMetadataStoreUpdateDeleteFallbackOnly(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + originalBody := []byte(`{"counter":1}`) + ok, err := metaStore.Fallback().Add(docID, 0, originalBody) + require.NoError(t, err) + require.True(t, ok) + + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + assert.Equal(t, originalBody, current, "callback must observe fallback value") + return nil, nil, true, nil + }) + require.NoError(t, err) + assert.Zero(t, cas) + + // Primary stays absent - we did not write a primary tombstone. + exists, err := metaStore.Primary().Exists(docID) + require.NoError(t, err) + assert.False(t, exists) + + // Fallback delete landed - subsequent reads must surface not-found. 
+ _, _, err = metaStore.Fallback().GetRaw(docID) + require.True(t, IsDocNotFoundError(err), "expected fallback delete, got err=%v", err) + + // Wrapper-level read also reports not-found (no read fallthrough to a stale doc). + _, _, err = metaStore.GetRaw(docID) + require.True(t, IsDocNotFoundError(err), "wrapper read must report not-found after delete, got err=%v", err) +} + +// TestMetadataStoreUpdateRetriesOnCASMismatch exercises the wrapper's CAS retry: the doc starts +// life in fallback only; from inside the callback (between the wrapper's primary.Exists probe +// and its primary.WriteCas insert) a concurrent writer lands a doc in primary. The first +// WriteCas(cas=0) must hit a CAS mismatch and the loop must retry — on retry primary now holds +// the doc, so the wrapper delegates to primary.Update, the callback fires again against the +// primary value, and the second result is what finally lands in primary. +func TestMetadataStoreUpdateRetriesOnCASMismatch(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + originalBody := []byte(`{"src":"fallback"}`) + racerBody := []byte(`{"src":"racer"}`) + + ok, err := metaStore.Fallback().Add(docID, 0, originalBody) + require.NoError(t, err) + require.True(t, ok) + + var calls int + var callbackInputs [][]byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + calls++ + callbackInputs = append(callbackInputs, append([]byte(nil), current...)) + if calls == 1 { + // Simulate a concurrent writer beating us to the punch on primary. 
+ added, addErr := metaStore.Primary().Add(docID, 0, racerBody) + require.NoError(t, addErr) + require.True(t, added) + return []byte(`{"src":"call1"}`), nil, false, nil + } + return []byte(`{"src":"call2"}`), nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + require.Equal(t, 2, calls, "wrapper must retry through primary.Update after CAS race") + assert.Equal(t, originalBody, callbackInputs[0], "first callback observes the fallback value") + assert.Equal(t, racerBody, callbackInputs[1], "second callback observes the racer's value from primary") + + primaryRaw, primaryCas, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, []byte(`{"src":"call2"}`), primaryRaw, "the second callback's result wins") + assert.Equal(t, cas, primaryCas) +} + +// TestMetadataStoreUpdateAfterMigrationComplete verifies that once SetMigrationComplete has been +// called, fallback contents are ignored entirely - even by Update - and the wrapper behaves as a +// thin pass-through to the primary's Update. +func TestMetadataStoreUpdateAfterMigrationComplete(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + metaStore.SetMigrationComplete() + + docID := t.Name() + ok, err := metaStore.Fallback().Add(docID, 0, []byte(`{"counter":99}`)) + require.NoError(t, err) + require.True(t, ok) + + updatedBody := []byte(`{"counter":1}`) + var seenByCallback []byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + seenByCallback = append([]byte(nil), current...) 
+ return updatedBody, nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + assert.Nil(t, seenByCallback, "fallback must not be consulted after migration complete") + + primaryRaw, _, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, updatedBody, primaryRaw) +} + +// TestMetadataStoreWriteUpdateWithXattrsCASShortcut verifies the previous.Cas != 0 early-out: +// when the caller hands in a non-zero CAS, the wrapper must delegate straight to +// primary.WriteUpdateWithXattrs with the caller's previous, and must NOT consult fallback. +// +// We distinguish the path by handing in a previous whose Body differs from what's actually in +// primary (CAS still matches). With the shortcut: callback observes the caller-supplied body. +// Without the shortcut: callback would observe the actual primary body (because the wrapper +// would pass nil for previous and primary.WriteUpdateWithXattrs would re-read). +func TestMetadataStoreWriteUpdateWithXattrsCASShortcut(t *testing.T) { + SkipXattrTestsIfNotEnabled(t) + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + xattrKey := SyncXattrName + primaryBody := []byte(`{"src":"primary"}`) + primaryXattr := []byte(`{"seq":1}`) + fallbackBody := []byte(`{"src":"fallback"}`) + + // Seed primary; capture its actual CAS. + primaryCas, err := metaStore.Primary().WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Doc: primaryBody, + Xattrs: map[string][]byte{xattrKey: primaryXattr}, + }, nil + }) + require.NoError(t, err) + require.NotZero(t, primaryCas) + + // Seed fallback with a DIFFERENT body — if the wrapper were to consult fallback in this + // path the callback would surface it. 
The shortcut must skip this read entirely. + ok, err := metaStore.Fallback().Add(docID, 0, fallbackBody) + require.NoError(t, err) + require.True(t, ok) + + // Hand in a previous whose Body and Xattrs disagree with primary, but whose CAS matches. + // If the shortcut runs, primary.WriteUpdateWithXattrs uses this previous directly and the + // callback sees `bogusBody`. + bogusBody := []byte(`{"src":"caller-supplied"}`) + bogusXattr := []byte(`{"seq":99}`) + previous := &sgbucket.BucketDocument{ + Cas: primaryCas, + Body: bogusBody, + Xattrs: map[string][]byte{xattrKey: bogusXattr}, + } + + var callbackObservations [][]byte + updatedBody := []byte(`{"src":"updated"}`) + cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, previous, nil, + func(doc []byte, xattrs map[string][]byte, cbCas uint64) (sgbucket.UpdatedDoc, error) { + callbackObservations = append(callbackObservations, append([]byte(nil), doc...)) + return sgbucket.UpdatedDoc{ + Doc: updatedBody, + Xattrs: map[string][]byte{xattrKey: primaryXattr}, + }, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + require.NotEmpty(t, callbackObservations) + assert.Equal(t, bogusBody, callbackObservations[0], + "shortcut must pass caller's previous straight through to primary.WriteUpdateWithXattrs; callback should see caller-supplied body, not primary's actual body") + assert.NotEqual(t, fallbackBody, callbackObservations[0], "fallback must never be consulted on the shortcut path") +} + +// TestMetadataStoreWriteUpdateWithXattrsHonorsXattrsToDelete verifies the migration-insert path +// when the callback returns XattrsToDelete: the underlying primary insert (cas=0) cannot accept +// xattrsToDelete (rosmar/CBS both reject it as ErrDeleteXattrOnDocumentInsert), so the wrapper +// must drop them silently. 
The user-visible contract: only the xattrs the callback explicitly +// returned in UpdatedDoc.Xattrs land in primary; xattrs the callback marked for deletion are +// simply not migrated (which is equivalent to "deleted" since primary never held them). +func TestMetadataStoreWriteUpdateWithXattrsHonorsXattrsToDelete(t *testing.T) { + SkipXattrTestsIfNotEnabled(t) + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + keepXattr := SyncXattrName + dropXattr := "_drop" + xattrKeys := []string{keepXattr, dropXattr} + + // Seed fallback with body + two xattrs. + _, err := metaStore.Fallback().WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Doc: []byte(`{"v":1}`), + Xattrs: map[string][]byte{ + keepXattr: []byte(`{"keep":1}`), + dropXattr: []byte(`{"drop":1}`), + }, + }, nil + }) + require.NoError(t, err) + + // Migrate via the wrapper. Callback returns updated body, retains keepXattr, requests + // dropXattr be dropped via XattrsToDelete. + updatedBody := []byte(`{"v":2}`) + cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cbCas uint64) (sgbucket.UpdatedDoc, error) { + require.Contains(t, xattrs, keepXattr, "callback should observe both fallback xattrs") + require.Contains(t, xattrs, dropXattr) + return sgbucket.UpdatedDoc{ + Doc: updatedBody, + Xattrs: map[string][]byte{keepXattr: []byte(`{"keep":2}`)}, + XattrsToDelete: []string{dropXattr}, + }, nil + }) + require.NoError(t, err, "migration with XattrsToDelete must not surface ErrDeleteXattrOnDocumentInsert") + require.NotZero(t, cas) + + // Primary holds only the kept xattr; the dropped one was never migrated. 
+ primaryBody, primaryXattrs, _, err := metaStore.Primary().GetWithXattrs(ctx, docID, xattrKeys) + require.NoError(t, err) + assert.Equal(t, updatedBody, primaryBody) + require.Contains(t, primaryXattrs, keepXattr) + assert.JSONEq(t, `{"keep":2}`, string(primaryXattrs[keepXattr])) + assert.NotContains(t, primaryXattrs, dropXattr, "dropped xattr must not be migrated to primary") +} + +// TestMetadataStoreWriteUpdateWithXattrsTombstoneFallbackOnly is the xattr analogue of +// TestMetadataStoreUpdateDeleteFallbackOnly: when the callback returns a tombstone for a doc +// that exists only in fallback, the wrapper writes the tombstone to fallback (the only store +// that ever held the doc). Subsequent reads through the wrapper must surface not-found. +func TestMetadataStoreWriteUpdateWithXattrsTombstoneFallbackOnly(t *testing.T) { + SkipXattrTestsIfNotEnabled(t) + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + xattrKey := SyncXattrName + originalBody := []byte(`{"counter":1}`) + originalXattr := []byte(`{"seq":1}`) + + // Seed fallback only. + _, err := metaStore.Fallback().WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Doc: originalBody, + Xattrs: map[string][]byte{xattrKey: originalXattr}, + }, nil + }) + require.NoError(t, err) + + // Tombstone via the wrapper. 
+ tombstoneXattr := []byte(`{"seq":2,"deleted":true}`) + cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cbCas uint64) (sgbucket.UpdatedDoc, error) { + assert.Equal(t, originalBody, doc, "callback must observe fallback body") + return sgbucket.UpdatedDoc{ + Xattrs: map[string][]byte{xattrKey: tombstoneXattr}, + IsTombstone: true, + }, nil + }) + require.NoError(t, err) + assert.Zero(t, cas, "tombstone path returns 0 CAS — primary was never written") + + // Primary stays absent: we did not create a primary tombstone. + exists, err := metaStore.Primary().Exists(docID) + require.NoError(t, err) + assert.False(t, exists) + + // Fallback body is gone — GetRaw is the body-only read path. + _, _, err = metaStore.Fallback().GetRaw(docID) + require.True(t, IsDocNotFoundError(err), "fallback body must be removed by tombstone, got %v", err) + + // Wrapper-level read also reports not-found (no read fallthrough to a stale doc body). + _, _, err = metaStore.GetRaw(docID) + require.True(t, IsDocNotFoundError(err), "wrapper read must report not-found after tombstone, got %v", err) + + // Xattr is still readable on fallback (it's a tombstone with retained xattr). + fallbackXattrs, _, xerr := metaStore.Fallback().GetXattrs(ctx, docID, []string{xattrKey}) + require.NoError(t, xerr) + require.Contains(t, fallbackXattrs, xattrKey) + assert.JSONEq(t, string(tombstoneXattr), string(fallbackXattrs[xattrKey]), "fallback xattr must reflect the tombstone xattr") +} + +// TestMetadataStoreWriteUpdateWithXattrsMigratesFromFallback is the WriteUpdateWithXattrs analogue +// of TestMetadataStoreUpdateMigratesFromFallback: the wrapper must surface the fallback body and +// xattrs to the callback and write the result to primary. 
+func TestMetadataStoreWriteUpdateWithXattrsMigratesFromFallback(t *testing.T) { + SkipXattrTestsIfNotEnabled(t) + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + xattrKey := SyncXattrName + originalBody := []byte(`{"counter":1}`) + originalXattr := []byte(`{"seq":1}`) + + // Seed the fallback store using its own WriteUpdateWithXattrs as an insert. + _, err := metaStore.Fallback().WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Doc: originalBody, + Xattrs: map[string][]byte{xattrKey: originalXattr}, + }, nil + }) + require.NoError(t, err) + + updatedBody := []byte(`{"counter":2}`) + updatedXattr := []byte(`{"seq":2}`) + var seenBody []byte + var seenXattrs map[string][]byte + var seenCas uint64 + cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, []string{xattrKey}, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cbCas uint64) (sgbucket.UpdatedDoc, error) { + seenBody = append([]byte(nil), doc...) + seenXattrs = xattrs + seenCas = cbCas + return sgbucket.UpdatedDoc{ + Doc: updatedBody, + Xattrs: map[string][]byte{xattrKey: updatedXattr}, + }, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + assert.Equal(t, originalBody, seenBody, "callback must observe fallback body") + require.Contains(t, seenXattrs, xattrKey) + assert.Equal(t, originalXattr, seenXattrs[xattrKey], "callback must observe fallback xattr") + assert.Zero(t, seenCas, "callback must see cas=0 (fallback CAS is never propagated)") + + // New value lives in primary with the expected body and xattr. 
+ primaryBody, primaryXattrs, primaryCas, err := metaStore.Primary().GetWithXattrs(ctx, docID, []string{xattrKey}) + require.NoError(t, err) + assert.Equal(t, updatedBody, primaryBody) + assert.Equal(t, updatedXattr, primaryXattrs[xattrKey]) + assert.Equal(t, cas, primaryCas) +} + +// TestMetadataStoreUpdateFallbackTombstoneResurrects covers the lifecycle that follows +// TestMetadataStoreUpdateDeleteFallbackOnly: once Update has hard-deleted the fallback copy, +// a subsequent Update on the same key must succeed by inserting into primary. (A fallback +// tombstone is read as DocNotFound by GetRaw, so the wrapper hands the call off to +// primary.Update, whose own loop performs a cas=0 Insert — same end-state as upstream Update +// on a tombstone.) +func TestMetadataStoreUpdateFallbackTombstoneResurrects(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + ok, err := metaStore.Fallback().Add(docID, 0, []byte(`{"counter":1}`)) + require.NoError(t, err) + require.True(t, ok) + + // Step 1: delete the fallback-only doc through the wrapper. + _, err = metaStore.Update(docID, 0, func(_ []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + return nil, nil, true, nil + }) + require.NoError(t, err) + + // Step 2: resurrect via Update. Callback must observe nil (primary empty, fallback gone). + resurrectBody := []byte(`{"counter":2}`) + var seenByCallback []byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + seenByCallback = append([]byte(nil), current...) 
+ return resurrectBody, nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas, "resurrection must land in primary with a real CAS") + assert.Nil(t, seenByCallback, "callback should observe nil — neither store holds the doc") + + // Resurrection lives in primary; fallback stays absent. + primaryRaw, primaryCas, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, resurrectBody, primaryRaw) + assert.Equal(t, cas, primaryCas) + + _, _, err = metaStore.Fallback().GetRaw(docID) + require.True(t, IsDocNotFoundError(err), "fallback must remain empty after resurrection, got %v", err) +} + +// TestMetadataStoreUpdateConcurrentFallbackWriterShadowed documents what happens when a +// concurrent writer mutates the fallback store *while* the wrapper's Update is mid-flight +// against a fallback-only doc. Writing to fallback is a misuse of the wrapper (writes are +// supposed to land in primary), but the wrapper still has to behave sanely: +// - the callback sees the snapshot the wrapper read at the start of the iteration, +// - the wrapper inserts that snapshot's update into primary (cas=0 Insert), and +// - subsequent reads through the wrapper return the primary value, shadowing the +// concurrent fallback write. +// +// Net effect: primary becomes authoritative; the concurrent fallback mutation is "lost" +// from the caller's perspective, which is the correct outcome under this PR's design. 
+func TestMetadataStoreUpdateConcurrentFallbackWriterShadowed(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + originalBody := []byte(`{"src":"fallback-original"}`) + concurrentBody := []byte(`{"src":"fallback-concurrent"}`) + wrapperBody := []byte(`{"src":"wrapper-update"}`) + + ok, err := metaStore.Fallback().Add(docID, 0, originalBody) + require.NoError(t, err) + require.True(t, ok) + + var calls int + var seenByCallback []byte + cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) { + calls++ + seenByCallback = append([]byte(nil), current...) + // Concurrent writer races into fallback after we've already snapshotted it. + setErr := metaStore.Fallback().Set(docID, 0, nil, concurrentBody) + require.NoError(t, setErr) + return wrapperBody, nil, false, nil + }) + require.NoError(t, err) + require.NotZero(t, cas) + assert.Equal(t, 1, calls, "primary insert should succeed first try; no retry expected when only fallback raced") + assert.Equal(t, originalBody, seenByCallback, "callback must see the snapshot, not the concurrent write") + + // Primary holds the wrapper's update. + primaryRaw, primaryCas, err := metaStore.Primary().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, wrapperBody, primaryRaw) + assert.Equal(t, cas, primaryCas) + + // Fallback holds the racer's value — we never wrote it. + fallbackRaw, _, err := metaStore.Fallback().GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, concurrentBody, fallbackRaw) + + // Wrapper-level read returns primary's value; the racer's fallback write is shadowed. 
+ wrapperRaw, _, err := metaStore.GetRaw(docID) + require.NoError(t, err) + assert.Equal(t, wrapperBody, wrapperRaw, "wrapper read must surface primary, shadowing concurrent fallback writer") +} + +// TestMetadataStoreWriteUpdateWithXattrsResurrectsAfterFallbackTombstone covers the +// dbconfig-style lifecycle Tor flagged in review: a doc with xattrs gets tombstoned (in this +// case the tombstone lives in fallback because the doc never made it to primary), and a +// subsequent WriteUpdateWithXattrs must resurrect it — landing the live doc in primary. +func TestMetadataStoreWriteUpdateWithXattrsResurrectsAfterFallbackTombstone(t *testing.T) { + SkipXattrTestsIfNotEnabled(t) + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore()) + + docID := t.Name() + xattrKey := SyncXattrName + xattrKeys := []string{xattrKey} + originalBody := []byte(`{"v":1}`) + originalXattr := []byte(`{"seq":1}`) + + // Seed fallback only. + _, err := metaStore.Fallback().WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Doc: originalBody, + Xattrs: map[string][]byte{xattrKey: originalXattr}, + }, nil + }) + require.NoError(t, err) + + // Step 1: tombstone via the wrapper. Tombstone lands on fallback (only store that + // ever held the doc). + tombstoneXattr := []byte(`{"seq":2,"deleted":true}`) + _, err = metaStore.WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil, + func(_ []byte, _ map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) { + return sgbucket.UpdatedDoc{ + Xattrs: map[string][]byte{xattrKey: tombstoneXattr}, + IsTombstone: true, + }, nil + }) + require.NoError(t, err) + + // Step 2: resurrect via the wrapper. Callback observes the tombstone (body=nil, + // xattr retained); resurrection writes to primary as a fresh insert (cas=0). 
+ resurrectBody := []byte(`{"v":3}`) + resurrectXattr := []byte(`{"seq":3}`) + var seenBody []byte + var seenXattrs map[string][]byte + cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil, + func(doc []byte, xattrs map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) { + seenBody = append([]byte(nil), doc...) + seenXattrs = xattrs + return sgbucket.UpdatedDoc{ + Doc: resurrectBody, + Xattrs: map[string][]byte{xattrKey: resurrectXattr}, + }, nil + }) + require.NoError(t, err) + require.NotZero(t, cas, "resurrection must land in primary with a real CAS") + assert.Nil(t, seenBody, "callback observes nil body on a fallback tombstone") + require.Contains(t, seenXattrs, xattrKey, "callback observes the retained tombstone xattr") + assert.JSONEq(t, string(tombstoneXattr), string(seenXattrs[xattrKey])) + + // Resurrected doc lives in primary. + primaryBody, primaryXattrs, primaryCas, err := metaStore.Primary().GetWithXattrs(ctx, docID, xattrKeys) + require.NoError(t, err) + assert.Equal(t, resurrectBody, primaryBody) + require.Contains(t, primaryXattrs, xattrKey) + assert.Equal(t, resurrectXattr, primaryXattrs[xattrKey]) + assert.Equal(t, cas, primaryCas) +} + func TestReadDoesNotGoToFallbackWhenMigrationComplete(t *testing.T) { ctx := TestCtx(t) bucket := GetTestBucket(t) diff --git a/base/dual_metadata_store.go b/base/dual_metadata_store.go index 4e5037b5b0..e503936c5b 100644 --- a/base/dual_metadata_store.go +++ b/base/dual_metadata_store.go @@ -10,6 +10,7 @@ package base import ( "context" + "errors" "sync/atomic" sgbucket "github.com/couchbase/sg-bucket" @@ -168,9 +169,82 @@ func (ms *MetadataStore) Remove(k string, cas uint64) (casOut uint64, err error) return ms.primary.Remove(k, cas) } -func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFunc) (casOut uint64, err error) { - // CBG-5291: turn into insert operation for primary datastore. 
- return ms.fallback.Update(k, exp, callback) +// Update implements a CAS-safe read-modify-write that always lands in primary, even when the document only exists in the fallback DataStore. +// +// Covers two cases: +// 1. When the doc is already in primary the call simply delegates to the primary datastore's own Update. Happy-path. +// 2. When the doc is only in fallback, the wrapper feeds the fallback value to the callback and performs a CAS-safe insert of the result into primary. +// If a concurrent writer wins the race inserting into Primary, the loop retries the update operation against the primary version of the document. +// +// When the callback requests a delete on a fallback-only doc, the wrapper applies the delete +// directly to the fallback store and ignores the primary (since the only DataStore that held the doc to delete was the fallback). +func (ms *MetadataStore) Update(k string, exp uint32, callback sgbucket.UpdateFunc) (uint64, error) { + worker := func() (shouldRetry bool, err error, casOut uint64) { + primaryExists, existsErr := ms.primary.Exists(k) + if existsErr != nil { + return false, existsErr, 0 + } + + // If the doc exists in Primary, or migration is complete, delegate directly to primary's Update + if primaryExists || ms.migrationComplete.Load() { + casOut, updateErr := ms.primary.Update(k, exp, callback) + return false, updateErr, casOut + } + + // Begin fallback DataStore Get to perform Update + fallbackValue, fallbackCasForDelete, err := ms.fallback.GetRaw(k) + if IsDocNotFoundError(err) { + // Neither store has the doc — let primary's own Update insert it. 
+ casOut, updateErr := ms.primary.Update(k, exp, callback) + return false, updateErr, casOut + } + if err != nil { + return false, err, 0 + } + + newValue, cbExpiry, isDelete, cbErr := callback(fallbackValue) + if cbErr != nil { + return false, cbErr, 0 + } + writeExp := exp + if cbExpiry != nil { + writeExp = *cbExpiry + } + + if isDelete { + // Delete the fallback copy — primary never held it, so a primary tombstone is + // pointless and would shadow the fallback for nothing. + // Remove with the CAS we just observed; on a concurrent fallback mutation we retry the loop. + _, removeErr := ms.fallback.Remove(k, fallbackCasForDelete) + switch { + case removeErr == nil, IsDocNotFoundError(removeErr): + return false, nil, 0 + case IsCasMismatch(removeErr): + // retry: concurrent fallback mutation underneath this Delete + // This shouldn't happen with the MetadataStore wrapper only routing writes into Primary, + // but it could be two concurrent Deletes, for which we should make at least one more attempt + return true, nil, 0 + } + return false, removeErr, 0 + } + + // Write to primary with CAS=0 to force safe insertion + casOut, writeErr := ms.primary.WriteCas(k, writeExp, 0, newValue, 0) + if IsCasMismatch(writeErr) { + // retry: concurrent writer beat us to primary + // retry loop will run again and be routed directly to the primary DataStore Update + return true, nil, 0 + } + if writeErr != nil { + return false, writeErr, 0 + } + + // success: the Primary DataStore Insert landed - return its CAS (the delegated primary.Update paths above return their own CAS). 
+ return false, nil, casOut + } + + err, casOut := RetryLoopCas(context.TODO(), "MetadataStore.Update", worker, DefaultRetrySleeper()) + return casOut, err } func (ms *MetadataStore) Incr(k string, amt, def uint64, exp uint32) (casOut uint64, err error) { @@ -225,8 +299,100 @@ func (ms *MetadataStore) DeleteWithXattrs(ctx context.Context, k string, xattrKe return ms.primary.DeleteWithXattrs(ctx, k, xattrKeys) } -func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xattrKeys []string, exp uint32, previous *sgbucket.BucketDocument, opts *sgbucket.MutateInOptions, callback sgbucket.WriteUpdateWithXattrsFunc) (casOut uint64, err error) { - return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback) +// WriteUpdateWithXattrs is the xattr-aware analogue of Update — a CAS-safe read-modify-write that always lands in primary, even when the document only exists in the fallback DataStore. +// +// Covers two cases: +// 1. When the doc is already in primary (or the caller passes a non-zero previous.Cas, which by contract is a primary cas) the call simply delegates to the primary datastore's own WriteUpdateWithXattrs. Happy-path. +// 2. When the doc is only in fallback, the wrapper feeds the fallback body+xattrs to the callback with cas=0 and performs a CAS-safe insert of the result into primary via WriteWithXattrs. +// If a concurrent writer wins the race inserting into Primary, the loop retries the update operation against the primary version of the document. +// +// When the callback returns IsTombstone for a fallback-only doc, the wrapper writes the tombstone +// directly to fallback and ignores the primary (since the only DataStore that held the doc to tombstone was the fallback). +// +// Callers must not feed a fallback CAS back through previous.Cas — the wrapper's read methods only surface primary CAS, so any non-zero previous.Cas is treated as primary; misuse is documented but not enforced. 
+func (ms *MetadataStore) WriteUpdateWithXattrs(ctx context.Context, k string, xattrKeys []string, exp uint32, previous *sgbucket.BucketDocument, opts *sgbucket.MutateInOptions, callback sgbucket.WriteUpdateWithXattrsFunc) (uint64, error) { + // If we're updating with a previous.Cas - we'll be updating on top of a prior fetch from the primary datastore, not the fallback data store. + // Or if we've tagged MetadataStore with 'migrationComplete' - avoid the fallback effort... + if (previous != nil && previous.Cas != 0) || ms.migrationComplete.Load() { + return ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, previous, opts, callback) + } + + // Retry loop for Primary/Fallback + worker := func() (shouldRetry bool, err error, casOut uint64) { + primaryExists, existsErr := ms.primary.Exists(k) + if existsErr != nil { + return false, existsErr, 0 + } + + // If we know something exists in Primary, do a direct WriteUpdateWithXattrs there + if primaryExists || ms.migrationComplete.Load() { + casOut, updateErr := ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) + return false, updateErr, casOut + } + + // Begin fallback DataStore Get to perform WriteUpdateWithXattrs + fallbackBody, fallbackXattrs, fallbackCasForTombstone, err := ms.fallback.GetWithXattrs(ctx, k, xattrKeys) + if IsDocNotFoundError(err) { + // Neither store has the doc — let primary's own WriteUpdateWithXattrs insert it. + casOut, updateErr := ms.primary.WriteUpdateWithXattrs(ctx, k, xattrKeys, exp, nil, opts, callback) + return false, updateErr, casOut + } + // A doc with no/partial xattrs is still a fallback hit — surface what we have to the + // callback rather than treat it as not-found. 
+ if err != nil && !IsXattrNotFoundError(err) && !errors.Is(err, ErrXattrPartialFound) { + return false, err, 0 + } + + updatedDoc, cbErr := callback(fallbackBody, fallbackXattrs, 0) + if errors.Is(cbErr, ErrCasFailureShouldRetry) { + // retry: callback explicitly asked us to retry + return true, nil, 0 + } + if cbErr != nil { + return false, cbErr, 0 + } + writeExp := exp + if updatedDoc.Expiry != nil { + writeExp = *updatedDoc.Expiry + } + + if updatedDoc.IsTombstone { + // Tombstone the fallback copy directly — primary never held it, so a primary tombstone is + // pointless and would shadow the fallback for nothing. + // deleteBody=true clears the body while preserving the requested xattrs (if any) on the fallback record. + _, tombstoneErr := ms.fallback.WriteTombstoneWithXattrs(ctx, k, writeExp, fallbackCasForTombstone, updatedDoc.Xattrs, updatedDoc.XattrsToDelete, true, opts) + switch { + case tombstoneErr == nil, IsDocNotFoundError(tombstoneErr): + return false, nil, 0 + case IsCasMismatch(tombstoneErr): + // retry: concurrent fallback mutation underneath this Tombstone + // This shouldn't happen with the MetadataStore wrapper only routing writes into Primary, + // but it could be two concurrent Tombstones that we should at least have one more attempt for + return true, nil, 0 + } + return false, tombstoneErr, 0 + } + + // Write to primary with CAS=0 to force safe insertion. + // xattrsToDelete is intentionally nil here: cas=0 is an insert and the underlying primary + // store rejects xattrsToDelete on insert (sgbucket.ErrDeleteXattrOnDocumentInsert). + // Anything the callback wanted dropped is already absent from updatedDoc.Xattrs. + casOut, writeErr := ms.primary.WriteWithXattrs(ctx, k, writeExp, 0, updatedDoc.Doc, updatedDoc.Xattrs, nil, opts) + if IsCasMismatch(writeErr) || IsDocNotFoundError(writeErr) { + // retry: concurrent writer beat us to primary + // retry loop will run again and be routed directly to the primary DataStore WriteUpdateWithXattrs. 
+ return true, nil, 0 + } + if writeErr != nil { + return false, writeErr, 0 + } + + // success: Only case where we can actually return a non-zero CAS - successful Primary DataStore Insert. + return false, nil, casOut + } + + err, casOut := RetryLoopCas(ctx, "MetadataStore.WriteUpdateWithXattrs", worker, DefaultRetrySleeper()) + return casOut, err } func (ms *MetadataStore) UpdateXattrs(ctx context.Context, k string, exp uint32, cas uint64, xv map[string][]byte, opts *sgbucket.MutateInOptions) (casOut uint64, err error) {