Skip to content

Commit df9ab22

Browse files
bbrksclaude
andcommitted
CBG-5291: Apply second-round review fixes and inline dual-store update loop
Address Tor's PR review and tighten the wrapper: - Use errors.Is for ErrCasFailureShouldRetry (was ==). - Add fallback-tombstone resurrection coverage for Update and WriteUpdateWithXattrs, plus a concurrent-fallback-writer test to document the primary-shadows-fallback contract. - Inline runFallbackUpdateLoop into Update and WriteUpdateWithXattrs; drop the errRetryUpdateLoop sentinel. - Document the deliberate divergence from upstream Update's return-cas-on-error behaviour, and explain why no previousLoopCas defensive check is needed here. - Simplify godoc and rename closure-locals (primaryDelegate -> primaryUpdateFn, fallbackCas -> fallbackCasForDelete/Tombstone) for clarity. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 0b4b59f commit df9ab22

2 files changed

Lines changed: 279 additions & 98 deletions

File tree

base/bucket_gocb_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3356,6 +3356,180 @@ func TestMetadataStoreWriteUpdateWithXattrsMigratesFromFallback(t *testing.T) {
33563356
assert.Equal(t, cas, primaryCas)
33573357
}
33583358

3359+
// TestMetadataStoreUpdateFallbackTombstoneResurrects covers the lifecycle that follows
3360+
// TestMetadataStoreUpdateDeleteFallbackOnly: once Update has hard-deleted the fallback copy,
3361+
// a subsequent Update on the same key must succeed by inserting into primary. (A fallback
3362+
// tombstone is read as DocNotFound by GetRaw, so the wrapper hands the call off to
3363+
// primary.Update, whose own loop performs a cas=0 Insert — same end-state as upstream Update
3364+
// on a tombstone.)
3365+
func TestMetadataStoreUpdateFallbackTombstoneResurrects(t *testing.T) {
3366+
ctx := TestCtx(t)
3367+
bucket := GetTestBucket(t)
3368+
defer bucket.Close(ctx)
3369+
3370+
metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore())
3371+
3372+
docID := t.Name()
3373+
ok, err := metaStore.Fallback().Add(docID, 0, []byte(`{"counter":1}`))
3374+
require.NoError(t, err)
3375+
require.True(t, ok)
3376+
3377+
// Step 1: delete the fallback-only doc through the wrapper.
3378+
_, err = metaStore.Update(docID, 0, func(_ []byte) (updated []byte, expiry *uint32, isDelete bool, err error) {
3379+
return nil, nil, true, nil
3380+
})
3381+
require.NoError(t, err)
3382+
3383+
// Step 2: resurrect via Update. Callback must observe nil (primary empty, fallback gone).
3384+
resurrectBody := []byte(`{"counter":2}`)
3385+
var seenByCallback []byte
3386+
cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) {
3387+
seenByCallback = append([]byte(nil), current...)
3388+
return resurrectBody, nil, false, nil
3389+
})
3390+
require.NoError(t, err)
3391+
require.NotZero(t, cas, "resurrection must land in primary with a real CAS")
3392+
assert.Nil(t, seenByCallback, "callback should observe nil — neither store holds the doc")
3393+
3394+
// Resurrection lives in primary; fallback stays absent.
3395+
primaryRaw, primaryCas, err := metaStore.Primary().GetRaw(docID)
3396+
require.NoError(t, err)
3397+
assert.Equal(t, resurrectBody, primaryRaw)
3398+
assert.Equal(t, cas, primaryCas)
3399+
3400+
_, _, err = metaStore.Fallback().GetRaw(docID)
3401+
require.True(t, IsDocNotFoundError(err), "fallback must remain empty after resurrection, got %v", err)
3402+
}
3403+
3404+
// TestMetadataStoreUpdateConcurrentFallbackWriterShadowed documents what happens when a
3405+
// concurrent writer mutates the fallback store *while* the wrapper's Update is mid-flight
3406+
// against a fallback-only doc. Writing to fallback is a misuse of the wrapper (writes are
3407+
// supposed to land in primary), but the wrapper still has to behave sanely:
3408+
// - the callback sees the snapshot the wrapper read at the start of the iteration,
3409+
// - the wrapper inserts that snapshot's update into primary (cas=0 Insert), and
3410+
// - subsequent reads through the wrapper return the primary value, shadowing the
3411+
// concurrent fallback write.
3412+
//
3413+
// Net effect: primary becomes authoritative; the concurrent fallback mutation is "lost"
3414+
// from the caller's perspective, which is the correct outcome under this PR's design.
3415+
func TestMetadataStoreUpdateConcurrentFallbackWriterShadowed(t *testing.T) {
3416+
ctx := TestCtx(t)
3417+
bucket := GetTestBucket(t)
3418+
defer bucket.Close(ctx)
3419+
3420+
metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore())
3421+
3422+
docID := t.Name()
3423+
originalBody := []byte(`{"src":"fallback-original"}`)
3424+
concurrentBody := []byte(`{"src":"fallback-concurrent"}`)
3425+
wrapperBody := []byte(`{"src":"wrapper-update"}`)
3426+
3427+
ok, err := metaStore.Fallback().Add(docID, 0, originalBody)
3428+
require.NoError(t, err)
3429+
require.True(t, ok)
3430+
3431+
var calls int
3432+
var seenByCallback []byte
3433+
cas, err := metaStore.Update(docID, 0, func(current []byte) (updated []byte, expiry *uint32, isDelete bool, err error) {
3434+
calls++
3435+
seenByCallback = append([]byte(nil), current...)
3436+
// Concurrent writer races into fallback after we've already snapshotted it.
3437+
setErr := metaStore.Fallback().Set(docID, 0, nil, concurrentBody)
3438+
require.NoError(t, setErr)
3439+
return wrapperBody, nil, false, nil
3440+
})
3441+
require.NoError(t, err)
3442+
require.NotZero(t, cas)
3443+
assert.Equal(t, 1, calls, "primary insert should succeed first try; no retry expected when only fallback raced")
3444+
assert.Equal(t, originalBody, seenByCallback, "callback must see the snapshot, not the concurrent write")
3445+
3446+
// Primary holds the wrapper's update.
3447+
primaryRaw, primaryCas, err := metaStore.Primary().GetRaw(docID)
3448+
require.NoError(t, err)
3449+
assert.Equal(t, wrapperBody, primaryRaw)
3450+
assert.Equal(t, cas, primaryCas)
3451+
3452+
// Fallback holds the racer's value — we never wrote it.
3453+
fallbackRaw, _, err := metaStore.Fallback().GetRaw(docID)
3454+
require.NoError(t, err)
3455+
assert.Equal(t, concurrentBody, fallbackRaw)
3456+
3457+
// Wrapper-level read returns primary's value; the racer's fallback write is shadowed.
3458+
wrapperRaw, _, err := metaStore.GetRaw(docID)
3459+
require.NoError(t, err)
3460+
assert.Equal(t, wrapperBody, wrapperRaw, "wrapper read must surface primary, shadowing concurrent fallback writer")
3461+
}
3462+
3463+
// TestMetadataStoreWriteUpdateWithXattrsResurrectsAfterFallbackTombstone covers the
3464+
// dbconfig-style lifecycle Tor flagged in review: a doc with xattrs gets tombstoned (in this
3465+
// case the tombstone lives in fallback because the doc never made it to primary), and a
3466+
// subsequent WriteUpdateWithXattrs must resurrect it — landing the live doc in primary.
3467+
func TestMetadataStoreWriteUpdateWithXattrsResurrectsAfterFallbackTombstone(t *testing.T) {
3468+
SkipXattrTestsIfNotEnabled(t)
3469+
ctx := TestCtx(t)
3470+
bucket := GetTestBucket(t)
3471+
defer bucket.Close(ctx)
3472+
3473+
metaStore := NewMetadataStore(bucket.GetMobileSystemDataStore(), bucket.DefaultDataStore())
3474+
3475+
docID := t.Name()
3476+
xattrKey := SyncXattrName
3477+
xattrKeys := []string{xattrKey}
3478+
originalBody := []byte(`{"v":1}`)
3479+
originalXattr := []byte(`{"seq":1}`)
3480+
3481+
// Seed fallback only.
3482+
_, err := metaStore.Fallback().WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil,
3483+
func(doc []byte, xattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) {
3484+
return sgbucket.UpdatedDoc{
3485+
Doc: originalBody,
3486+
Xattrs: map[string][]byte{xattrKey: originalXattr},
3487+
}, nil
3488+
})
3489+
require.NoError(t, err)
3490+
3491+
// Step 1: tombstone via the wrapper. Tombstone lands on fallback (only store that
3492+
// ever held the doc).
3493+
tombstoneXattr := []byte(`{"seq":2,"deleted":true}`)
3494+
_, err = metaStore.WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil,
3495+
func(_ []byte, _ map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) {
3496+
return sgbucket.UpdatedDoc{
3497+
Xattrs: map[string][]byte{xattrKey: tombstoneXattr},
3498+
IsTombstone: true,
3499+
}, nil
3500+
})
3501+
require.NoError(t, err)
3502+
3503+
// Step 2: resurrect via the wrapper. Callback observes the tombstone (body=nil,
3504+
// xattr retained); resurrection writes to primary as a fresh insert (cas=0).
3505+
resurrectBody := []byte(`{"v":3}`)
3506+
resurrectXattr := []byte(`{"seq":3}`)
3507+
var seenBody []byte
3508+
var seenXattrs map[string][]byte
3509+
cas, err := metaStore.WriteUpdateWithXattrs(ctx, docID, xattrKeys, 0, nil, nil,
3510+
func(doc []byte, xattrs map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) {
3511+
seenBody = append([]byte(nil), doc...)
3512+
seenXattrs = xattrs
3513+
return sgbucket.UpdatedDoc{
3514+
Doc: resurrectBody,
3515+
Xattrs: map[string][]byte{xattrKey: resurrectXattr},
3516+
}, nil
3517+
})
3518+
require.NoError(t, err)
3519+
require.NotZero(t, cas, "resurrection must land in primary with a real CAS")
3520+
assert.Nil(t, seenBody, "callback observes nil body on a fallback tombstone")
3521+
require.Contains(t, seenXattrs, xattrKey, "callback observes the retained tombstone xattr")
3522+
assert.JSONEq(t, string(tombstoneXattr), string(seenXattrs[xattrKey]))
3523+
3524+
// Resurrected doc lives in primary.
3525+
primaryBody, primaryXattrs, primaryCas, err := metaStore.Primary().GetWithXattrs(ctx, docID, xattrKeys)
3526+
require.NoError(t, err)
3527+
assert.Equal(t, resurrectBody, primaryBody)
3528+
require.Contains(t, primaryXattrs, xattrKey)
3529+
assert.Equal(t, resurrectXattr, primaryXattrs[xattrKey])
3530+
assert.Equal(t, cas, primaryCas)
3531+
}
3532+
33593533
func TestReadDoesNotGoToFallbackWhenMigrationComplete(t *testing.T) {
33603534
ctx := TestCtx(t)
33613535
bucket := GetTestBucket(t)

0 commit comments

Comments
 (0)