From 194e0fd465202ebcbdc54cb0ba5027b73013d3e5 Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Wed, 29 Apr 2026 21:20:08 +0530 Subject: [PATCH 01/11] add function to delete the channel history of the document --- db/crud.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++++ rest/api_test.go | 53 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/db/crud.go b/db/crud.go index 616f14b61c..584705caf9 100644 --- a/db/crud.go +++ b/db/crud.go @@ -221,6 +221,67 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) ( } +// CompactDocChannelHistory removes channel history entries that ended at or before the given sequence number. +// This is used to truncate stale channel assignment history to reduce storage overhead. +func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid string, seq uint64) (err error) { + key := realDocID(docid) + if key == "" { + return base.HTTPErrorf(400, "Invalid doc ID") + } + + _, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys()) + if err != nil { + return + } + + doc, err := c.unmarshalDocumentWithXattrs(ctx, docid, nil, xattrs, cas, DocUnmarshalSync) + if err != nil { + return + } + + var compactedChannelHistory []ChannelSetEntry + for _, channel := range doc.SyncData.ChannelSetHistory { + if channel.Start > seq { + compactedChannelHistory = append(compactedChannelHistory, channel) + } + } + + doc.SyncData.ChannelSetHistory = compactedChannelHistory + + rawSyncXattr, err := base.JSONMarshal(doc.SyncData) + if err != nil { + return base.RedactErrorf("failed to marshall sync data when trying to compact channel history for doc:%s. Error: %v", base.UD(docid), err) + } + + revSeqNo, err := unmarshalRevSeqNo(xattrs[base.VirtualXattrRevSeqNo]) + if err != nil { + base.InfofCtx(ctx, base.KeyCRUD, `Could not determine the revSeqNo when attempting to compact channel history for doc: %s. Error: %v`, base.UD(docid), err) + } + + metadataOnlyUpdate := &MetadataOnlyUpdate{ + HexCAS: expandMacroCASValueString, + PreviousHexCAS: doc.SyncData.Cas, + PreviousRevSeqNo: revSeqNo, + } + rawMouXattr, err := base.JSONMarshal(metadataOnlyUpdate) + if err != nil { + return base.RedactErrorf("failed to marshall _mou when attempting to compact channel history for doc: %s. Error: %v") + } + + // build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas) + opts := &sgbucket.MutateInOptions{} + spec := append(macroExpandSpec(base.SyncXattrName), sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas)) + opts.MacroExpansion = spec + opts.PreserveExpiry = true // if doc has expiry, we should preserve this + + updatedXattr := map[string][]byte{ + base.SyncXattrName: rawSyncXattr, + base.MouXattrName: rawMouXattr, + } + _, err = c.dataStore.UpdateXattrs(ctx, key, 0, cas, updatedXattr, opts) + return err +} + // unmarshalDocumentWithXattrs populates individual xattrs on unmarshalDocumentWithXattrs from a provided xattrs map func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) { return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.UserXattrKey()], xattrs[base.VirtualXattrRevSeqNo], xattrs[base.GlobalXattrName], cas, unmarshalLevel) diff --git a/rest/api_test.go b/rest/api_test.go index 82c472be01..6af85e9875 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -3916,3 +3916,56 @@ func TestFetchBackupRevisionByCVThroughAPI(t *testing.T) { assert.Equal(t, createVersion.RevTreeID, body[db.BodyRev]) assert.Nil(t, body[db.BodyCV]) } + +func TestDocumentChannelHistoryCompact(t *testing.T) { + rt := NewRestTester(t, &RestTesterConfig{SyncFn: channels.DocChannelsSyncFunction}) + defer rt.Close() + + var body db.Body + + // Create a document with a single channel assignment + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc", `{"channels": ["test"]}`) + RequireStatus(t, resp, http.StatusCreated) + err := json.Unmarshal(resp.BodyBytes(), &body) + assert.NoError(t, err) + collection, ctx := rt.GetSingleTestDatabaseCollection() + syncData, err := collection.GetDocSyncData(ctx, "doc") + assert.NoError(t, err) + + require.Len(t, syncData.ChannelSet, 1) + assert.Equal(t, syncData.ChannelSet[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 0}) + assert.Len(t, syncData.ChannelSetHistory, 0) + + // Remove all channels - creates history entry for the removed channel + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc?rev="+body["rev"].(string), `{"channels": []}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + assert.NoError(t, err) + syncData, err = collection.GetDocSyncData(ctx, "doc") + assert.NoError(t, err) + + require.Len(t, syncData.ChannelSet, 1) + assert.Equal(t, syncData.ChannelSet[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 2}) + assert.Len(t, syncData.ChannelSetHistory, 0) + + // Add multiple channels - generates history for the previously removed channel + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + assert.NoError(t, err) + syncData, err = collection.GetDocSyncData(ctx, "doc") + assert.NoError(t, err) + + require.Len(t, syncData.ChannelSet, 2) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test", Start: 3, End: 0}) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test2", Start: 3, End: 0}) + require.Len(t, syncData.ChannelSetHistory, 1) + assert.Equal(t, syncData.ChannelSetHistory[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 2}) + + // Compact history at sequence 2 and verify history is cleared + err = collection.CompactDocChannelHistory(ctx, "doc", 2) + require.NoError(t, err) + syncData, err = collection.GetDocSyncData(ctx, "doc") + assert.NoError(t, err) + assert.Nil(t, syncData.ChannelSetHistory) +} From 4e7f5b1c3d2129167ae242268be0fbb1002f9292 Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Thu, 30 Apr 2026 18:59:35 +0530 Subject: [PATCH 02/11] lint fix --- db/crud.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/crud.go b/db/crud.go index 584705caf9..30f7315b95 100644 --- a/db/crud.go +++ b/db/crud.go @@ -265,7 +265,7 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid } rawMouXattr, err := base.JSONMarshal(metadataOnlyUpdate) if err != nil { - return base.RedactErrorf("failed to marshall _mou when attempting to compact channel history for doc: %s. Error: %v") + return base.RedactErrorf("failed to marshall _mou when attempting to compact channel history for doc: %s. Error: %v", base.UD(docid), err) } // build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas) From b4c398ee95bbd08acfdb2065a25f2b3db1d31879 Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Tue, 5 May 2026 19:26:34 +0530 Subject: [PATCH 03/11] fixes based on pr comments --- db/crud.go | 30 +++-- rest/api_test.go | 289 ++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 272 insertions(+), 47 deletions(-) diff --git a/db/crud.go b/db/crud.go index 30f7315b95..1a5bddd4bc 100644 --- a/db/crud.go +++ b/db/crud.go @@ -229,7 +229,7 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid return base.HTTPErrorf(400, "Invalid doc ID") } - _, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys()) + _, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncMouRevSeqNoAndUserXattrKeys()) if err != nil { return } @@ -239,18 +239,34 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid return } - var compactedChannelHistory []ChannelSetEntry + var compactedChannelSetHistory []ChannelSetEntry for _, channel := range doc.SyncData.ChannelSetHistory { - if channel.Start > seq { - compactedChannelHistory = append(compactedChannelHistory, channel) + // Keep entries that are still active or that ended after the compaction point. + if channel.End > seq { + compactedChannelSetHistory = append(compactedChannelSetHistory, channel) } } + doc.SyncData.ChannelSetHistory = compactedChannelSetHistory - doc.SyncData.ChannelSetHistory = compactedChannelHistory + var compactedChannelSet []ChannelSetEntry + for _, channel := range doc.SyncData.ChannelSet { + if channel.End == 0 || channel.End > seq { + compactedChannelSet = append(compactedChannelSet, channel) + } + } + doc.SyncData.ChannelSet = compactedChannelSet + + compactedChannels := make(channels.ChannelMap) + for chanName, chanEntry := range doc.SyncData.Channels { + if chanEntry == nil || chanEntry.Seq > seq { + compactedChannels[chanName] = chanEntry + } + } + doc.SyncData.Channels = compactedChannels rawSyncXattr, err := base.JSONMarshal(doc.SyncData) if err != nil { - return base.RedactErrorf("failed to marshall sync data when trying to compact channel history for doc:%s. Error: %v", base.UD(docid), err) + return base.RedactErrorf("failed to marshal sync data when trying to compact channel history for doc:%s. Error: %v", base.UD(docid), err) } revSeqNo, err := unmarshalRevSeqNo(xattrs[base.VirtualXattrRevSeqNo]) @@ -265,7 +281,7 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid } rawMouXattr, err := base.JSONMarshal(metadataOnlyUpdate) if err != nil { - return base.RedactErrorf("failed to marshall _mou when attempting to compact channel history for doc: %s. Error: %v", base.UD(docid), err) + return base.RedactErrorf("failed to marshal _mou when attempting to compact channel history for doc: %s. Error: %v", base.UD(docid), err) } // build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas) diff --git a/rest/api_test.go b/rest/api_test.go index 6af85e9875..a562868519 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -3918,54 +3918,263 @@ func TestFetchBackupRevisionByCVThroughAPI(t *testing.T) { } func TestDocumentChannelHistoryCompact(t *testing.T) { + if !base.TestUseXattrs() { + t.Skip("CompactDocChannelHistory requires XATTR-based metadata") + } rt := NewRestTester(t, &RestTesterConfig{SyncFn: channels.DocChannelsSyncFunction}) defer rt.Close() - var body db.Body - - // Create a document with a single channel assignment - resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc", `{"channels": ["test"]}`) - RequireStatus(t, resp, http.StatusCreated) - err := json.Unmarshal(resp.BodyBytes(), &body) - assert.NoError(t, err) collection, ctx := rt.GetSingleTestDatabaseCollection() - syncData, err := collection.GetDocSyncData(ctx, "doc") - assert.NoError(t, err) - require.Len(t, syncData.ChannelSet, 1) - assert.Equal(t, syncData.ChannelSet[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 0}) - assert.Len(t, syncData.ChannelSetHistory, 0) + t.Run("basic compaction", func(t *testing.T) { + var body db.Body + // Create a document with a single channel assignment + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", `{"channels": ["test"]}`) + RequireStatus(t, resp, http.StatusCreated) + err := json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + syncData, err := collection.GetDocSyncData(ctx, "doc1") + assert.NoError(t, err) - // Remove all channels - creates history entry for the removed channel - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc?rev="+body["rev"].(string), `{"channels": []}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - assert.NoError(t, err) - syncData, err = collection.GetDocSyncData(ctx, "doc") - assert.NoError(t, err) + require.Len(t, syncData.ChannelSet, 1) + assert.Equal(t, db.ChannelSetEntry{Name: "test", Start: 1, End: 0}, syncData.ChannelSet[0]) + assert.Len(t, syncData.ChannelSetHistory, 0) - require.Len(t, syncData.ChannelSet, 1) - assert.Equal(t, syncData.ChannelSet[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 2}) - assert.Len(t, syncData.ChannelSetHistory, 0) + // Remove all channels - ends the existing channel range in ChannelSet + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc1?rev="+body["rev"].(string), `{"channels": []}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + syncData, err = collection.GetDocSyncData(ctx, "doc1") + assert.NoError(t, err) - // Add multiple channels - generates history for the previously removed channel - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - assert.NoError(t, err) - syncData, err = collection.GetDocSyncData(ctx, "doc") - assert.NoError(t, err) + require.Len(t, syncData.ChannelSet, 1) + assert.Equal(t, db.ChannelSetEntry{Name: "test", Start: 1, End: 2}, syncData.ChannelSet[0]) + assert.Len(t, syncData.ChannelSetHistory, 0) - require.Len(t, syncData.ChannelSet, 2) - assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test", Start: 3, End: 0}) - assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test2", Start: 3, End: 0}) - require.Len(t, syncData.ChannelSetHistory, 1) - assert.Equal(t, syncData.ChannelSetHistory[0], db.ChannelSetEntry{Name: "test", Start: 1, End: 2}) + // Add multiple channels - generates history for the previously removed channel + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc1?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + syncData, err = collection.GetDocSyncData(ctx, "doc1") + assert.NoError(t, err) - // Compact history at sequence 2 and verify history is cleared - err = collection.CompactDocChannelHistory(ctx, "doc", 2) - require.NoError(t, err) - syncData, err = collection.GetDocSyncData(ctx, "doc") - assert.NoError(t, err) - assert.Nil(t, syncData.ChannelSetHistory) + require.Len(t, syncData.ChannelSet, 2) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test", Start: 3, End: 0}) + assert.Contains(t, syncData.ChannelSet, db.ChannelSetEntry{Name: "test2", Start: 3, End: 0}) + require.Len(t, syncData.ChannelSetHistory, 1) + assert.Equal(t, db.ChannelSetEntry{Name: "test", Start: 1, End: 2}, syncData.ChannelSetHistory[0]) + + // Compact history at sequence 2 and verify history is cleared + err = collection.CompactDocChannelHistory(ctx, "doc1", 2) + require.NoError(t, err) + syncData, err = collection.GetDocSyncData(ctx, "doc1") + assert.NoError(t, err) + assert.Nil(t, syncData.ChannelSetHistory) + }) + + t.Run("seq zero keeps all history", func(t *testing.T) { + // Compact with seq=0 should keep all entries + var body db.Body + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc2", `{"channels": ["test"]}`) + RequireStatus(t, resp, http.StatusCreated) + err := json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc2?rev="+body["rev"].(string), `{"channels": []}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc2?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + syncDataBefore, err := collection.GetDocSyncData(ctx, "doc2") + require.NoError(t, err) + historyLenBefore := len(syncDataBefore.ChannelSetHistory) + require.Greater(t, historyLenBefore, 0) + + err = collection.CompactDocChannelHistory(ctx, "doc2", 0) + require.NoError(t, err) + + syncDataAfter, err := collection.GetDocSyncData(ctx, "doc2") + require.NoError(t, err) + assert.Equal(t, historyLenBefore, len(syncDataAfter.ChannelSetHistory)) + }) + + t.Run("compact all history", func(t *testing.T) { + // Compact with very high seq removes all history + var body db.Body + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc3", `{"channels": ["test"]}`) + RequireStatus(t, resp, http.StatusCreated) + err := json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc3?rev="+body["rev"].(string), `{"channels": []}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc3?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + syncDataBefore, err := collection.GetDocSyncData(ctx, "doc3") + require.NoError(t, err) + + // Compact with very high seq number + err = collection.CompactDocChannelHistory(ctx, "doc3", 999999) + require.NoError(t, err) + + syncData, err := collection.GetDocSyncData(ctx, "doc3") + require.NoError(t, err) + assert.Nil(t, syncData.ChannelSetHistory) + assert.Equal(t, len(syncDataBefore.Channels), len(syncData.Channels)) + assert.Equal(t, len(syncDataBefore.ChannelSet), len(syncData.ChannelSet)) + }) + + t.Run("partial history compaction", func(t *testing.T) { + // Compact removes entries with End <= seq, keeps entries with End > seq + var body db.Body + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc4", `{"channels": ["a"]}`) + RequireStatus(t, resp, http.StatusCreated) + err := json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc4?rev="+body["rev"].(string), `{"channels": []}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc4?rev="+body["rev"].(string), `{"channels": ["b"]}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc4?rev="+body["rev"].(string), `{"channels": []}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + syncDataBefore, err := collection.GetDocSyncData(ctx, "doc4") + require.NoError(t, err) + channelSetLenBefore := len(syncDataBefore.ChannelSet) + channelLenBefore := len(syncDataBefore.Channels) + + // Compact at seq 3 + err = collection.CompactDocChannelHistory(ctx, "doc4", 2) + require.NoError(t, err) + + syncDataAfter, err := collection.GetDocSyncData(ctx, "doc4") + require.NoError(t, err) + + // Verify: only entries with End > 3 or End == 0 (still active) should remain + for _, entry := range syncDataAfter.ChannelSet { + assert.True(t, entry.End == 0 || entry.End > uint64(3)) + } + + // Verify only channels with seq > 3 should remain + for _, entry := range syncDataAfter.Channels { + assert.True(t, entry.Seq > uint64(3)) + } + + // Should have fewer than before + assert.Less(t, len(syncDataAfter.Channels), channelLenBefore) + assert.Less(t, len(syncDataAfter.ChannelSet), channelSetLenBefore) + }) + + t.Run("invalid doc id", func(t *testing.T) { + // Empty doc ID should return 400 error + err := collection.CompactDocChannelHistory(ctx, "", 1) + assert.Error(t, err) + assert.Contains(t, err.Error(), "Invalid doc ID") + }) + + t.Run("nonexistent document", func(t *testing.T) { + // Compacting nonexistent doc should return not found error + err := collection.CompactDocChannelHistory(ctx, "nonexistent", 1) + assert.Error(t, err) + }) + + t.Run("multiple channels with mixed history", func(t *testing.T) { + // Complex scenario with multiple channels + var body db.Body + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc7", `{"channels": ["a", "b"]}`) + RequireStatus(t, resp, http.StatusCreated) + err := json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc7?rev="+body["rev"].(string), `{"channels": ["a"]}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc7?rev="+body["rev"].(string), `{"channels": ["a", "c"]}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + syncDataBefore, err := collection.GetDocSyncData(ctx, "doc7") + require.NoError(t, err) + err = collection.CompactDocChannelHistory(ctx, "doc7", 2) + require.NoError(t, err) + + syncData, err := collection.GetDocSyncData(ctx, "doc7") + require.NoError(t, err) + // Should still have active channels + assert.Less(t, len(syncData.ChannelSet), len(syncDataBefore.ChannelSet)) + assert.Less(t, len(syncData.Channels), len(syncDataBefore.Channels)) + }) + + t.Run("compact empty history", func(t *testing.T) { + // Doc with no history should succeed + var body db.Body + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc8", `{"channels": ["test"]}`) + RequireStatus(t, resp, http.StatusCreated) + err := json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + syncDataBefore, err := collection.GetDocSyncData(ctx, "doc8") + require.NoError(t, err) + assert.Len(t, syncDataBefore.ChannelSetHistory, 0) + + err = collection.CompactDocChannelHistory(ctx, "doc8", 1) + require.NoError(t, err) + + syncDataAfter, err := collection.GetDocSyncData(ctx, "doc8") + require.NoError(t, err) + assert.Len(t, syncDataAfter.ChannelSetHistory, 0) + }) + + t.Run("verify xattr updates", func(t *testing.T) { + // Verify CAS is updated and no reimport happens + var body db.Body + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc10", `{"channels": ["test"]}`) + RequireStatus(t, resp, http.StatusCreated) + err := json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc10?rev="+body["rev"].(string), `{"channels": []}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc10?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`) + RequireStatus(t, resp, http.StatusCreated) + err = json.Unmarshal(resp.BodyBytes(), &body) + require.NoError(t, err) + + // After compaction, document should still be accessible + err = collection.CompactDocChannelHistory(ctx, "doc10", 2) + require.NoError(t, err) + + // Verify doc is still accessible with correct data + syncData, err := collection.GetDocSyncData(ctx, "doc10") + require.NoError(t, err) + assert.NotNil(t, syncData) + }) } From 9d45b92c1c00348804041a36b29215afe2107410 Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Thu, 7 May 2026 20:52:39 +0530 Subject: [PATCH 04/11] fix seq of subtests --- rest/api_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rest/api_test.go b/rest/api_test.go index a562868519..8ea620294f 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -4050,6 +4050,8 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { err = json.Unmarshal(resp.BodyBytes(), &body) require.NoError(t, err) + docSeq := rt.GetDocumentSequence("doc4") + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc4?rev="+body["rev"].(string), `{"channels": ["b"]}`) RequireStatus(t, resp, http.StatusCreated) err = json.Unmarshal(resp.BodyBytes(), &body) @@ -4066,7 +4068,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { channelLenBefore := len(syncDataBefore.Channels) // Compact at seq 3 - err = collection.CompactDocChannelHistory(ctx, "doc4", 2) + err = collection.CompactDocChannelHistory(ctx, "doc4", docSeq) require.NoError(t, err) syncDataAfter, err := collection.GetDocSyncData(ctx, "doc4") @@ -4113,6 +4115,8 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { err = json.Unmarshal(resp.BodyBytes(), &body) require.NoError(t, err) + docSeq := rt.GetDocumentSequence("doc7") + resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc7?rev="+body["rev"].(string), `{"channels": ["a", "c"]}`) RequireStatus(t, resp, http.StatusCreated) err = json.Unmarshal(resp.BodyBytes(), &body) @@ -4120,7 +4124,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { syncDataBefore, err := collection.GetDocSyncData(ctx, "doc7") require.NoError(t, err) - err = collection.CompactDocChannelHistory(ctx, "doc7", 2) + err = collection.CompactDocChannelHistory(ctx, "doc7", docSeq) require.NoError(t, err) syncData, err := collection.GetDocSyncData(ctx, "doc7") From c5b9ae5489a8336fe494b421f5b06a5684abf238 Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Fri, 8 May 2026 14:54:21 +0530 Subject: [PATCH 05/11] fixes based on pr comments refactor the slices and maps delete oprations, get and umarshall only the required xattrs, refactor the spec for mou update, use helper function to compute mou, refactor naked returns, early exit in a case where there is no compaction --- db/crud.go | 63 +++++++++++++++++++++++++----------------------- rest/api_test.go | 4 +-- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/db/crud.go b/db/crud.go index 1a5bddd4bc..afc4feea1e 100644 --- a/db/crud.go +++ b/db/crud.go @@ -15,6 +15,7 @@ import ( "maps" "math" "net/http" + "slices" "strings" "time" @@ -223,46 +224,48 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) ( // CompactDocChannelHistory removes channel history entries that ended at or before the given sequence number. // This is used to truncate stale channel assignment history to reduce storage overhead. -func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid string, seq uint64) (err error) { +func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid string, seq uint64) error { key := realDocID(docid) if key == "" { return base.HTTPErrorf(400, "Invalid doc ID") } - _, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncMouRevSeqNoAndUserXattrKeys()) + xattrKeys := []string{base.SyncXattrName, base.VirtualXattrRevSeqNo, base.MouXattrName} + _, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, xattrKeys) if err != nil { - return + return err } - doc, err := c.unmarshalDocumentWithXattrs(ctx, docid, nil, xattrs, cas, DocUnmarshalSync) + doc, err := c.unmarshalDocumentWithXattrs(ctx, key, nil, xattrs, cas, DocUnmarshalSync) if err != nil { - return + return err } - var compactedChannelSetHistory []ChannelSetEntry - for _, channel := range doc.SyncData.ChannelSetHistory { - // Keep entries that are still active or that ended after the compaction point. - if channel.End > seq { - compactedChannelSetHistory = append(compactedChannelSetHistory, channel) - } - } - doc.SyncData.ChannelSetHistory = compactedChannelSetHistory + // Store lengths before compaction to detect if any changes occur + historyLenBefore := len(doc.SyncData.ChannelSetHistory) + channelSetLenBefore := len(doc.SyncData.ChannelSet) + channelsLenBefore := len(doc.SyncData.Channels) - var compactedChannelSet []ChannelSetEntry - for _, channel := range doc.SyncData.ChannelSet { - if channel.End == 0 || channel.End > seq { - compactedChannelSet = append(compactedChannelSet, channel) - } - } - doc.SyncData.ChannelSet = compactedChannelSet + doc.SyncData.ChannelSetHistory = slices.DeleteFunc(doc.SyncData.ChannelSetHistory, func(channel ChannelSetEntry) bool { + return channel.End <= seq + }) + + doc.SyncData.ChannelSet = slices.DeleteFunc(doc.SyncData.ChannelSet, func(channel ChannelSetEntry) bool { + return channel.End != 0 && channel.End <= seq + }) - compactedChannels := make(channels.ChannelMap) for chanName, chanEntry := range doc.SyncData.Channels { - if chanEntry == nil || chanEntry.Seq > seq { - compactedChannels[chanName] = chanEntry + if chanEntry != nil && chanEntry.Seq <= seq { + delete(doc.SyncData.Channels, chanName) } } - doc.SyncData.Channels = compactedChannels + + // Exit early if no compaction occurred + if len(doc.SyncData.ChannelSetHistory) == historyLenBefore && + len(doc.SyncData.ChannelSet) == channelSetLenBefore && + len(doc.SyncData.Channels) == channelsLenBefore { + return nil + } rawSyncXattr, err := base.JSONMarshal(doc.SyncData) if err != nil { @@ -274,11 +277,8 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid base.InfofCtx(ctx, base.KeyCRUD, `Could not determine the revSeqNo when attempting to compact channel history for doc: %s. Error: %v`, base.UD(docid), err) } - metadataOnlyUpdate := &MetadataOnlyUpdate{ - HexCAS: expandMacroCASValueString, - PreviousHexCAS: doc.SyncData.Cas, - PreviousRevSeqNo: revSeqNo, - } + metadataOnlyUpdate := computeMetadataOnlyUpdate(doc.Cas, revSeqNo, doc.MetadataOnlyUpdate) + rawMouXattr, err := base.JSONMarshal(metadataOnlyUpdate) if err != nil { return base.RedactErrorf("failed to marshal _mou when attempting to compact channel history for doc: %s. Error: %v", base.UD(docid), err) @@ -286,7 +286,10 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid // build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas) opts := &sgbucket.MutateInOptions{} - spec := append(macroExpandSpec(base.SyncXattrName), sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas)) + spec := []sgbucket.MacroExpansionSpec{ + sgbucket.NewMacroExpansionSpec(xattrCasPath(base.SyncXattrName), sgbucket.MacroCas), + sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas), + } opts.MacroExpansion = spec opts.PreserveExpiry = true // if doc has expiry, we should preserve this diff --git a/rest/api_test.go b/rest/api_test.go index 8ea620294f..90e6018cd6 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -3971,7 +3971,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { require.NoError(t, err) syncData, err = collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) - assert.Nil(t, syncData.ChannelSetHistory) + assert.Zero(t, len(syncData.ChannelSetHistory)) }) t.Run("seq zero keeps all history", func(t *testing.T) { @@ -4032,7 +4032,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { syncData, err := collection.GetDocSyncData(ctx, "doc3") require.NoError(t, err) - assert.Nil(t, syncData.ChannelSetHistory) + assert.Zero(t, len(syncData.ChannelSetHistory)) assert.Equal(t, len(syncDataBefore.Channels), len(syncData.Channels)) assert.Equal(t, len(syncDataBefore.ChannelSet), len(syncData.ChannelSet)) }) From c518a09f6169d511506b48ed8d2ed62f1cf69abd Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Mon, 11 May 2026 20:37:25 +0530 Subject: [PATCH 06/11] fixes based on pr comments --- db/crud.go | 19 ++++++- rest/api_test.go | 125 +++++++++-------------------------------------- 2 files changed, 40 insertions(+), 104 deletions(-) diff --git a/db/crud.go b/db/crud.go index afc4feea1e..dd5b80b152 100644 --- a/db/crud.go +++ b/db/crud.go @@ -231,7 +231,7 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid } xattrKeys := []string{base.SyncXattrName, base.VirtualXattrRevSeqNo, base.MouXattrName} - _, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, xattrKeys) + rawDoc, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, xattrKeys) if err != nil { return err } @@ -241,6 +241,20 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid return err } + isSgWrite, crc32Match, _ := doc.IsSGWrite(ctx, rawDoc) + if crc32Match { + c.dbStats().Database().Crc32MatchCount.Add(1) + } + + if !isSgWrite { + var importErr error + + doc, importErr = c.OnDemandImportForGet(ctx, docid, doc, rawDoc, xattrs, cas) + if importErr != nil { + return importErr + } + } + // Store lengths before compaction to detect if any changes occur historyLenBefore := len(doc.SyncData.ChannelSetHistory) channelSetLenBefore := len(doc.SyncData.ChannelSet) @@ -274,7 +288,8 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid revSeqNo, err := unmarshalRevSeqNo(xattrs[base.VirtualXattrRevSeqNo]) if err != nil { - base.InfofCtx(ctx, base.KeyCRUD, `Could not determine the revSeqNo when attempting to compact channel history for doc: %s. Error: %v`, base.UD(docid), err) + base.WarnfCtx(ctx, `Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err) + return base.RedactErrorf(`Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err) } metadataOnlyUpdate := computeMetadataOnlyUpdate(doc.Cas, revSeqNo, doc.MetadataOnlyUpdate) diff --git a/rest/api_test.go b/rest/api_test.go index 90e6018cd6..3af0eabffe 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -3927,12 +3927,8 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { collection, ctx := rt.GetSingleTestDatabaseCollection() t.Run("basic compaction", func(t *testing.T) { - var body db.Body // Create a document with a single channel assignment - resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", `{"channels": ["test"]}`) - RequireStatus(t, resp, http.StatusCreated) - err := json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + version := rt.PutDoc("doc1", `{"channels": ["test"]}`) syncData, err := collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) @@ -3941,10 +3937,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { assert.Len(t, syncData.ChannelSetHistory, 0) // Remove all channels - ends the existing channel range in ChannelSet - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc1?rev="+body["rev"].(string), `{"channels": []}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + version = rt.UpdateDoc("doc1", version, `{"channels": []}`) syncData, err = collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) @@ -3953,10 +3946,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { assert.Len(t, syncData.ChannelSetHistory, 0) // Add multiple channels - generates history for the previously removed channel - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc1?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + version = rt.UpdateDoc("doc1", version, `{"channels": ["test", "test2"]}`) syncData, err = collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) @@ -3976,21 +3966,9 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { t.Run("seq zero keeps all history", func(t *testing.T) { // Compact with seq=0 should keep all entries - var body db.Body - resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc2", `{"channels": ["test"]}`) - RequireStatus(t, resp, http.StatusCreated) - err := json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) - - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc2?rev="+body["rev"].(string), `{"channels": []}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) - - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc2?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + version := rt.PutDoc("doc2", `{"channels": ["test"]}`) + version = rt.UpdateDoc("doc2", version, `{"channels": []}`) + version = rt.UpdateDoc("doc2", version, `{"channels": ["test", "test2"]}`) syncDataBefore, err := collection.GetDocSyncData(ctx, "doc2") require.NoError(t, err) @@ -4007,21 +3985,9 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { t.Run("compact all history", func(t *testing.T) { // Compact with very high seq removes all history - var body db.Body - resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc3", `{"channels": ["test"]}`) - RequireStatus(t, resp, http.StatusCreated) - err := json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) - - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc3?rev="+body["rev"].(string), `{"channels": []}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) - - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc3?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + version := rt.PutDoc("doc3", `{"channels": ["test"]}`) + version = rt.UpdateDoc("doc3", version, `{"channels": []}`) + version = rt.UpdateDoc("doc3", version, `{"channels": ["test", "test2"]}`) syncDataBefore, err := collection.GetDocSyncData(ctx, "doc3") require.NoError(t, err) @@ -4039,49 +4005,31 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { t.Run("partial history compaction", func(t *testing.T) { // Compact removes entries with End <= seq, keeps entries with End > seq - var body db.Body - resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc4", `{"channels": ["a"]}`) - RequireStatus(t, resp, http.StatusCreated) - err := json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) - - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc4?rev="+body["rev"].(string), `{"channels": []}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + version := rt.PutDoc("doc4", `{"channels": ["a"]}`) + version = rt.UpdateDoc("doc4", version, `{"channels": []}`) docSeq := rt.GetDocumentSequence("doc4") - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc4?rev="+body["rev"].(string), `{"channels": ["b"]}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) - - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc4?rev="+body["rev"].(string), `{"channels": []}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + version = rt.UpdateDoc("doc4", version, `{"channels": ["b"]}`) + version = rt.UpdateDoc("doc4", version, `{"channels": []}`) syncDataBefore, err := collection.GetDocSyncData(ctx, "doc4") require.NoError(t, err) channelSetLenBefore := len(syncDataBefore.ChannelSet) channelLenBefore := len(syncDataBefore.Channels) - // Compact at seq 3 err = collection.CompactDocChannelHistory(ctx, "doc4", docSeq) require.NoError(t, err) syncDataAfter, err := collection.GetDocSyncData(ctx, "doc4") require.NoError(t, err) - // Verify: only entries with End > 3 or End == 0 (still active) should remain for _, entry := range syncDataAfter.ChannelSet { - assert.True(t, entry.End == 0 || entry.End > uint64(3)) + assert.True(t, entry.End == 0 || entry.End > docSeq) } - // Verify only channels with seq > 3 should remain for _, entry := range syncDataAfter.Channels { - assert.True(t, entry.Seq > uint64(3)) + assert.True(t, entry.Seq > docSeq) } // Should have fewer than before @@ -4104,23 +4052,12 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { t.Run("multiple channels with mixed history", func(t *testing.T) { // Complex scenario with multiple channels - var body db.Body - resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc7", `{"channels": ["a", "b"]}`) - RequireStatus(t, resp, http.StatusCreated) - err := json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) - - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc7?rev="+body["rev"].(string), `{"channels": ["a"]}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + version := rt.PutDoc("doc7", `{"channels": ["a", "b"]}`) + version = rt.UpdateDoc("doc7", version, `{"channels": ["a"]}`) docSeq := rt.GetDocumentSequence("doc7") - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc7?rev="+body["rev"].(string), `{"channels": ["a", "c"]}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + version = rt.UpdateDoc("doc7", version, `{"channels": ["a", "c"]}`) syncDataBefore, err := collection.GetDocSyncData(ctx, "doc7") require.NoError(t, err) @@ -4136,11 +4073,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { t.Run("compact empty history", func(t *testing.T) { // Doc with no history should succeed - var body db.Body - resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc8", `{"channels": ["test"]}`) - RequireStatus(t, resp, http.StatusCreated) - err := json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + rt.PutDoc("doc8", `{"channels": ["test"]}`) syncDataBefore, err := collection.GetDocSyncData(ctx, "doc8") require.NoError(t, err) @@ -4156,24 +4089,12 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { t.Run("verify xattr updates", func(t *testing.T) { // Verify CAS is updated and no reimport happens - var body db.Body - resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc10", `{"channels": ["test"]}`) - RequireStatus(t, resp, http.StatusCreated) - err := json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) - - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc10?rev="+body["rev"].(string), `{"channels": []}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) - - resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc10?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`) - RequireStatus(t, resp, http.StatusCreated) - err = json.Unmarshal(resp.BodyBytes(), &body) - require.NoError(t, err) + version := rt.PutDoc("doc10", `{"channels": ["test"]}`) + version = rt.UpdateDoc("doc10", version, `{"channels": []}`) + version = rt.UpdateDoc("doc10", version, `{"channels": ["test", "test2"]}`) // After compaction, document should still be accessible - err = collection.CompactDocChannelHistory(ctx, "doc10", 2) + err := collection.CompactDocChannelHistory(ctx, "doc10", 2) require.NoError(t, err) // Verify doc is still accessible with correct data From 738ae0ac9b9518c952f81ce551ea53615606625a Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Wed, 13 May 2026 15:28:40 +0530 Subject: [PATCH 07/11] add test for autoimport during channel history compaction --- db/crud.go | 4 +++ rest/api_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/db/crud.go b/db/crud.go index dd5b80b152..cf6d27d8e6 100644 --- a/db/crud.go +++ b/db/crud.go @@ -253,6 +253,10 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid if importErr != nil { return importErr } + if doc == nil { + return fmt.Errorf("skipping compaction of document %s, %v ", base.UD(docid), base.ErrNotFound) + } + cas = doc.Cas } // Store lengths before compaction to detect if any changes occur diff --git a/rest/api_test.go b/rest/api_test.go index 3af0eabffe..afea91c59b 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -4103,3 +4103,84 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { assert.NotNil(t, syncData) }) } + +// TestCompactNonImportedDocWithAutoImport verifies that when CompactDocChannelHistory is called +// on a non-imported document, it automatically imports the document first before performing compaction. +func TestCompactNonImportedDocWithAutoImport(t *testing.T) { + if !base.TestUseXattrs() { + t.Skip("CompactDocChannelHistory requires XATTR-based metadata") + } + + // Create RestTester with AutoImport disabled to allow non-imported documents + rtConfig := RestTesterConfig{ + SyncFn: channels.DocChannelsSyncFunction, + DatabaseConfig: &DatabaseConfig{DbConfig: DbConfig{ + AutoImport: false, + }}, + } + rt := NewRestTesterDefaultCollection(t, &rtConfig) + defer rt.Close() + + collection, ctx := rt.GetSingleTestDatabaseCollection() + + // Step 1-6: Use REST API to create document with channel history + nonImportedDocID := "non_imported_doc" + + // Create initial document with a channel + version := rt.PutDoc(nonImportedDocID, `{"type":"test","channels":["test_channel"]}`) + + // Update to remove channel (creates history) + version = rt.UpdateDoc(nonImportedDocID, version, `{"type":"test","channels":[]}`) + + // Update again to add new channels (more history) + version = rt.UpdateDoc(nonImportedDocID, version, `{"type":"test","channels":["test_channel","new_channel"]}`) + + // Verify document has channel history + syncDataBefore, err := collection.GetDocSyncData(ctx, nonImportedDocID) + require.NoError(t, err) + require.Greater(t, len(syncDataBefore.ChannelSetHistory), 0, "document should have channel history") + + // Step 6: Get document sequence for compaction point + docSeq := rt.GetDocumentSequence(nonImportedDocID) + + // Step 7: Update the document body directly in the datastore to simulate external modification + // Read current document body + docBytesRaw, _, err := rt.GetSingleDataStore().GetRaw(ctx, nonImportedDocID) + require.NoError(t, err) + var docBody map[string]any + err = json.Unmarshal(docBytesRaw, &docBody) + require.NoError(t, err) + + // Modify document body with external changes + docBody["modified"] = true + docBody["updatedAt"] = "external_update" + docBody["externalVersion"] = 2 + modifiedDocBytes, err := json.Marshal(docBody) + require.NoError(t, err) + + // Write modified body back to datastore + err = rt.GetSingleDataStore().SetRaw(ctx, nonImportedDocID, 0, nil, modifiedDocBytes) + require.NoError(t, err) + + // Step 8: Call CompactDocChannelHistory - this will trigger the auto-import check + // which verifies the document is imported (has valid _sync xattr) before compacting + err = collection.CompactDocChannelHistory(ctx, nonImportedDocID, docSeq-1) + require.NoError(t, err) + + // Step 9: Verify compaction succeeded and history was removed + syncData, err := collection.GetDocSyncData(ctx, nonImportedDocID) + require.NoError(t, err) + // History should be compacted + assert.Less(t, len(syncData.ChannelSetHistory), len(syncDataBefore.ChannelSetHistory)) + + // Step 10: Verify document is still accessible and intact + docFromBucket, _, err := rt.GetSingleDataStore().GetRaw(ctx, nonImportedDocID) + require.NoError(t, err) + require.NotNil(t, docFromBucket) + + // Verify the document body is intact after compaction + var finalBody map[string]any + err = json.Unmarshal(docFromBucket, &finalBody) + require.NoError(t, err) + assert.Equal(t, "test", finalBody["type"]) +} From 592b361e6b6b11c5da8c3245170f9e443aa35b42 Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Wed, 13 May 2026 15:39:51 +0530 Subject: [PATCH 08/11] lint fix --- rest/api_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rest/api_test.go b/rest/api_test.go index afea91c59b..f2b18ad568 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -3946,7 +3946,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { assert.Len(t, syncData.ChannelSetHistory, 0) // Add multiple channels - generates history for the previously removed channel - version = rt.UpdateDoc("doc1", version, `{"channels": ["test", "test2"]}`) + _ = rt.UpdateDoc("doc1", version, `{"channels": ["test", "test2"]}`) syncData, err = collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) @@ -3968,7 +3968,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { // Compact with seq=0 should keep all entries version := rt.PutDoc("doc2", `{"channels": ["test"]}`) version = rt.UpdateDoc("doc2", version, `{"channels": []}`) - version = rt.UpdateDoc("doc2", version, `{"channels": ["test", "test2"]}`) + _ = rt.UpdateDoc("doc2", version, `{"channels": ["test", "test2"]}`) syncDataBefore, err := collection.GetDocSyncData(ctx, "doc2") require.NoError(t, err) @@ -3987,7 +3987,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { // Compact with very high seq removes all history version := rt.PutDoc("doc3", `{"channels": ["test"]}`) version = rt.UpdateDoc("doc3", version, `{"channels": []}`) - version = rt.UpdateDoc("doc3", version, `{"channels": ["test", "test2"]}`) + _ = rt.UpdateDoc("doc3", version, `{"channels": ["test", "test2"]}`) syncDataBefore, err := collection.GetDocSyncData(ctx, "doc3") require.NoError(t, err) From faad903a25c070f1d69717ee159a000d364c84bd Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Wed, 13 May 2026 16:02:11 +0530 Subject: [PATCH 09/11] lint fix --- rest/api_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rest/api_test.go b/rest/api_test.go index f2b18ad568..2c64a32e9a 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -4011,7 +4011,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { docSeq := rt.GetDocumentSequence("doc4") version = rt.UpdateDoc("doc4", version, `{"channels": ["b"]}`) - version = rt.UpdateDoc("doc4", version, `{"channels": []}`) + _ = rt.UpdateDoc("doc4", version, `{"channels": []}`) syncDataBefore, err := collection.GetDocSyncData(ctx, "doc4") require.NoError(t, err) @@ -4057,7 +4057,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { docSeq := rt.GetDocumentSequence("doc7") - version = rt.UpdateDoc("doc7", version, `{"channels": ["a", "c"]}`) + _ = rt.UpdateDoc("doc7", version, `{"channels": ["a", "c"]}`) syncDataBefore, err := collection.GetDocSyncData(ctx, "doc7") require.NoError(t, err) @@ -4091,7 +4091,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { // Verify CAS is updated and no reimport happens version := rt.PutDoc("doc10", `{"channels": ["test"]}`) version = rt.UpdateDoc("doc10", version, `{"channels": []}`) - version = rt.UpdateDoc("doc10", version, `{"channels": ["test", "test2"]}`) + _ = rt.UpdateDoc("doc10", version, `{"channels": ["test", "test2"]}`) // After compaction, document should still be accessible err := collection.CompactDocChannelHistory(ctx, "doc10", 2) @@ -4133,7 +4133,7 @@ func TestCompactNonImportedDocWithAutoImport(t *testing.T) { version = rt.UpdateDoc(nonImportedDocID, version, `{"type":"test","channels":[]}`) // Update again to add new channels (more history) - version = rt.UpdateDoc(nonImportedDocID, version, `{"type":"test","channels":["test_channel","new_channel"]}`) + _ = rt.UpdateDoc(nonImportedDocID, version, `{"type":"test","channels":["test_channel","new_channel"]}`) // Verify document has channel history syncDataBefore, err := collection.GetDocSyncData(ctx, nonImportedDocID) From f7f091c989daa1cbbd146fdbb5287f8ac7e5e5b7 Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Thu, 14 May 2026 22:31:09 +0530 Subject: [PATCH 10/11] return compacted channels --- db/crud.go | 44 ++++++++++++++++++++++++-------------------- rest/api_test.go | 29 +++++++++++++++++++---------- 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/db/crud.go b/db/crud.go index cf6d27d8e6..feed330285 100644 --- a/db/crud.go +++ b/db/crud.go @@ -224,21 +224,21 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) ( // CompactDocChannelHistory removes channel history entries that ended at or before the given sequence number. // This is used to truncate stale channel assignment history to reduce storage overhead. -func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid string, seq uint64) error { +func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid string, seq uint64) ([]string, error) { key := realDocID(docid) if key == "" { - return base.HTTPErrorf(400, "Invalid doc ID") + return nil, base.HTTPErrorf(400, "Invalid doc ID") } xattrKeys := []string{base.SyncXattrName, base.VirtualXattrRevSeqNo, base.MouXattrName} rawDoc, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, xattrKeys) if err != nil { - return err + return nil, err } doc, err := c.unmarshalDocumentWithXattrs(ctx, key, nil, xattrs, cas, DocUnmarshalSync) if err != nil { - return err + return nil, err } isSgWrite, crc32Match, _ := doc.IsSGWrite(ctx, rawDoc) @@ -251,56 +251,60 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid doc, importErr = c.OnDemandImportForGet(ctx, docid, doc, rawDoc, xattrs, cas) if importErr != nil { - return importErr + return nil, importErr } if doc == nil { - return fmt.Errorf("skipping compaction of document %s, %v ", base.UD(docid), base.ErrNotFound) + return nil, fmt.Errorf("skipping compaction of document %s, %v ", base.UD(docid), base.ErrNotFound) } cas = doc.Cas } - // Store lengths before compaction to detect if any changes occur - historyLenBefore := len(doc.SyncData.ChannelSetHistory) - channelSetLenBefore := len(doc.SyncData.ChannelSet) - channelsLenBefore := len(doc.SyncData.Channels) + compactedChannels := make([]string, 0) doc.SyncData.ChannelSetHistory = slices.DeleteFunc(doc.SyncData.ChannelSetHistory, func(channel ChannelSetEntry) bool { - return channel.End <= seq + del := channel.End <= seq + if del { + compactedChannels = append(compactedChannels, channel.Name) + } + return del }) doc.SyncData.ChannelSet = slices.DeleteFunc(doc.SyncData.ChannelSet, func(channel ChannelSetEntry) bool { - return channel.End != 0 && channel.End <= seq + del := channel.End != 0 && channel.End <= seq + if del { + compactedChannels = append(compactedChannels, channel.Name) + } + return del }) for chanName, chanEntry := range doc.SyncData.Channels { if chanEntry != nil && chanEntry.Seq <= seq { + compactedChannels = append(compactedChannels, chanName) delete(doc.SyncData.Channels, chanName) } } // Exit early if no compaction occurred - if len(doc.SyncData.ChannelSetHistory) == historyLenBefore && - len(doc.SyncData.ChannelSet) == channelSetLenBefore && - len(doc.SyncData.Channels) == channelsLenBefore { - return nil + if len(compactedChannels) == 0 { + return []string{}, nil } rawSyncXattr, err := base.JSONMarshal(doc.SyncData) if err != nil { - return base.RedactErrorf("failed to marshal sync data when trying to compact channel history for doc:%s. Error: %v", base.UD(docid), err) + return nil, base.RedactErrorf("failed to marshal sync data when trying to compact channel history for doc:%s. Error: %v", base.UD(docid), err) } revSeqNo, err := unmarshalRevSeqNo(xattrs[base.VirtualXattrRevSeqNo]) if err != nil { base.WarnfCtx(ctx, `Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err) - return base.RedactErrorf(`Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err) + return nil, base.RedactErrorf(`Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err) } metadataOnlyUpdate := computeMetadataOnlyUpdate(doc.Cas, revSeqNo, doc.MetadataOnlyUpdate) rawMouXattr, err := base.JSONMarshal(metadataOnlyUpdate) if err != nil { - return base.RedactErrorf("failed to marshal _mou when attempting to compact channel history for doc: %s. Error: %v", base.UD(docid), err) + return nil, base.RedactErrorf("failed to marshal _mou when attempting to compact channel history for doc: %s. Error: %v", base.UD(docid), err) } // build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas) @@ -317,7 +321,7 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid base.MouXattrName: rawMouXattr, } _, err = c.dataStore.UpdateXattrs(ctx, key, 0, cas, updatedXattr, opts) - return err + return compactedChannels, err } // unmarshalDocumentWithXattrs populates individual xattrs on unmarshalDocumentWithXattrs from a provided xattrs map diff --git a/rest/api_test.go b/rest/api_test.go index 2c64a32e9a..a33f29c245 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -3957,8 +3957,9 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { assert.Equal(t, db.ChannelSetEntry{Name: "test", Start: 1, End: 2}, syncData.ChannelSetHistory[0]) // Compact history at sequence 2 and verify history is cleared - err = collection.CompactDocChannelHistory(ctx, "doc1", 2) + compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc1", 2) require.NoError(t, err) + assert.Equal(t, []string{"test"}, compactedChannels) syncData, err = collection.GetDocSyncData(ctx, "doc1") assert.NoError(t, err) assert.Zero(t, len(syncData.ChannelSetHistory)) @@ -3975,8 +3976,9 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { historyLenBefore := len(syncDataBefore.ChannelSetHistory) require.Greater(t, historyLenBefore, 0) - err = collection.CompactDocChannelHistory(ctx, "doc2", 0) + compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc2", 0) require.NoError(t, err) + assert.Equal(t, []string{}, compactedChannels) syncDataAfter, err := collection.GetDocSyncData(ctx, "doc2") require.NoError(t, err) @@ -3993,8 +3995,9 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { require.NoError(t, err) // Compact with very high seq number - err = collection.CompactDocChannelHistory(ctx, "doc3", 999999) + compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc3", 999999) require.NoError(t, err) + assert.Equal(t, []string{"test"}, compactedChannels) syncData, err := collection.GetDocSyncData(ctx, "doc3") require.NoError(t, err) @@ -4018,8 +4021,9 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { channelSetLenBefore := len(syncDataBefore.ChannelSet) channelLenBefore := len(syncDataBefore.Channels) - err = collection.CompactDocChannelHistory(ctx, "doc4", docSeq) + compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc4", docSeq) require.NoError(t, err) + assert.Equal(t, []string{"a", "a"}, compactedChannels) syncDataAfter, err := collection.GetDocSyncData(ctx, "doc4") require.NoError(t, err) @@ -4039,14 +4043,14 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { t.Run("invalid doc id", func(t *testing.T) { // Empty doc ID should return 400 error - err := collection.CompactDocChannelHistory(ctx, "", 1) + _, err := collection.CompactDocChannelHistory(ctx, "", 1) assert.Error(t, err) assert.Contains(t, err.Error(), "Invalid doc ID") }) t.Run("nonexistent document", func(t *testing.T) { // Compacting nonexistent doc should return not found error - err := collection.CompactDocChannelHistory(ctx, "nonexistent", 1) + _, err := collection.CompactDocChannelHistory(ctx, "nonexistent", 1) assert.Error(t, err) }) @@ -4061,8 +4065,9 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { syncDataBefore, err := collection.GetDocSyncData(ctx, "doc7") require.NoError(t, err) - err = collection.CompactDocChannelHistory(ctx, "doc7", docSeq) + compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc7", docSeq) require.NoError(t, err) + assert.Equal(t, []string{"b", "b"}, compactedChannels) syncData, err := collection.GetDocSyncData(ctx, "doc7") require.NoError(t, err) @@ -4079,8 +4084,9 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { require.NoError(t, err) assert.Len(t, syncDataBefore.ChannelSetHistory, 0) - err = collection.CompactDocChannelHistory(ctx, "doc8", 1) + compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc8", 1) require.NoError(t, err) + assert.Equal(t, []string{}, compactedChannels) syncDataAfter, err := collection.GetDocSyncData(ctx, "doc8") require.NoError(t, err) @@ -4092,10 +4098,12 @@ func TestDocumentChannelHistoryCompact(t *testing.T) { version := rt.PutDoc("doc10", `{"channels": ["test"]}`) version = rt.UpdateDoc("doc10", version, `{"channels": []}`) _ = rt.UpdateDoc("doc10", version, `{"channels": ["test", "test2"]}`) + seq := rt.GetDocumentSequence("doc10") // After compaction, document should still be accessible - err := collection.CompactDocChannelHistory(ctx, "doc10", 2) + compactedChannels, err := collection.CompactDocChannelHistory(ctx, "doc10", seq) require.NoError(t, err) + assert.Equal(t, []string{"test"}, compactedChannels) // Verify doc is still accessible with correct data syncData, err := collection.GetDocSyncData(ctx, "doc10") @@ -4164,8 +4172,9 @@ func TestCompactNonImportedDocWithAutoImport(t *testing.T) { // Step 8: Call CompactDocChannelHistory - this will trigger the auto-import check // which verifies the document is imported (has valid _sync xattr) before compacting - err = collection.CompactDocChannelHistory(ctx, nonImportedDocID, docSeq-1) + compactedChannels, err := collection.CompactDocChannelHistory(ctx, nonImportedDocID, docSeq-1) require.NoError(t, err) + assert.Equal(t, []string{"test_channel"}, compactedChannels) // Step 9: Verify compaction succeeded and history was removed syncData, err := collection.GetDocSyncData(ctx, nonImportedDocID) From 69989f255b9b6e6bca61e6eaf66cfd36811e61ea Mon Sep 17 00:00:00 2001 From: Ritesh Kumar Date: Thu, 14 May 2026 22:35:16 +0530 Subject: [PATCH 11/11] fixes based on copilot comments use xattr keys used during resync for compaction change error --- db/crud.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/db/crud.go b/db/crud.go index feed330285..61803c9a47 100644 --- a/db/crud.go +++ b/db/crud.go @@ -230,8 +230,7 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid return nil, base.HTTPErrorf(400, "Invalid doc ID") } - xattrKeys := []string{base.SyncXattrName, base.VirtualXattrRevSeqNo, base.MouXattrName} - rawDoc, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, xattrKeys) + rawDoc, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncMouRevSeqNoAndUserXattrKeys()) if err != nil { return nil, err } @@ -254,7 +253,7 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid return nil, importErr } if doc == nil { - return nil, fmt.Errorf("skipping compaction of document %s, %v ", base.UD(docid), base.ErrNotFound) + return nil, base.ErrNotFound } cas = doc.Cas }