CBG-5326: Function to handle document channel history compaction #8218
RIT3shSapata wants to merge 9 commits
Conversation
Pull request overview
Adds a new DatabaseCollection helper to compact per-document channel assignment history up to a given sequence, with a REST-level test validating the basic behavior.
Changes:
- Introduces `CompactDocChannelHistory(ctx, docid, seq)` to prune `ChannelSetHistory` and persist the updated `_sync` (and `_mou`) xattrs.
- Adds an API test that creates/removes/re-adds channels to generate history, then compacts and validates the result.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| db/crud.go | Adds the CompactDocChannelHistory implementation and persists updated sync metadata via xattr update. |
| rest/api_test.go | Adds a high-level test covering history generation and compaction behavior. |
adamcfraser
left a comment
Left one comment - have a look at that as well as the copilot comments and assign back once you've updated the PR.
```go
// Compact at seq 3
err = collection.CompactDocChannelHistory(ctx, "doc4", 2)
require.NoError(t, err)

syncDataAfter, err := collection.GetDocSyncData(ctx, "doc4")
require.NoError(t, err)

// Verify: only entries with End > 3 or End == 0 (still active) should remain
for _, entry := range syncDataAfter.ChannelSet {
	assert.True(t, entry.End == 0 || entry.End > uint64(3))
}

// Verify only channels with seq > 3 should remain
for _, entry := range syncDataAfter.Channels {
	assert.True(t, entry.Seq > uint64(3))
}
```
```go
syncDataBefore, err := collection.GetDocSyncData(ctx, "doc7")
require.NoError(t, err)
err = collection.CompactDocChannelHistory(ctx, "doc7", 2)
require.NoError(t, err)

syncData, err := collection.GetDocSyncData(ctx, "doc7")
require.NoError(t, err)
// Should still have active channels
assert.Less(t, len(syncData.ChannelSet), len(syncDataBefore.ChannelSet))
assert.Less(t, len(syncData.Channels), len(syncDataBefore.Channels))
```
```go
// CompactDocChannelHistory removes channel history entries that ended at or before the given sequence number.
// This is used to truncate stale channel assignment history to reduce storage overhead.
func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid string, seq uint64) (err error) {
	key := realDocID(docid)
	if key == "" {
		return base.HTTPErrorf(400, "Invalid doc ID")
	}

	_, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncMouRevSeqNoAndUserXattrKeys())
	if err != nil {
		return
	}
```
```go
metadataOnlyUpdate := computeMetadataOnlyUpdate(doc.Cas, revSeqNo, doc.MetadataOnlyUpdate)

rawMouXattr, err := base.JSONMarshal(metadataOnlyUpdate)
if err != nil {
	return base.RedactErrorf("failed to marshal _mou when attempting to compact channel history for doc: %s. Error: %v", base.UD(docid), err)
}

// build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas)
opts := &sgbucket.MutateInOptions{}
spec := []sgbucket.MacroExpansionSpec{
	sgbucket.NewMacroExpansionSpec(xattrCasPath(base.SyncXattrName), sgbucket.MacroCas),
	sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas),
}
opts.MacroExpansion = spec
opts.PreserveExpiry = true // if doc has expiry, we should preserve this

updatedXattr := map[string][]byte{
	base.SyncXattrName: rawSyncXattr,
	base.MouXattrName:  rawMouXattr,
}
```
adamcfraser
left a comment
There was a problem hiding this comment.
I like the new approach for computing changes. Some comments about handling for on-demand import and minor style suggestions.
```go
// build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas)
opts := &sgbucket.MutateInOptions{}
spec := []sgbucket.MacroExpansionSpec{
	sgbucket.NewMacroExpansionSpec(xattrCasPath(base.SyncXattrName), sgbucket.MacroCas),
```
It's correct to update sync.cas here, because we don't want this compaction to trigger a new document import. However, this means that we need to make sure this processing isn't being run on documents that haven't yet been imported - otherwise this cas update will make it appear as if this document has already been imported (and so the sync metadata would be incorrect).
I think the best way to avoid this is to trigger on-demand import as needed when initially fetching the document. I think you can do that by using GetDocument instead of GetWithXattrs when originally fetching the document on line 229 - this has the on-demand import work built-in.
Will want to add a test case for that (running compact on a non-imported document).
RIT3shSapata replied:
Added a check for non-imported documents, triggering on-demand import in the same function. I went through the GetDocument method: the xattr keys it fetches are base.SyncXattrName, base.VvXattrName, and base.GlobalXattrName, while the xattr keys required for compaction are base.SyncXattrName, base.VirtualXattrRevSeqNo, and base.MouXattrName.
Let me know your thoughts on this approach, or whether you'd prefer I use GetDocument itself.
Also, while writing the test for this scenario I ran into a few challenges; will discuss further offline.
```go
if !isSgWrite {
	var importErr error

	doc, importErr = c.OnDemandImportForGet(ctx, docid, doc, rawDoc, xattrs, cas)
```
Make sure you have a nil check on doc here too, to avoid the same panic found in https://jira.issues.couchbase.com/browse/CBG-5355
Code elsewhere for reference:
Line 198 in d09f0fd
We have a ticket in the backlog to make this code better, but I'm not sure we'll get to it in the 4.1 timeframe, so it'd be good to protect against it here.
refactor the slices and maps delete operations, get and unmarshal only the required xattrs, refactor the spec for mou update, use helper function to compute mou, refactor naked returns, early exit in the case where there is no compaction
```go
	return base.HTTPErrorf(400, "Invalid doc ID")
}

xattrKeys := []string{base.SyncXattrName, base.VirtualXattrRevSeqNo, base.MouXattrName}
```
```go
	return importErr
}
if doc == nil {
	return fmt.Errorf("skipping compaction of document %s, %v ", base.UD(docid), base.ErrNotFound)
```
```go
// TestCompactNonImportedDocWithAutoImport verifies that when CompactDocChannelHistory is called
// on a non-imported document, it automatically imports the document first before performing compaction.
func TestCompactNonImportedDocWithAutoImport(t *testing.T) {
	if !base.TestUseXattrs() {
		t.Skip("CompactDocChannelHistory requires XATTR-based metadata")
	}

	// Create RestTester with AutoImport disabled to allow non-imported documents
	rtConfig := RestTesterConfig{
		SyncFn: channels.DocChannelsSyncFunction,
		DatabaseConfig: &DatabaseConfig{DbConfig: DbConfig{
			AutoImport: false,
```
CBG-5326

Describe your PR here...

Pre-review checklist
- Removed debug logging (`fmt.Print`, `log.Print`, ...)
- Sensitive data is redacted in logs (`base.UD(docID)`, `base.MD(dbName)`)
- REST API changes are documented (`docs/api`)

Dependencies (if applicable)

Integration Tests