Skip to content

Commit 8e56474

Browse files
committed
fixes based on pr comments
1 parent 40b882b commit 8e56474

2 files changed

Lines changed: 40 additions & 104 deletions

File tree

db/crud.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid
226226
}
227227

228228
xattrKeys := []string{base.SyncXattrName, base.VirtualXattrRevSeqNo, base.MouXattrName}
229-
_, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, xattrKeys)
229+
rawDoc, xattrs, cas, err := c.dataStore.GetWithXattrs(ctx, key, xattrKeys)
230230
if err != nil {
231231
return err
232232
}
@@ -236,6 +236,20 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid
236236
return err
237237
}
238238

239+
isSgWrite, crc32Match, _ := doc.IsSGWrite(ctx, rawDoc)
240+
if crc32Match {
241+
c.dbStats().Database().Crc32MatchCount.Add(1)
242+
}
243+
244+
if !isSgWrite {
245+
var importErr error
246+
247+
doc, importErr = c.OnDemandImportForGet(ctx, docid, doc, rawDoc, xattrs, cas)
248+
if importErr != nil {
249+
return importErr
250+
}
251+
}
252+
239253
// Store lengths before compaction to detect if any changes occur
240254
historyLenBefore := len(doc.SyncData.ChannelSetHistory)
241255
channelSetLenBefore := len(doc.SyncData.ChannelSet)
@@ -269,7 +283,8 @@ func (c *DatabaseCollection) CompactDocChannelHistory(ctx context.Context, docid
269283

270284
revSeqNo, err := unmarshalRevSeqNo(xattrs[base.VirtualXattrRevSeqNo])
271285
if err != nil {
272-
base.InfofCtx(ctx, base.KeyCRUD, `Could not determine the revSeqNo when attempting to compact channel history for doc: %s. Error: %v`, base.UD(docid), err)
286+
base.WarnfCtx(ctx, `Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err)
287+
return base.RedactErrorf(`Could not determine the revSeqNo when attempting to compact channel history for doc %s - history will not be compacted: %v`, base.UD(docid), err)
273288
}
274289

275290
metadataOnlyUpdate := computeMetadataOnlyUpdate(doc.Cas, revSeqNo, doc.MetadataOnlyUpdate)

rest/api_test.go

Lines changed: 23 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -3919,12 +3919,8 @@ func TestDocumentChannelHistoryCompact(t *testing.T) {
39193919
collection, ctx := rt.GetSingleTestDatabaseCollection()
39203920

39213921
t.Run("basic compaction", func(t *testing.T) {
3922-
var body db.Body
39233922
// Create a document with a single channel assignment
3924-
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", `{"channels": ["test"]}`)
3925-
RequireStatus(t, resp, http.StatusCreated)
3926-
err := json.Unmarshal(resp.BodyBytes(), &body)
3927-
require.NoError(t, err)
3923+
version := rt.PutDoc("doc1", `{"channels": ["test"]}`)
39283924
syncData, err := collection.GetDocSyncData(ctx, "doc1")
39293925
assert.NoError(t, err)
39303926

@@ -3933,10 +3929,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) {
39333929
assert.Len(t, syncData.ChannelSetHistory, 0)
39343930

39353931
// Remove all channels - ends the existing channel range in ChannelSet
3936-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc1?rev="+body["rev"].(string), `{"channels": []}`)
3937-
RequireStatus(t, resp, http.StatusCreated)
3938-
err = json.Unmarshal(resp.BodyBytes(), &body)
3939-
require.NoError(t, err)
3932+
version = rt.UpdateDoc("doc1", version, `{"channels": []}`)
39403933
syncData, err = collection.GetDocSyncData(ctx, "doc1")
39413934
assert.NoError(t, err)
39423935

@@ -3945,10 +3938,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) {
39453938
assert.Len(t, syncData.ChannelSetHistory, 0)
39463939

39473940
// Add multiple channels - generates history for the previously removed channel
3948-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc1?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`)
3949-
RequireStatus(t, resp, http.StatusCreated)
3950-
err = json.Unmarshal(resp.BodyBytes(), &body)
3951-
require.NoError(t, err)
3941+
version = rt.UpdateDoc("doc1", version, `{"channels": ["test", "test2"]}`)
39523942
syncData, err = collection.GetDocSyncData(ctx, "doc1")
39533943
assert.NoError(t, err)
39543944

@@ -3968,21 +3958,9 @@ func TestDocumentChannelHistoryCompact(t *testing.T) {
39683958

39693959
t.Run("seq zero keeps all history", func(t *testing.T) {
39703960
// Compact with seq=0 should keep all entries
3971-
var body db.Body
3972-
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc2", `{"channels": ["test"]}`)
3973-
RequireStatus(t, resp, http.StatusCreated)
3974-
err := json.Unmarshal(resp.BodyBytes(), &body)
3975-
require.NoError(t, err)
3976-
3977-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc2?rev="+body["rev"].(string), `{"channels": []}`)
3978-
RequireStatus(t, resp, http.StatusCreated)
3979-
err = json.Unmarshal(resp.BodyBytes(), &body)
3980-
require.NoError(t, err)
3981-
3982-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc2?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`)
3983-
RequireStatus(t, resp, http.StatusCreated)
3984-
err = json.Unmarshal(resp.BodyBytes(), &body)
3985-
require.NoError(t, err)
3961+
version := rt.PutDoc("doc2", `{"channels": ["test"]}`)
3962+
version = rt.UpdateDoc("doc2", version, `{"channels": []}`)
3963+
version = rt.UpdateDoc("doc2", version, `{"channels": ["test", "test2"]}`)
39863964

39873965
syncDataBefore, err := collection.GetDocSyncData(ctx, "doc2")
39883966
require.NoError(t, err)
@@ -3999,21 +3977,9 @@ func TestDocumentChannelHistoryCompact(t *testing.T) {
39993977

40003978
t.Run("compact all history", func(t *testing.T) {
40013979
// Compact with very high seq removes all history
4002-
var body db.Body
4003-
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc3", `{"channels": ["test"]}`)
4004-
RequireStatus(t, resp, http.StatusCreated)
4005-
err := json.Unmarshal(resp.BodyBytes(), &body)
4006-
require.NoError(t, err)
4007-
4008-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc3?rev="+body["rev"].(string), `{"channels": []}`)
4009-
RequireStatus(t, resp, http.StatusCreated)
4010-
err = json.Unmarshal(resp.BodyBytes(), &body)
4011-
require.NoError(t, err)
4012-
4013-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc3?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`)
4014-
RequireStatus(t, resp, http.StatusCreated)
4015-
err = json.Unmarshal(resp.BodyBytes(), &body)
4016-
require.NoError(t, err)
3980+
version := rt.PutDoc("doc3", `{"channels": ["test"]}`)
3981+
version = rt.UpdateDoc("doc3", version, `{"channels": []}`)
3982+
version = rt.UpdateDoc("doc3", version, `{"channels": ["test", "test2"]}`)
40173983

40183984
syncDataBefore, err := collection.GetDocSyncData(ctx, "doc3")
40193985
require.NoError(t, err)
@@ -4031,49 +3997,31 @@ func TestDocumentChannelHistoryCompact(t *testing.T) {
40313997

40323998
t.Run("partial history compaction", func(t *testing.T) {
40333999
// Compact removes entries with End <= seq, keeps entries with End > seq
4034-
var body db.Body
4035-
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc4", `{"channels": ["a"]}`)
4036-
RequireStatus(t, resp, http.StatusCreated)
4037-
err := json.Unmarshal(resp.BodyBytes(), &body)
4038-
require.NoError(t, err)
4039-
4040-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc4?rev="+body["rev"].(string), `{"channels": []}`)
4041-
RequireStatus(t, resp, http.StatusCreated)
4042-
err = json.Unmarshal(resp.BodyBytes(), &body)
4043-
require.NoError(t, err)
4000+
version := rt.PutDoc("doc4", `{"channels": ["a"]}`)
4001+
version = rt.UpdateDoc("doc4", version, `{"channels": []}`)
40444002

40454003
docSeq := rt.GetDocumentSequence("doc4")
40464004

4047-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc4?rev="+body["rev"].(string), `{"channels": ["b"]}`)
4048-
RequireStatus(t, resp, http.StatusCreated)
4049-
err = json.Unmarshal(resp.BodyBytes(), &body)
4050-
require.NoError(t, err)
4051-
4052-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc4?rev="+body["rev"].(string), `{"channels": []}`)
4053-
RequireStatus(t, resp, http.StatusCreated)
4054-
err = json.Unmarshal(resp.BodyBytes(), &body)
4055-
require.NoError(t, err)
4005+
version = rt.UpdateDoc("doc4", version, `{"channels": ["b"]}`)
4006+
version = rt.UpdateDoc("doc4", version, `{"channels": []}`)
40564007

40574008
syncDataBefore, err := collection.GetDocSyncData(ctx, "doc4")
40584009
require.NoError(t, err)
40594010
channelSetLenBefore := len(syncDataBefore.ChannelSet)
40604011
channelLenBefore := len(syncDataBefore.Channels)
40614012

4062-
// Compact at seq 3
40634013
err = collection.CompactDocChannelHistory(ctx, "doc4", docSeq)
40644014
require.NoError(t, err)
40654015

40664016
syncDataAfter, err := collection.GetDocSyncData(ctx, "doc4")
40674017
require.NoError(t, err)
40684018

4069-
// Verify: only entries with End > 3 or End == 0 (still active) should remain
40704019
for _, entry := range syncDataAfter.ChannelSet {
4071-
assert.True(t, entry.End == 0 || entry.End > uint64(3))
4020+
assert.True(t, entry.End == 0 || entry.End > docSeq)
40724021
}
40734022

4074-
// Verify only channels with seq > 3 should remain
40754023
for _, entry := range syncDataAfter.Channels {
4076-
assert.True(t, entry.Seq > uint64(3))
4024+
assert.True(t, entry.Seq > docSeq)
40774025
}
40784026

40794027
// Should have fewer than before
@@ -4096,23 +4044,12 @@ func TestDocumentChannelHistoryCompact(t *testing.T) {
40964044

40974045
t.Run("multiple channels with mixed history", func(t *testing.T) {
40984046
// Complex scenario with multiple channels
4099-
var body db.Body
4100-
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc7", `{"channels": ["a", "b"]}`)
4101-
RequireStatus(t, resp, http.StatusCreated)
4102-
err := json.Unmarshal(resp.BodyBytes(), &body)
4103-
require.NoError(t, err)
4104-
4105-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc7?rev="+body["rev"].(string), `{"channels": ["a"]}`)
4106-
RequireStatus(t, resp, http.StatusCreated)
4107-
err = json.Unmarshal(resp.BodyBytes(), &body)
4108-
require.NoError(t, err)
4047+
version := rt.PutDoc("doc7", `{"channels": ["a", "b"]}`)
4048+
version = rt.UpdateDoc("doc7", version, `{"channels": ["a"]}`)
41094049

41104050
docSeq := rt.GetDocumentSequence("doc7")
41114051

4112-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc7?rev="+body["rev"].(string), `{"channels": ["a", "c"]}`)
4113-
RequireStatus(t, resp, http.StatusCreated)
4114-
err = json.Unmarshal(resp.BodyBytes(), &body)
4115-
require.NoError(t, err)
4052+
version = rt.UpdateDoc("doc7", version, `{"channels": ["a", "c"]}`)
41164053

41174054
syncDataBefore, err := collection.GetDocSyncData(ctx, "doc7")
41184055
require.NoError(t, err)
@@ -4128,11 +4065,7 @@ func TestDocumentChannelHistoryCompact(t *testing.T) {
41284065

41294066
t.Run("compact empty history", func(t *testing.T) {
41304067
// Doc with no history should succeed
4131-
var body db.Body
4132-
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc8", `{"channels": ["test"]}`)
4133-
RequireStatus(t, resp, http.StatusCreated)
4134-
err := json.Unmarshal(resp.BodyBytes(), &body)
4135-
require.NoError(t, err)
4068+
rt.PutDoc("doc8", `{"channels": ["test"]}`)
41364069

41374070
syncDataBefore, err := collection.GetDocSyncData(ctx, "doc8")
41384071
require.NoError(t, err)
@@ -4148,24 +4081,12 @@ func TestDocumentChannelHistoryCompact(t *testing.T) {
41484081

41494082
t.Run("verify xattr updates", func(t *testing.T) {
41504083
// Verify CAS is updated and no reimport happens
4151-
var body db.Body
4152-
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc10", `{"channels": ["test"]}`)
4153-
RequireStatus(t, resp, http.StatusCreated)
4154-
err := json.Unmarshal(resp.BodyBytes(), &body)
4155-
require.NoError(t, err)
4156-
4157-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc10?rev="+body["rev"].(string), `{"channels": []}`)
4158-
RequireStatus(t, resp, http.StatusCreated)
4159-
err = json.Unmarshal(resp.BodyBytes(), &body)
4160-
require.NoError(t, err)
4161-
4162-
resp = rt.SendAdminRequest("PUT", "/{{.keyspace}}/doc10?rev="+body["rev"].(string), `{"channels": ["test", "test2"]}`)
4163-
RequireStatus(t, resp, http.StatusCreated)
4164-
err = json.Unmarshal(resp.BodyBytes(), &body)
4165-
require.NoError(t, err)
4084+
version := rt.PutDoc("doc10", `{"channels": ["test"]}`)
4085+
version = rt.UpdateDoc("doc10", version, `{"channels": []}`)
4086+
version = rt.UpdateDoc("doc10", version, `{"channels": ["test", "test2"]}`)
41664087

41674088
// After compaction, document should still be accessible
4168-
err = collection.CompactDocChannelHistory(ctx, "doc10", 2)
4089+
err := collection.CompactDocChannelHistory(ctx, "doc10", 2)
41694090
require.NoError(t, err)
41704091

41714092
// Verify doc is still accessible with correct data

0 commit comments

Comments
 (0)