-
Notifications
You must be signed in to change notification settings - Fork 144
CBG-5326: Function to handle document channel history compaction #8218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
194e0fd
4e7f5b1
b4c398e
9d45b92
c5b9ae5
c518a09
738ae0a
592b361
faad903
f7f091c
69989f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -15,6 +15,7 @@ import ( | |||
| "maps" | ||||
| "math" | ||||
| "net/http" | ||||
| "slices" | ||||
| "strings" | ||||
| "time" | ||||
|
|
||||
|
|
@@ -221,6 +222,107 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) ( | |||
|
|
||||
| } | ||||
|
|
||||
| // CompactDocChannelHistory removes channel history entries that ended at or before the given sequence number. | ||||
| // This is used to truncate stale channel assignment history to reduce storage overhead. | ||||
| func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid string, seq uint64) ([]string, error) { | ||||
| key := realDocID(docid) | ||||
| if key == "" { | ||||
| return nil, base.HTTPErrorf(400, "Invalid doc ID") | ||||
| } | ||||
|
|
||||
| rawDoc, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncMouRevSeqNoAndUserXattrKeys()) | ||||
| if err != nil { | ||||
| return nil, err | ||||
| } | ||||
|
Comment on lines
+225
to
+236
|
||||
|
|
||||
| doc, err := c.unmarshalDocumentWithXattrs(ctx, key, nil, xattrs, cas, DocUnmarshalSync) | ||||
| if err != nil { | ||||
| return nil, err | ||||
| } | ||||
|
|
||||
| isSgWrite, crc32Match, _ := doc.IsSGWrite(ctx, rawDoc) | ||||
| if crc32Match { | ||||
| c.dbStats().Database().Crc32MatchCount.Add(1) | ||||
| } | ||||
|
|
||||
| if !isSgWrite { | ||||
| var importErr error | ||||
|
|
||||
| doc, importErr = c.OnDemandImportForGet(ctx, docid, doc, rawDoc, xattrs, cas) | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sure you have a nil check on doc here too to avoid the same panic found in https://jira.issues.couchbase.com/browse/CBG-5355 Code else where for reference: Line 198 in d09f0fd
We have a ticket in backlog to make this code better but not sure if we'll get to it in the 4.1 timeframe so be good to protect against it here. |
||||
| if importErr != nil { | ||||
| return nil, importErr | ||||
| } | ||||
| if doc == nil { | ||||
| return nil, base.ErrNotFound | ||||
| } | ||||
| cas = doc.Cas | ||||
| } | ||||
|
|
||||
| compactedChannels := make([]string, 0) | ||||
|
|
||||
| doc.SyncData.ChannelSetHistory = slices.DeleteFunc(doc.SyncData.ChannelSetHistory, func(channel ChannelSetEntry) bool { | ||||
| del := channel.End <= seq | ||||
| if del { | ||||
| compactedChannels = append(compactedChannels, channel.Name) | ||||
| } | ||||
| return del | ||||
| }) | ||||
|
|
||||
| doc.SyncData.ChannelSet = slices.DeleteFunc(doc.SyncData.ChannelSet, func(channel ChannelSetEntry) bool { | ||||
| del := channel.End != 0 && channel.End <= seq | ||||
| if del { | ||||
| compactedChannels = append(compactedChannels, channel.Name) | ||||
| } | ||||
| return del | ||||
| }) | ||||
|
|
||||
| for chanName, chanEntry := range doc.SyncData.Channels { | ||||
| if chanEntry != nil && chanEntry.Seq <= seq { | ||||
| compactedChannels = append(compactedChannels, chanName) | ||||
| delete(doc.SyncData.Channels, chanName) | ||||
| } | ||||
| } | ||||
|
|
||||
| // Exit early if no compaction occurred | ||||
| if len(compactedChannels) == 0 { | ||||
| return []string{}, nil | ||||
| } | ||||
|
|
||||
| rawSyncXattr, err := base.JSONMarshal(doc.SyncData) | ||||
| if err != nil { | ||||
| return nil, base.RedactErrorf("failed to marshal sync data when trying to compact channel history for doc:%s. Error: %v", base.UD(docid), err) | ||||
| } | ||||
|
|
||||
| revSeqNo, err := unmarshalRevSeqNo(xattrs[base.VirtualXattrRevSeqNo]) | ||||
| if err != nil { | ||||
| base.WarnfCtx(ctx, `Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err) | ||||
| return nil, base.RedactErrorf(`Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err) | ||||
| } | ||||
|
|
||||
|
RIT3shSapata marked this conversation as resolved.
|
||||
| metadataOnlyUpdate := computeMetadataOnlyUpdate(doc.Cas, revSeqNo, doc.MetadataOnlyUpdate) | ||||
|
|
||||
|
Comment on lines
+296
to
+303
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @RIT3shSapata The above comment is correct - if an on-demand import occurs, the document's revSeqNo will have changed and will need to be re-fetched. |
||||
| rawMouXattr, err := base.JSONMarshal(metadataOnlyUpdate) | ||||
| if err != nil { | ||||
| return nil, base.RedactErrorf("failed to marshal _mou when attempting to compact channel history for doc: %s. Error: %v", base.UD(docid), err) | ||||
| } | ||||
|
RIT3shSapata marked this conversation as resolved.
|
||||
|
|
||||
| // build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas) | ||||
| opts := &sgbucket.MutateInOptions{} | ||||
| spec := []sgbucket.MacroExpansionSpec{ | ||||
| sgbucket.NewMacroExpansionSpec(xattrCasPath(base.SyncXattrName), sgbucket.MacroCas), | ||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's correct to update sync.cas here, because we don't want this compaction to trigger a new document import. However, this means that we need to make sure this processing isn't being run on documents that haven't yet been imported - otherwise this cas update will make it appear as if this document has already been imported (and so the sync metadata would be incorrect). I think the best way to avoid this is to trigger on-demand import as needed when initially fetching the document. I think you can do that by using GetDocument instead of GetWithXattrs when originally fetching the document on line 229 - this has the on-demand import work built-in. Will want to add a test case for that (running compact on a non-imported document).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added check for non imported document and triggering on demand import in the same function. I went through the GetDocument method and the xattr keys which it fetches is Let me know your thoughts on this approach, or would you prefer I use Also while writing the test for this scenario, ran into a few challenges, will discuss this further offline. |
||||
| sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas), | ||||
| } | ||||
| opts.MacroExpansion = spec | ||||
| opts.PreserveExpiry = true // if doc has expiry, we should preserve this | ||||
|
|
||||
| updatedXattr := map[string][]byte{ | ||||
| base.SyncXattrName: rawSyncXattr, | ||||
| base.MouXattrName: rawMouXattr, | ||||
| } | ||||
|
Comment on lines
+302
to
+321
|
||||
| _, err = c.dataStore.UpdateXattrs(ctx, key, 0, cas, updatedXattr, opts) | ||||
|
RIT3shSapata marked this conversation as resolved.
|
||||
| return compactedChannels, err | ||||
| } | ||||
|
|
||||
| // unmarshalDocumentWithXattrs populates individual xattrs on unmarshalDocumentWithXattrs from a provided xattrs map | ||||
| func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) { | ||||
| return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.UserXattrKey()], xattrs[base.VirtualXattrRevSeqNo], xattrs[base.GlobalXattrName], cas, unmarshalLevel) | ||||
|
|
||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.