@@ -214,7 +214,10 @@ uint8_t RdbObjectType(const CompactObj& pv) {
214214}
215215
216216RdbSerializerBase::RdbSerializerBase (CompressionMode compression_mode)
217- : compression_mode_(compression_mode), mem_buf_{4_KB}, tmp_buf_(nullptr ) {
217+ : compression_mode_(compression_mode),
218+ default_buf_{4_KB},
219+ mem_buf_{&default_buf_},
220+ tmp_buf_ (nullptr ) {
218221}
219222
220223RdbSerializer::RdbSerializer (CompressionMode compression_mode, ConsumeFun consume_fun,
@@ -277,10 +280,7 @@ io::Result<uint8_t> RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValu
277280 return 0 ;
278281 }
279282
280- // If mem buf contains data, stash it before we begin this entry.
281- StashCurrentBuffer ();
282-
283- active_entry_.Reset (ActiveEntry::Kind::Baseline);
283+ active_entry_id_ = next_stream_id_++;
284284
285285 DVLOG (3 ) << " Selecting " << dbid << " previous: " << last_entry_db_index_;
286286 auto ec = SelectDb (dbid);
@@ -858,7 +858,6 @@ error_code RdbSerializer::SendEofAndChecksum() {
858858}
859859
860860error_code RdbSerializer::SendJournalOffset (uint64_t journal_offset) {
861- SetRawMode ();
862861 VLOG (2 ) << " SendJournalOffset" ;
863862 RETURN_ON_ERR (WriteOpcode (RDB_OPCODE_JOURNAL_OFFSET ));
864863 uint8_t buf[sizeof (uint64_t )];
@@ -914,18 +913,18 @@ std::error_code RdbSerializerBase::WriteOpcode(uint8_t opcode) {
914913}
915914
916915size_t RdbSerializerBase::GetBufferCapacity () const {
917- return mem_buf_. Capacity ();
916+ return mem_buf_-> Capacity ();
918917}
919918
920919size_t RdbSerializerBase::GetTempBufferSize () const {
921920 return tmp_buf_.size ();
922921}
923922
924923error_code RdbSerializerBase::WriteRaw (const io::Bytes& buf) {
925- mem_buf_. Reserve (mem_buf_. InputLen () + buf.size ());
926- IoBuf::Bytes dest = mem_buf_. AppendBuffer ();
924+ mem_buf_-> Reserve (mem_buf_-> InputLen () + buf.size ());
925+ IoBuf::Bytes dest = mem_buf_-> AppendBuffer ();
927926 memcpy (dest.data (), buf.data (), buf.size ());
928- mem_buf_. CommitWrite (buf.size ());
927+ mem_buf_-> CommitWrite (buf.size ());
929928 return error_code{};
930929}
931930
@@ -942,33 +941,35 @@ string RdbSerializerBase::Flush(FlushState flush_state) {
942941
943942 string result (io::View (bytes));
944943
945- mem_buf_. ConsumeInput (bytes.size ());
944+ mem_buf_-> ConsumeInput (bytes.size ());
946945
947946 return result;
948947}
949948
950949string RdbSerializer::Flush (FlushState flush_state) {
951- string blob = FlushImpl (flush_state);
950+ const auto bytes = PrepareFlush (flush_state);
951+ auto result = PrefixDefaultBufferAndTag (bytes);
952+ mem_buf_->ConsumeInput (bytes.size ());
953+ if (result.empty ())
954+ return {};
955+
952956 if (send_tagged_entries_) {
953- if (auto res = CompressBlob (blob ); res)
954- blob = std::move (*res);
957+ if (auto res = CompressBlob (result ); res)
958+ result = std::move (*res);
955959 }
956- return blob;
957- }
958960
959- string RdbSerializer::FlushImpl (FlushState flush_state) {
960- DrainMemBufIntoPendingRecords (flush_state);
961+ if (result.size () > serialization_peak_bytes_) {
962+ serialization_peak_bytes_ = result.size ();
963+ }
961964
962- std::string out = absl::StrJoin (pending_records_, " " );
963- pending_records_.clear ();
964- pending_record_bytes_ = 0 ;
965+ DVLOG (2 ) << " FlushToSink " << result.size () << " bytes" ;
965966
966967 // After every flush we should write the DB index again because the blobs in the channel are
967968 // interleaved and multiple savers can correspond to a single writer (in case of single file rdb
968969 // snapshot)
969970 last_entry_db_index_ = kInvalidDbId ;
970971
971- return out ;
972+ return result ;
972973}
973974
974975namespace {
@@ -1038,11 +1039,14 @@ string RdbSerializerBase::DumpValue(const PrimeValue& obj, bool ignore_crc) {
10381039}
10391040
10401041size_t RdbSerializerBase::SerializedLen () const {
1041- return mem_buf_.InputLen ();
1042+ auto len = mem_buf_->InputLen ();
1043+ if (mem_buf_ != &default_buf_)
1044+ len += default_buf_.InputLen ();
1045+ return len;
10421046}
10431047
10441048io::Bytes RdbSerializerBase::PrepareFlush (FlushState flush_state) {
1045- size_t sz = mem_buf_. InputLen ();
1049+ size_t sz = mem_buf_-> InputLen ();
10461050 if (sz == 0 )
10471051 return {};
10481052
@@ -1061,7 +1065,7 @@ io::Bytes RdbSerializerBase::PrepareFlush(FlushState flush_state) {
10611065 number_of_chunks_ = is_last_chunk ? 0 : number_of_chunks_ + 1 ;
10621066 }
10631067
1064- return mem_buf_. InputBuffer ();
1068+ return mem_buf_-> InputBuffer ();
10651069}
10661070
10671071error_code RdbSerializerBase::WriteJournalEntry (std::string_view serialized_entry) {
@@ -1889,59 +1893,105 @@ std::optional<std::string> RdbSerializerBase::CompressBlob(std::string_view inpu
18891893}
18901894
18911895void RdbSerializerBase::CompressBlob () {
1892- Bytes blob_to_compress = mem_buf_. InputBuffer ();
1896+ Bytes blob_to_compress = mem_buf_-> InputBuffer ();
18931897 std::string_view input{reinterpret_cast <const char *>(blob_to_compress.data ()),
18941898 blob_to_compress.size ()};
18951899 auto compressed = CompressBlob (input);
18961900 if (!compressed)
18971901 return ;
18981902
1899- mem_buf_. ConsumeInput (blob_to_compress.size ());
1900- mem_buf_. Reserve (compressed->size ());
1901- auto destination = mem_buf_. AppendBuffer ();
1903+ mem_buf_-> ConsumeInput (blob_to_compress.size ());
1904+ mem_buf_-> Reserve (compressed->size ());
1905+ auto destination = mem_buf_-> AppendBuffer ();
19021906 memcpy (destination.data (), compressed->data (), compressed->size ());
1903- mem_buf_. CommitWrite (compressed->size ());
1907+ mem_buf_-> CommitWrite (compressed->size ());
19041908}
19051909
19061910size_t RdbSerializer::GetTempBufferSize () const {
19071911 return RdbSerializerBase::GetTempBufferSize () + tmp_str_.size ();
19081912}
19091913
1910- size_t RdbSerializer::SerializedLen () const {
1911- return RdbSerializerBase::SerializedLen () + pending_record_bytes_;
1912- }
1913-
19141914std::error_code RdbSerializer::SendFullSyncCut () {
1915- SetRawMode ();
19161915 return RdbSerializerBase::SendFullSyncCut ();
19171916}
19181917
19191918error_code RdbSerializer::WriteJournalEntry (string_view serialized_entry) {
1920- SetRawMode ();
19211919 return RdbSerializerBase::WriteJournalEntry (serialized_entry);
19221920}
19231921
1922+ std::string RdbSerializer::PrefixDefaultBufferAndTag (Bytes bytes) {
1923+ const bool should_tag = send_tagged_entries_ && current_continuation_id_ != 0 ;
1924+
1925+ const auto current_bytes = io::View (bytes);
1926+ if (mem_buf_ == &default_buf_ || default_buf_.InputLen () == 0 ) {
1927+ if (!should_tag)
1928+ return std::string{current_bytes};
1929+
1930+ auto header = MakeTagHeader (current_bytes.size ());
1931+ std::string out;
1932+ out.reserve (header.size () + current_bytes.size ());
1933+ out.append (std::string_view (reinterpret_cast <const char *>(header.data ()), header.size ()));
1934+ out.append (current_bytes);
1935+ return out;
1936+ }
1937+
1938+ std::string out;
1939+ const auto prefix = default_buf_.InputBuffer ();
1940+
1941+ out.reserve (prefix.size () + 9 + current_bytes.size ());
1942+ out.append (io::View (prefix));
1943+ default_buf_.ConsumeInput (prefix.size ());
1944+
1945+ if (should_tag)
1946+ out.append (io::View (MakeTagHeader (current_bytes.size ())));
1947+ out.append (current_bytes);
1948+ return out;
1949+ }
1950+
19241951void RdbSerializer::PushToConsumerIfNeeded (FlushState flush_state) {
1925- if (consume_fun_ && SerializedLen () > flush_threshold_) {
1952+ if (!consume_fun_)
1953+ return ;
1954+
1955+ const auto id = active_entry_id_;
1956+ const bool is_mid = flush_state == FlushState::kFlushMidEntry ;
1957+ const bool is_end = flush_state == FlushState::kFlushEndEntry ;
1958+ const bool has_continuation = continuations_.contains (id);
1959+ // Force tail entries which were split earlier to be pushed even if we do not have enough data, to
1960+ // avoid those entries lying around in continuations_
1961+ const bool must_push_tail = send_tagged_entries_ && is_end && has_continuation;
1962+
1963+ if (SerializedLen () <= flush_threshold_ && !must_push_tail)
1964+ return ;
1965+
1966+ if (!send_tagged_entries_) {
19261967 string blob = Flush (flush_state);
19271968 DCHECK (!blob.empty ()); // SerializedLen() > 0.
1928- const auto saved_entry = active_entry_;
19291969 consume_fun_ (std::move (blob));
1930- StashCurrentBuffer ();
1931- // Restore saved entry if we yielded during consume_fun_, restores the tagging information
1932- // (ActiveEntry::chunked, ActiveEntry::stream_id)
1933- active_entry_ = saved_entry;
1970+ return ;
19341971 }
1935- }
19361972
1937- void RdbSerializer::StashCurrentBuffer () {
1938- if (!send_tagged_entries_)
1939- return ;
1973+ if (is_mid) {
1974+ continuations_.try_emplace (id, std::make_unique<IoBuf>());
1975+ current_continuation_id_ = id;
1976+ }
19401977
1941- if (mem_buf_.InputLen () == 0 )
1942- return ;
1978+ string blob = Flush (flush_state);
1979+ DCHECK (!blob.empty ()); // SerializedLen() > 0.
1980+
1981+ mem_buf_ = &default_buf_;
1982+ current_continuation_id_ = 0 ;
1983+ consume_fun_ (std::move (blob));
19431984
1944- DrainMemBufIntoPendingRecords (FlushState::kFlushEndEntry );
1985+ active_entry_id_ = id;
1986+
1987+ if (is_mid) {
1988+ mem_buf_ = continuations_[id].get ();
1989+ current_continuation_id_ = id;
1990+ } else {
1991+ continuations_.erase (id);
1992+ mem_buf_ = &default_buf_;
1993+ current_continuation_id_ = 0 ;
1994+ }
19451995}
19461996
19471997std::string RdbSerializer::TagChunk (std::string blob, uint32_t stream_id) {
@@ -1959,57 +2009,12 @@ std::string RdbSerializer::TagChunk(std::string blob, uint32_t stream_id) {
19592009 return result;
19602010}
19612011
1962- void RdbSerializer::SetRawMode () {
1963- if (active_entry_.kind != ActiveEntry::Kind::Raw)
1964- StashCurrentBuffer ();
1965- active_entry_.Reset (ActiveEntry::Kind::Raw);
1966- }
1967-
1968- uint32_t RdbSerializer::AllocateStreamId () {
1969- return next_stream_id_++;
1970- }
1971-
1972- std::string RdbSerializer::FinalizeCurrentRecord (FlushState flush_state) {
1973- if (mem_buf_.InputLen () == 0 ) {
1974- return {};
1975- }
1976-
1977- auto blob = RdbSerializerBase::Flush (flush_state);
1978- if (!send_tagged_entries_) {
1979- if (flush_state == FlushState::kFlushEndEntry )
1980- active_entry_.Reset (ActiveEntry::Kind::Raw);
1981- // not tagged entry always returns blob as it is
1982- return blob;
1983- }
1984-
1985- if (active_entry_.kind == ActiveEntry::Kind::Raw)
1986- return blob;
1987-
1988- if (flush_state == FlushState::kFlushMidEntry ) {
1989- // Mark the current entry as chunked. Any future chunks going out for this entry will be tagged
1990- // too
1991- if (!active_entry_.chunked ) {
1992- active_entry_.chunked = true ;
1993- active_entry_.stream_id = AllocateStreamId ();
1994- }
1995- return TagChunk (blob, *active_entry_.stream_id );
1996- }
1997-
1998- // end of entry
1999- if (active_entry_.chunked ) {
2000- blob = TagChunk (blob, *active_entry_.stream_id );
2001- }
2002-
2003- // Reset active entry, current baseline entry is finished
2004- active_entry_.Reset (ActiveEntry::Kind::Raw);
2005- return blob;
2006- }
2007-
2008- void RdbSerializer::DrainMemBufIntoPendingRecords (FlushState flush_state) {
2009- if (auto record = FinalizeCurrentRecord (flush_state); !record.empty ()) {
2010- pending_record_bytes_ += record.size ();
2011- pending_records_.push_back (std::move (record));
2012- }
2012+ std::array<uint8_t , 9 > RdbSerializer::MakeTagHeader (size_t size) const {
2013+ std::array<uint8_t , 9 > header;
2014+ header[0 ] = RDB_OPCODE_TAGGED_CHUNK ;
2015+ absl::little_endian::Store32 (header.data () + 1 , current_continuation_id_);
2016+ absl::little_endian::Store32 (header.data () + 5 , size);
2017+ return header;
20132018}
20142019
20152020} // namespace dfly
0 commit comments