Skip to content
102 changes: 102 additions & 0 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"maps"
"math"
"net/http"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -221,6 +222,107 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (

}

// CompactDocChannelHistory removes channel history entries that ended at or before the given sequence number.
// This is used to truncate stale channel assignment history to reduce storage overhead.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This is used to truncate stale channel assignment history to reduce storage overhead.
// This is used to prune channel assignment history to reduce storage overhead.

func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid string, seq uint64) ([]string, error) {
key := realDocID(docid)
if key == "" {
return nil, base.HTTPErrorf(400, "Invalid doc ID")
}

rawDoc, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncMouRevSeqNoAndUserXattrKeys())
if err != nil {
return nil, err
}
Comment on lines +225 to +236

doc, err := c.unmarshalDocumentWithXattrs(ctx, key, nil, xattrs, cas, DocUnmarshalSync)
if err != nil {
return nil, err
}

isSgWrite, crc32Match, _ := doc.IsSGWrite(ctx, rawDoc)
if crc32Match {
c.dbStats().Database().Crc32MatchCount.Add(1)
}

if !isSgWrite {
var importErr error

doc, importErr = c.OnDemandImportForGet(ctx, docid, doc, rawDoc, xattrs, cas)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure you have a nil check on doc here too to avoid the same panic found in https://jira.issues.couchbase.com/browse/CBG-5355

Code else where for reference:

if doc == nil {

We have a ticket in backlog to make this code better but not sure if we'll get to it in the 4.1 timeframe so be good to protect against it here.

if importErr != nil {
return nil, importErr
}
if doc == nil {
return nil, base.ErrNotFound
}
cas = doc.Cas
}

compactedChannels := make([]string, 0)

doc.SyncData.ChannelSetHistory = slices.DeleteFunc(doc.SyncData.ChannelSetHistory, func(channel ChannelSetEntry) bool {
del := channel.End <= seq
if del {
compactedChannels = append(compactedChannels, channel.Name)
}
return del
})

doc.SyncData.ChannelSet = slices.DeleteFunc(doc.SyncData.ChannelSet, func(channel ChannelSetEntry) bool {
del := channel.End != 0 && channel.End <= seq
if del {
compactedChannels = append(compactedChannels, channel.Name)
}
return del
})

for chanName, chanEntry := range doc.SyncData.Channels {
if chanEntry != nil && chanEntry.Seq <= seq {
compactedChannels = append(compactedChannels, chanName)
delete(doc.SyncData.Channels, chanName)
}
}

// Exit early if no compaction occurred
if len(compactedChannels) == 0 {
return []string{}, nil
}

rawSyncXattr, err := base.JSONMarshal(doc.SyncData)
if err != nil {
return nil, base.RedactErrorf("failed to marshal sync data when trying to compact channel history for doc:%s. Error: %v", base.UD(docid), err)
}

revSeqNo, err := unmarshalRevSeqNo(xattrs[base.VirtualXattrRevSeqNo])
if err != nil {
base.WarnfCtx(ctx, `Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err)
return nil, base.RedactErrorf(`Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err)
}

Comment thread
RIT3shSapata marked this conversation as resolved.
metadataOnlyUpdate := computeMetadataOnlyUpdate(doc.Cas, revSeqNo, doc.MetadataOnlyUpdate)

Comment on lines +296 to +303
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RIT3shSapata The above comment is correct - if an on-demand import occurs, the document's revSeqNo will have changed and will need to be re-fetched.
Thinking about this further, I'm wondering if we should take a different approach for documents that are pending import. Let's discuss offline.

rawMouXattr, err := base.JSONMarshal(metadataOnlyUpdate)
if err != nil {
return nil, base.RedactErrorf("failed to marshal _mou when attempting to compact channel history for doc: %s. Error: %v", base.UD(docid), err)
}
Comment thread
RIT3shSapata marked this conversation as resolved.

// build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas)
opts := &sgbucket.MutateInOptions{}
spec := []sgbucket.MacroExpansionSpec{
sgbucket.NewMacroExpansionSpec(xattrCasPath(base.SyncXattrName), sgbucket.MacroCas),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's correct to update sync.cas here, because we don't want this compaction to trigger a new document import. However, this means that we need to make sure this processing isn't being run on documents that haven't yet been imported - otherwise this cas update will make it appear as if this document has already been imported (and so the sync metadata would be incorrect).

I think the best way to avoid this is to trigger on-demand import as needed when initially fetching the document. I think you can do that by using GetDocument instead of GetWithXattrs when originally fetching the document on line 229 - this has the on-demand import work built-in.

Will want to add a test case for that (running compact on a non-imported document).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added check for non imported document and triggering on demand import in the same function. I went through the GetDocument method and the xattr keys which it fetches is base.SyncXattrName, base.VvXattrName, base.GlobalXattrName, the xattr keys which are required for compaction are base.SyncXattrName, base.VirtualXattrRevSeqNo, base.MouXattrName.

Let me know your thoughts on this approach, or would you prefer I use GetDocument itself.

Also while writing the test for this scenario, ran into a few challenges, will discuss this further offline.

sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas),
}
opts.MacroExpansion = spec
opts.PreserveExpiry = true // if doc has expiry, we should preserve this

updatedXattr := map[string][]byte{
base.SyncXattrName: rawSyncXattr,
base.MouXattrName: rawMouXattr,
}
Comment on lines +302 to +321
_, err = c.dataStore.UpdateXattrs(ctx, key, 0, cas, updatedXattr, opts)
Comment thread
RIT3shSapata marked this conversation as resolved.
return compactedChannels, err
}

// unmarshalDocumentWithXattrs populates individual xattrs on unmarshalDocumentWithXattrs from a provided xattrs map
func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.UserXattrKey()], xattrs[base.VirtualXattrRevSeqNo], xattrs[base.GlobalXattrName], cas, unmarshalLevel)
Expand Down
277 changes: 277 additions & 0 deletions rest/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3916,3 +3916,280 @@ func TestFetchBackupRevisionByCVThroughAPI(t *testing.T) {
assert.Equal(t, createVersion.RevTreeID, body[db.BodyRev])
assert.Nil(t, body[db.BodyCV])
}

func TestDocumentChannelHistoryCompact(t *testing.T) {
Comment thread
RIT3shSapata marked this conversation as resolved.
if !base.TestUseXattrs() {
t.Skip("CompactDocChannelHistory requires XATTR-based metadata")
}
rt := NewRestTester(t, &RestTesterConfig{SyncFn: channels.DocChannelsSyncFunction})
defer rt.Close()

collection, ctx := rt.GetSingleTestDatabaseCollection()

t.Run("basic compaction", func(t *testing.T) {
// Create a document with a single channel assignment
version := rt.PutDoc("doc1", `{"channels": ["test"]}`)
syncData, err := collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)

require.Len(t, syncData.ChannelSet, 1)
assert.Equal(t, db.ChannelSetEntry{Name: "test", Start: 1, End: 0}, syncData.ChannelSet[0])
assert.Len(t, syncData.ChannelSetHistory, 0)

// Remove all channels - ends the existing channel range in ChannelSet
version = rt.UpdateDoc("doc1", version, `{"channels": []}`)
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)

require.Len(t, syncData.ChannelSet, 1)
assert.Equal(t, db.ChannelSetEntry{Name: "test", Start: 1, End: 2}, syncData.ChannelSet[0])
assert.Len(t, syncData.ChannelSetHistory, 0)

// Add multiple channels - generates history for the previously removed channel
_ = rt.UpdateDoc("doc1", version, `{"channels": ["test", "test2"]}`)
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)

require.Len(t, syncData.ChannelSet, 2)
assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test", Start: 3, End: 0})
assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test2", Start: 3, End: 0})
require.Len(t, syncData.ChannelSetHistory, 1)
assert.Equal(t, db.ChannelSetEntry{Name: "test", Start: 1, End: 2}, syncData.ChannelSetHistory[0])

// Compact history at sequence 2 and verify history is cleared
compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc1", 2)
require.NoError(t, err)
assert.Equal(t, []string{"test"}, compactedChannels)
syncData, err = collection.GetDocSyncData(ctx, "doc1")
assert.NoError(t, err)
assert.Zero(t, len(syncData.ChannelSetHistory))
})

t.Run("seq zero keeps all history", func(t *testing.T) {
// Compact with seq=0 should keep all entries
version := rt.PutDoc("doc2", `{"channels": ["test"]}`)
version = rt.UpdateDoc("doc2", version, `{"channels": []}`)
_ = rt.UpdateDoc("doc2", version, `{"channels": ["test", "test2"]}`)

syncDataBefore, err := collection.GetDocSyncData(ctx, "doc2")
require.NoError(t, err)
historyLenBefore := len(syncDataBefore.ChannelSetHistory)
require.Greater(t, historyLenBefore, 0)

compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc2", 0)
require.NoError(t, err)
assert.Equal(t, []string{}, compactedChannels)

syncDataAfter, err := collection.GetDocSyncData(ctx, "doc2")
require.NoError(t, err)
assert.Equal(t, historyLenBefore, len(syncDataAfter.ChannelSetHistory))
})

t.Run("compact all history", func(t *testing.T) {
// Compact with very high seq removes all history
version := rt.PutDoc("doc3", `{"channels": ["test"]}`)
version = rt.UpdateDoc("doc3", version, `{"channels": []}`)
_ = rt.UpdateDoc("doc3", version, `{"channels": ["test", "test2"]}`)

syncDataBefore, err := collection.GetDocSyncData(ctx, "doc3")
require.NoError(t, err)

// Compact with very high seq number
compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc3", 999999)
require.NoError(t, err)
assert.Equal(t, []string{"test"}, compactedChannels)

syncData, err := collection.GetDocSyncData(ctx, "doc3")
require.NoError(t, err)
assert.Zero(t, len(syncData.ChannelSetHistory))
assert.Equal(t, len(syncDataBefore.Channels), len(syncData.Channels))
assert.Equal(t, len(syncDataBefore.ChannelSet), len(syncData.ChannelSet))
})

t.Run("partial history compaction", func(t *testing.T) {
// Compact removes entries with End <= seq, keeps entries with End > seq
version := rt.PutDoc("doc4", `{"channels": ["a"]}`)
version = rt.UpdateDoc("doc4", version, `{"channels": []}`)

docSeq := rt.GetDocumentSequence("doc4")

version = rt.UpdateDoc("doc4", version, `{"channels": ["b"]}`)
_ = rt.UpdateDoc("doc4", version, `{"channels": []}`)

syncDataBefore, err := collection.GetDocSyncData(ctx, "doc4")
require.NoError(t, err)
channelSetLenBefore := len(syncDataBefore.ChannelSet)
channelLenBefore := len(syncDataBefore.Channels)

compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc4", docSeq)
require.NoError(t, err)
assert.Equal(t, []string{"a", "a"}, compactedChannels)

syncDataAfter, err := collection.GetDocSyncData(ctx, "doc4")
require.NoError(t, err)

for _, entry := range syncDataAfter.ChannelSet {
assert.True(t, entry.End == 0 || entry.End > docSeq)
}

for _, entry := range syncDataAfter.Channels {
assert.True(t, entry.Seq > docSeq)
}

// Should have fewer than before
assert.Less(t, len(syncDataAfter.Channels), channelLenBefore)
assert.Less(t, len(syncDataAfter.ChannelSet), channelSetLenBefore)
})

t.Run("invalid doc id", func(t *testing.T) {
// Empty doc ID should return 400 error
_, err := collection.CompactDocChannelHistory(ctx, "", 1)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Invalid doc ID")
})

t.Run("nonexistent document", func(t *testing.T) {
// Compacting nonexistent doc should return not found error
_, err := collection.CompactDocChannelHistory(ctx, "nonexistent", 1)
assert.Error(t, err)
})

t.Run("multiple channels with mixed history", func(t *testing.T) {
// Complex scenario with multiple channels
version := rt.PutDoc("doc7", `{"channels": ["a", "b"]}`)
version = rt.UpdateDoc("doc7", version, `{"channels": ["a"]}`)

docSeq := rt.GetDocumentSequence("doc7")

_ = rt.UpdateDoc("doc7", version, `{"channels": ["a", "c"]}`)

syncDataBefore, err := collection.GetDocSyncData(ctx, "doc7")
require.NoError(t, err)
compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc7", docSeq)
require.NoError(t, err)
assert.Equal(t, []string{"b", "b"}, compactedChannels)

syncData, err := collection.GetDocSyncData(ctx, "doc7")
require.NoError(t, err)
// Should still have active channels
assert.Less(t, len(syncData.ChannelSet), len(syncDataBefore.ChannelSet))
assert.Less(t, len(syncData.Channels), len(syncDataBefore.Channels))
Comment on lines +4066 to +4076
})

t.Run("compact empty history", func(t *testing.T) {
// Doc with no history should succeed
rt.PutDoc("doc8", `{"channels": ["test"]}`)

syncDataBefore, err := collection.GetDocSyncData(ctx, "doc8")
require.NoError(t, err)
assert.Len(t, syncDataBefore.ChannelSetHistory, 0)

compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc8", 1)
require.NoError(t, err)
assert.Equal(t, []string{}, compactedChannels)

syncDataAfter, err := collection.GetDocSyncData(ctx, "doc8")
require.NoError(t, err)
assert.Len(t, syncDataAfter.ChannelSetHistory, 0)
})

t.Run("verify xattr updates", func(t *testing.T) {
// Verify CAS is updated and no reimport happens
version := rt.PutDoc("doc10", `{"channels": ["test"]}`)
version = rt.UpdateDoc("doc10", version, `{"channels": []}`)
_ = rt.UpdateDoc("doc10", version, `{"channels": ["test", "test2"]}`)
seq := rt.GetDocumentSequence("doc10")

// After compaction, document should still be accessible
compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc10", seq)
require.NoError(t, err)
assert.Equal(t, []string{"test"}, compactedChannels)

// Verify doc is still accessible with correct data
syncData, err := collection.GetDocSyncData(ctx, "doc10")
require.NoError(t, err)
assert.NotNil(t, syncData)
})
}

// TestCompactNonImportedDocWithAutoImport verifies that when CompactDocChannelHistory is called
// on a non-imported document, it automatically imports the document first before performing compaction.
func TestCompactNonImportedDocWithAutoImport(t *testing.T) {
if !base.TestUseXattrs() {
t.Skip("CompactDocChannelHistory requires XATTR-based metadata")
}

// Create RestTester with AutoImport disabled to allow non-imported documents
rtConfig := RestTesterConfig{
SyncFn: channels.DocChannelsSyncFunction,
DatabaseConfig: &DatabaseConfig{DbConfig: DbConfig{
AutoImport: false,
Comment on lines +4115 to +4126
}},
Comment on lines +4115 to +4127
}
rt := NewRestTesterDefaultCollection(t, &rtConfig)
defer rt.Close()

collection, ctx := rt.GetSingleTestDatabaseCollection()

// Step 1-6: Use REST API to create document with channel history
nonImportedDocID := "non_imported_doc"

// Create initial document with a channel
version := rt.PutDoc(nonImportedDocID, `{"type":"test","channels":["test_channel"]}`)

// Update to remove channel (creates history)
version = rt.UpdateDoc(nonImportedDocID, version, `{"type":"test","channels":[]}`)

// Update again to add new channels (more history)
_ = rt.UpdateDoc(nonImportedDocID, version, `{"type":"test","channels":["test_channel","new_channel"]}`)

// Verify document has channel history
syncDataBefore, err := collection.GetDocSyncData(ctx, nonImportedDocID)
require.NoError(t, err)
require.Greater(t, len(syncDataBefore.ChannelSetHistory), 0, "document should have channel history")

// Step 6: Get document sequence for compaction point
docSeq := rt.GetDocumentSequence(nonImportedDocID)

// Step 7: Update the document body directly in the datastore to simulate external modification
// Read current document body
docBytesRaw, _, err := rt.GetSingleDataStore().GetRaw(ctx, nonImportedDocID)
require.NoError(t, err)
var docBody map[string]any
err = json.Unmarshal(docBytesRaw, &docBody)
require.NoError(t, err)

// Modify document body with external changes
docBody["modified"] = true
docBody["updatedAt"] = "external_update"
docBody["externalVersion"] = 2
modifiedDocBytes, err := json.Marshal(docBody)
require.NoError(t, err)

// Write modified body back to datastore
err = rt.GetSingleDataStore().SetRaw(ctx, nonImportedDocID, 0, nil, modifiedDocBytes)
require.NoError(t, err)

// Step 8: Call CompactDocChannelHistory - this will trigger the auto-import check
// which verifies the document is imported (has valid _sync xattr) before compacting
compactedChannels, err := collection.CompactDocChannelHistory(ctx, nonImportedDocID, docSeq-1)
require.NoError(t, err)
assert.Equal(t, []string{"test_channel"}, compactedChannels)

// Step 9: Verify compaction succeeded and history was removed
syncData, err := collection.GetDocSyncData(ctx, nonImportedDocID)
require.NoError(t, err)
// History should be compacted
assert.Less(t, len(syncData.ChannelSetHistory), len(syncDataBefore.ChannelSetHistory))

// Step 10: Verify document is still accessible and intact
docFromBucket, _, err := rt.GetSingleDataStore().GetRaw(ctx, nonImportedDocID)
require.NoError(t, err)
require.NotNil(t, docFromBucket)

// Verify the document body is intact after compaction
var finalBody map[string]any
err = json.Unmarshal(docFromBucket, &finalBody)
require.NoError(t, err)
assert.Equal(t, "test", finalBody["type"])
}
Loading