Skip to content

Commit f601b56

Browse files
razdoburdin, Dmitry Razdoburdin, Copilot, rfsaliev, and ethanglaser
authored
Native serialization for Indexes. (#300)
This PR adds native stream serialization to all SVS index types as an alternative to the existing (legacy) directory-based serialization. It avoids filesystem round-trips of the data. Native serialization does not require the stream to be seekable, so no additional restrictions are introduced. See the following PRs for details: #280, #281, #285, #286, #289, #292, #294, #296, #299.

The main changes are:

1. A CRTP base `Archiver` extracts the binary I/O primitives (`write_size`, `read_size`, `write_name`, `read_name`, `read_from_istream`) from `DirectoryArchiver`. `DirectoryArchiver` and the new `StreamArchiver` class both inherit from `Archiver`. `StreamArchiver` has its own magic number ("SVS_STRM") to distinguish native streams from directory archives.
2. The monolithic `Writer` is split via CRTP into two derived classes: `FileWriter` owns a `std::ofstream`, writes a header, and flushes in its destructor; `StreamWriter` wraps an external `std::ostream&` and performs no header or lifecycle management. This allows `io::save(data, os)` to write vector data directly to any stream.
3. `save(stream)` in the orchestrator `Impl` classes no longer packs a temporary directory (temp-dir -> pack); instead, it directly calls `impl().save(stream)`.
4. Dispatch between the new (native) and old (legacy) deserialization happens in the orchestrators: `Deserializer::build(is)` reads the magic number and exposes `is_native()` to choose the path.

---------

Co-authored-by: Dmitry Razdoburdin <drazdobu@intel.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Rafik Saliev <rafik.f.saliev@intel.com>
Co-authored-by: ethanglaser <42726565+ethanglaser@users.noreply.github.com>
1 parent 12e9571 commit f601b56

40 files changed

Lines changed: 2467 additions & 566 deletions

bindings/cpp/src/dynamic_vamana_index_impl.h

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -316,15 +316,7 @@ class DynamicVamanaIndexImpl {
316316
ErrorCode::NOT_INITIALIZED, "Cannot serialize: SVS index not initialized."};
317317
}
318318

319-
lib::UniqueTempDirectory tempdir{"svs_vamana_save"};
320-
const auto config_dir = tempdir.get() / "config";
321-
const auto graph_dir = tempdir.get() / "graph";
322-
const auto data_dir = tempdir.get() / "data";
323-
std::filesystem::create_directories(config_dir);
324-
std::filesystem::create_directories(graph_dir);
325-
std::filesystem::create_directories(data_dir);
326-
impl_->save(config_dir, graph_dir, data_dir);
327-
lib::DirectoryArchiver::pack(tempdir, out);
319+
impl_->save(out);
328320
}
329321

330322
protected:
@@ -469,49 +461,32 @@ class DynamicVamanaIndexImpl {
469461
impl_->get_full_search_history()};
470462
}
471463

472-
template <typename Tag>
473-
static svs::DynamicVamana*
474-
load_impl_t(Tag&& tag, std::istream& stream, MetricType metric) {
475-
namespace fs = std::filesystem;
476-
lib::UniqueTempDirectory tempdir{"svs_vamana_load"};
477-
lib::DirectoryArchiver::unpack(stream, tempdir);
478-
479-
const auto config_path = tempdir.get() / "config";
480-
if (!fs::is_directory(config_path)) {
481-
throw StatusException{
482-
ErrorCode::RUNTIME_ERROR,
483-
"Invalid Vamana index archive: missing config directory!"};
484-
}
485-
486-
const auto graph_path = tempdir.get() / "graph";
487-
if (!fs::is_directory(graph_path)) {
488-
throw StatusException{
489-
ErrorCode::RUNTIME_ERROR,
490-
"Invalid Vamana index archive: missing graph directory!"};
491-
}
492-
493-
const auto data_path = tempdir.get() / "data";
494-
if (!fs::is_directory(data_path)) {
495-
throw StatusException{
496-
ErrorCode::RUNTIME_ERROR,
497-
"Invalid Vamana index archive: missing data directory!"};
464+
template <StorageKind Kind, typename Alloc>
465+
static svs::DynamicVamana* load_impl_t(
466+
storage::StorageType<Kind, Alloc>&& SVS_UNUSED(tag),
467+
std::istream& stream,
468+
MetricType metric
469+
) {
470+
if constexpr (!storage::is_supported_storage_kind_v<Kind>) {
471+
throw StatusException(
472+
ErrorCode::NOT_IMPLEMENTED, "Requested storage kind is not supported"
473+
);
474+
} else {
475+
using storage_type = storage::StorageType_t<Kind, Alloc>;
476+
auto threadpool = default_threadpool();
477+
478+
svs::DistanceDispatcher distance_dispatcher(to_svs_distance(metric));
479+
480+
return distance_dispatcher([&](auto&& distance) {
481+
return new svs::DynamicVamana(
482+
svs::DynamicVamana::assemble<float, storage_type>(
483+
stream,
484+
std::forward<decltype(distance)>(distance),
485+
std::move(threadpool)
486+
)
487+
);
488+
});
498489
}
499-
500-
auto storage = storage::load_storage(std::forward<Tag>(tag), data_path);
501-
auto threadpool = default_threadpool();
502-
503-
svs::DistanceDispatcher distance_dispatcher(to_svs_distance(metric));
504-
505-
return distance_dispatcher([&](auto&& distance) {
506-
return new svs::DynamicVamana(svs::DynamicVamana::assemble<float>(
507-
config_path,
508-
svs::GraphLoader{graph_path},
509-
std::move(storage),
510-
std::forward<decltype(distance)>(distance),
511-
std::move(threadpool),
512-
false
513-
));
514-
});
515490
}
516491

517492
public:

bindings/cpp/src/svs_runtime_utils.h

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,11 @@ template <typename Alloc> struct StorageType<StorageKind::SQI8, Alloc> {
203203
using type = SQDatasetType<std::int8_t, allocator_type>;
204204
};
205205

206+
template <StorageKind Kind>
207+
inline constexpr bool is_supported_storage_kind_v = !std::is_same_v<
208+
typename StorageType<Kind, svs::lib::Allocator<float>>::type,
209+
UnsupportedStorageType<svs::lib::Allocator<float>>>;
210+
206211
// Storage factory
207212
template <typename T> struct StorageFactory;
208213

@@ -222,14 +227,6 @@ template <typename Alloc> struct StorageFactory<UnsupportedStorageType<Alloc>> {
222227
ErrorCode::NOT_IMPLEMENTED, "Requested storage kind is not supported"
223228
);
224229
}
225-
226-
template <typename... Args>
227-
static StorageType
228-
load(const std::filesystem::path& SVS_UNUSED(path), Args&&... SVS_UNUSED(args)) {
229-
throw StatusException(
230-
ErrorCode::NOT_IMPLEMENTED, "Requested storage kind is not supported"
231-
);
232-
}
233230
};
234231

235232
template <typename T, size_t Extent, typename Alloc>
@@ -254,11 +251,6 @@ struct StorageFactory<svs::data::SimpleData<T, Extent, Alloc>> {
254251
);
255252
return result;
256253
}
257-
258-
template <typename... Args>
259-
static StorageType load(const std::filesystem::path& path, Args&&... args) {
260-
return svs::lib::load_from_disk<StorageType>(path, SVS_FWD(args)...);
261-
}
262254
};
263255

264256
// SQ Storage support
@@ -274,11 +266,6 @@ struct StorageFactory<svs::quantization::scalar::SQDataset<T, Extent, Alloc>> {
274266
) {
275267
return StorageType::compress(data, pool, alloc);
276268
}
277-
278-
template <typename... Args>
279-
static StorageType load(const std::filesystem::path& path, Args&&... args) {
280-
return svs::lib::load_from_disk<StorageType>(path, SVS_FWD(args)...);
281-
}
282269
};
283270

284271
// LVQ Storage support
@@ -327,11 +314,6 @@ struct StorageFactory<LVQStorageType> {
327314
) {
328315
return StorageType::compress(data, pool, 0, alloc);
329316
}
330-
331-
template <typename... Args>
332-
static StorageType load(const std::filesystem::path& path, Args&&... args) {
333-
return svs::lib::load_from_disk<StorageType>(path, SVS_FWD(args)...);
334-
}
335317
};
336318

337319
// LeanVec Storage support
@@ -381,11 +363,6 @@ struct StorageFactory<LeanVecStorageType> {
381363
data, std::move(matrices), pool, 0, svs::lib::MaybeStatic{leanvec_d}, alloc
382364
);
383365
}
384-
385-
template <typename... Args>
386-
static StorageType load(const std::filesystem::path& path, Args&&... args) {
387-
return svs::lib::load_from_disk<StorageType>(path, SVS_FWD(args)...);
388-
}
389366
};
390367
#endif // SVS_RUNTIME_HAVE_LVQ_LEANVEC
391368

@@ -394,11 +371,6 @@ auto make_storage(StorageType<Kind, Alloc> SVS_UNUSED(tag), Args&&... args) {
394371
return StorageFactory<StorageType_t<Kind, Alloc>>::init(std::forward<Args>(args)...);
395372
}
396373

397-
template <StorageKind Kind, typename Alloc, typename... Args>
398-
auto load_storage(StorageType<Kind, Alloc> SVS_UNUSED(tag), Args&&... args) {
399-
return StorageFactory<StorageType_t<Kind, Alloc>>::load(std::forward<Args>(args)...);
400-
}
401-
402374
template <typename Alloc, typename F, typename... Args>
403375
auto dispatch_storage_kind(StorageKind kind, F&& f, Args&&... args) {
404376
if (!is_supported_storage_kind(kind)) {

bindings/cpp/src/vamana_index_impl.h

Lines changed: 17 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -271,17 +271,7 @@ class VamanaIndexImpl {
271271

272272
void reset() { impl_.reset(); }
273273

274-
void save(std::ostream& out) const {
275-
lib::UniqueTempDirectory tempdir{"svs_vamana_save"};
276-
const auto config_dir = tempdir.get() / "config";
277-
const auto graph_dir = tempdir.get() / "graph";
278-
const auto data_dir = tempdir.get() / "data";
279-
std::filesystem::create_directories(config_dir);
280-
std::filesystem::create_directories(graph_dir);
281-
std::filesystem::create_directories(data_dir);
282-
get_impl()->save(config_dir, graph_dir, data_dir);
283-
lib::DirectoryArchiver::pack(tempdir, out);
284-
}
274+
void save(std::ostream& out) const { get_impl()->save(out); }
285275

286276
protected:
287277
// Utility functions
@@ -420,47 +410,24 @@ class VamanaIndexImpl {
420410
}
421411
}
422412

423-
template <typename Tag>
424-
static svs::Vamana* load_impl_t(Tag&& tag, std::istream& stream, MetricType metric) {
425-
namespace fs = std::filesystem;
426-
lib::UniqueTempDirectory tempdir{"svs_vamana_load"};
427-
lib::DirectoryArchiver::unpack(stream, tempdir);
428-
429-
const auto config_path = tempdir.get() / "config";
430-
if (!fs::is_directory(config_path)) {
431-
throw StatusException{
432-
ErrorCode::RUNTIME_ERROR,
433-
"Invalid Vamana index archive: missing config directory!"};
434-
}
435-
436-
const auto graph_path = tempdir.get() / "graph";
437-
if (!fs::is_directory(graph_path)) {
438-
throw StatusException{
439-
ErrorCode::RUNTIME_ERROR,
440-
"Invalid Vamana index archive: missing graph directory!"};
441-
}
442-
443-
const auto data_path = tempdir.get() / "data";
444-
if (!fs::is_directory(data_path)) {
445-
throw StatusException{
446-
ErrorCode::RUNTIME_ERROR,
447-
"Invalid Vamana index archive: missing data directory!"};
448-
}
449-
450-
auto storage = storage::load_storage(std::forward<Tag>(tag), data_path);
451-
auto threadpool = default_threadpool();
452-
453-
svs::DistanceDispatcher distance_dispatcher(to_svs_distance(metric));
413+
template <StorageKind Kind, typename Alloc>
414+
static svs::Vamana* load_impl_t(
415+
storage::StorageType<Kind, Alloc>&& SVS_UNUSED(tag),
416+
std::istream& stream,
417+
MetricType metric
418+
) {
419+
if constexpr (!storage::is_supported_storage_kind_v<Kind>) {
420+
throw StatusException(
421+
ErrorCode::NOT_IMPLEMENTED, "Requested storage kind is not supported"
422+
);
423+
} else {
424+
using storage_type = storage::StorageType_t<Kind, Alloc>;
425+
auto threadpool = default_threadpool();
454426

455-
return distance_dispatcher([&](auto&& distance) {
456-
return new svs::Vamana(svs::Vamana::assemble<float>(
457-
config_path,
458-
svs::GraphLoader{graph_path},
459-
std::move(storage),
460-
std::forward<decltype(distance)>(distance),
461-
std::move(threadpool)
427+
return new svs::Vamana(svs::Vamana::assemble<float, storage_type>(
428+
stream, to_svs_distance(metric), std::move(threadpool)
462429
));
463-
});
430+
}
464431
}
465432

466433
public:

include/svs/core/data/io.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,22 @@ void populate_impl(
7979
}
8080
}
8181

82+
template <data::MemoryDataset Data> void populate(std::istream& is, Data& data) {
83+
auto accessor = DefaultWriteAccessor();
84+
85+
size_t num_vectors = data.size();
86+
size_t dims = data.dimensions();
87+
88+
auto max_lines = Dynamic;
89+
auto nvectors = std::min(num_vectors, max_lines);
90+
91+
auto reader = lib::VectorReader<typename Data::element_type>(dims);
92+
for (size_t i = 0; i < nvectors; ++i) {
93+
reader.read(is);
94+
accessor.set(data, i, reader.data());
95+
}
96+
}
97+
8298
// Intercept the native file to perform dispatch on the actual file type.
8399
template <data::MemoryDataset Data, typename WriteAccessor>
84100
void populate_impl(
@@ -120,6 +136,15 @@ void save(const Dataset& data, const File& file, const lib::UUID& uuid = lib::Ze
120136
return save(data, accessor, file, uuid);
121137
}
122138

139+
template <data::ImmutableMemoryDataset Dataset>
140+
void save(const Dataset& data, std::ostream& os) {
141+
auto accessor = DefaultReadAccessor();
142+
auto writer = svs::io::v1::StreamWriter<void>(os);
143+
for (size_t i = 0; i < data.size(); ++i) {
144+
writer << accessor.get(data, i);
145+
}
146+
}
147+
123148
///
124149
/// @brief Save the dataset as a "*vecs" file.
125150
///
@@ -169,6 +194,14 @@ lib::lazy_result_t<F, size_t, size_t> load_dataset(const File& file, const F& la
169194
return load_impl(detail::to_native(file), default_accessor, lazy);
170195
}
171196

197+
template <lib::LazyInvocable<size_t, size_t> F>
198+
lib::lazy_result_t<F, size_t, size_t>
199+
load_dataset(std::istream& is, const F& lazy, size_t num_vectors, size_t dims) {
200+
auto data = lazy(num_vectors, dims);
201+
populate(is, data);
202+
return data;
203+
}
204+
172205
// Return whether or not a file is directly loadable via file-extension.
173206
inline bool special_by_file_extension(std::string_view path) {
174207
return (path.ends_with("svs") || path.ends_with("vecs") || path.ends_with("bin"));

0 commit comments

Comments
 (0)