Skip to content

Commit 528ace6

Browse files
committed
[MMAP] Add in-memory (mmap) stream data loading support and related tests
- Introduced `memstream.h` and `memstream.cpp` for memory-mapped stream functionality. - Updated `io.h` and `simple.h` to include memory stream support. - Enhanced `simple.cpp` and `flat.cpp` tests to validate loading from memory streams.
1 parent d8ea62b commit 528ace6

7 files changed

Lines changed: 969 additions & 0 deletions

File tree

include/svs/core/data/io.h

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
// svs
2020
#include "svs/concepts/data.h"
2121
#include "svs/core/io.h"
22+
#include "svs/core/io/memstream.h"
2223

2324
#include "svs/lib/array.h"
2425
#include "svs/lib/exception.h"
@@ -113,6 +114,34 @@ void populate(Data& data, WriteAccessor& accessor, const File& file) {
113114
populate_impl(data, accessor, file, default_populate_tag);
114115
}
115116

117+
// Generic declaration of `streaming_size`. Only the declaration is visible here; the
// overloads that follow provide definitions for specific argument kinds.
// NOTE(review): the result is presumably a size in bytes (it is used as a `seekg` offset by
// the pointer-based `load_dataset` overload) -- confirm.
template <typename T> [[nodiscard]] size_t streaming_size(const T& obj) noexcept;
118+
119+
// Overload for types satisfying `HasDataType`.
// The value of `obj` is not inspected: the result depends only on `T`, via
// `element_size(datatype_v<T>)`.
template <HasDataType T> [[nodiscard]] size_t streaming_size(const T& obj) noexcept {
120+
return element_size(datatype_v<T>);
121+
}
122+
123+
// Overload for `std::vector`: the number of elements multiplied by the streaming size of a
// temporary `T{}`.
// Individual elements are not inspected, so every element is treated as having the same
// size as `T{}`.
template <typename T, typename Alloc = ::std::allocator<T>>
124+
[[nodiscard]] size_t streaming_size(const ::std::vector<T, Alloc>& vec) noexcept {
125+
return vec.size() * streaming_size(T{});
126+
}
127+
128+
// Overload for `std::array`: the number of elements multiplied by the streaming size of a
// temporary `T{}`. Individual elements are not inspected.
template <typename T, size_t N>
129+
[[nodiscard]] size_t streaming_size(const ::std::array<T, N>& arr) noexcept {
130+
return arr.size() * streaming_size(T{});
131+
}
132+
133+
// Overload for `std::tuple`: the sum of `streaming_size` over every element of the tuple.
// An empty tuple yields 0, since the accumulator starts at 0.
template <typename... Ts>
134+
[[nodiscard]] size_t streaming_size(const ::std::tuple<Ts...>& tup) noexcept {
135+
size_t total = 0;
136+
// Visit each tuple element and accumulate its streaming size.
lib::foreach (tup, [&](const auto& x) { total += streaming_size(x); });
137+
return total;
138+
}
139+
140+
// Overload for `svs::DenseArray`: `arr.size()` multiplied by the streaming size of a
// temporary `T{}`. Individual elements are not inspected.
template <typename T, typename Dims, typename Alloc = lib::Allocator<T>>
141+
[[nodiscard]] size_t streaming_size(const svs::DenseArray<T, Dims, Alloc>& arr) noexcept {
142+
return arr.size() * streaming_size(T{});
143+
}
144+
116145
/////
117146
///// Saving
118147
/////
@@ -202,6 +231,25 @@ load_dataset(std::istream& is, const F& lazy, size_t num_vectors, size_t dims) {
202231
return data;
203232
}
204233

234+
// Overload of `load_dataset` for lazy constructors that accept a raw `T*` in addition to the
// number of vectors and the dimensionality (i.e. view-style datasets).
//
// `T` appears only in the constraint on `F` and in the return type, so it cannot be deduced
// from the function arguments and must be named explicitly by the caller.
//
// Throws an `ANNEXCEPTION` if `is_memory_stream(is)` is false. Otherwise invokes
// `lazy(num_vectors, dims, io::current_ptr<T>(is))`, advances `is` by
// `streaming_size(data)` relative to its current position, and returns the constructed
// dataset.
template <typename T, lib::LazyInvocable<size_t, size_t, T*> F>
235+
lib::lazy_result_t<F, size_t, size_t, T*>
236+
load_dataset(std::istream& is, const F& lazy, size_t num_vectors, size_t dims) {
237+
// If the allocator is a view, only in-memory streams are supported, since views are only
238+
// compatible with memory-mapped data.
239+
if (!is_memory_stream(is)) {
240+
throw ANNEXCEPTION(
241+
"Trying to load a dataset with a view allocator from a non-memory stream. This "
242+
"is not supported since views are compatible only with memory-mapped streams."
243+
);
244+
}
245+
// If the allocator is a view, we need to construct the view with the correct pointer.
246+
auto data = lazy(num_vectors, dims, io::current_ptr<T>(is));
247+
// Then the stream has to be seeked past the data we just loaded: `lazy` only receives a
248+
// pointer, so constructing the view does not consume anything from the stream.
249+
is.seekg(streaming_size(data), std::ios_base::cur);
250+
return data;
251+
}
252+
205253
// Return whether or not a file is directly loadable via file-extension.
206254
inline bool special_by_file_extension(std::string_view path) {
207255
return (path.ends_with("svs") || path.ends_with("vecs") || path.ends_with("bin"));

include/svs/core/data/simple.h

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "svs/core/allocator.h"
2222
#include "svs/core/compact.h"
2323
#include "svs/core/data/io.h"
24+
#include "svs/core/io/memstream.h"
2425

2526
#include "svs/lib/array.h"
2627
#include "svs/lib/boundscheck.h"
@@ -153,6 +154,25 @@ class GenericSerializer {
153154

154155
return io::load_dataset(is, lazy, num_vectors, dims);
155156
}
157+
158+
/// Overload of `load` for lazy constructors that additionally accept a raw `T*`.
///
/// Reads "eltype", "num_vectors" and "dims" from `table`. Throws an `ANNEXCEPTION` when the
/// stored "eltype" differs from `datatype_v<T>`; otherwise forwards to
/// `io::load_dataset<T>(is, lazy, num_vectors, dims)` and returns its result.
template <typename T, lib::LazyInvocable<size_t, size_t, T*> F>
159+
static lib::lazy_result_t<F, size_t, size_t, T*>
160+
load(const lib::ContextFreeLoadTable& table, std::istream& is, const F& lazy) {
161+
// Reject tables whose serialized element type does not match the requested `T`.
auto datatype = lib::load_at<DataType>(table, "eltype");
162+
if (datatype != datatype_v<T>) {
163+
throw ANNEXCEPTION(
164+
"Trying to load an uncompressed dataset with element types {} to a dataset "
165+
"with element types {}.",
166+
name(datatype),
167+
name<datatype_v<T>>()
168+
);
169+
}
170+
171+
size_t num_vectors = lib::load_at<size_t>(table, "num_vectors");
172+
size_t dims = lib::load_at<size_t>(table, "dims");
173+
174+
// `T` is passed explicitly so that the pointer-based `load_dataset` overload is used.
return io::load_dataset<T>(is, lazy, num_vectors, dims);
175+
}
156176
};
157177

158178
struct Matcher {
@@ -486,6 +506,31 @@ class SimpleData {
486506
);
487507
}
488508

509+
///
/// @brief Reject loading a view-backed ``SimpleData`` from a ``lib::LoadTable``.
///
/// Only participates when ``is_view`` holds. It never returns: it unconditionally throws an
/// ``ANNEXCEPTION`` and ignores its argument.
///
static SimpleData load(const lib::LoadTable& SVS_UNUSED(table))
510+
requires(is_view)
511+
{
512+
throw ANNEXCEPTION("Trying to load a SimpleData view without an istream. This is "
513+
"not supported since "
514+
"views are compatible only with memory-mapped streams.");
515+
}
516+
517+
///
/// @brief Load a view-backed ``SimpleData`` from a stream.
///
/// Only participates when ``is_view`` holds.
///
/// @param table Load table forwarded to ``GenericSerializer::load<T>``.
/// @param is Stream to load from. An ``ANNEXCEPTION`` is thrown if
///     ``io::is_memory_stream(is)`` is false.
///
/// @returns A ``SimpleData`` constructed from the element count, the dimensionality, and a
///     ``View<T>`` wrapping the pointer supplied by the serializer.
///
static SimpleData load(const lib::ContextFreeLoadTable& table, std::istream& is)
518+
requires(is_view)
519+
{
520+
// Fail early with a SimpleData-specific message when the stream is not a memory stream.
if (!io::is_memory_stream(is)) {
521+
throw ANNEXCEPTION(
522+
"Trying to load a SimpleData view from a non-mmstream istream. This is not "
523+
"supported since views are compatible only with memory-mapped streams."
524+
);
525+
}
526+
527+
// The lazy constructor wraps the provided pointer in a `View<T>` rather than allocating.
return GenericSerializer::load<T>(
528+
table, is, lib::Lazy([](size_t n_elements, size_t n_dimensions, T* ptr) {
529+
return SimpleData(n_elements, n_dimensions, View<T>{ptr});
530+
})
531+
);
532+
}
533+
489534
///
490535
/// @brief Try to automatically load the dataset.
491536
///
@@ -598,6 +643,11 @@ bool operator==(const SimpleData<T1, E1, A1>& x, const SimpleData<T2, E2, A2>& y
598643
return true;
599644
}
600645

646+
/// Streaming size of a ``SimpleData``: ``data.capacity() * data.element_size()``.
/// NOTE(review): presumably this is the overload picked up by the pointer-based
/// ``io::load_dataset`` when it seeks past a view-backed dataset -- confirm.
template <typename T, size_t Extent = Dynamic, typename Alloc = lib::Allocator<T>>
647+
size_t streaming_size(const svs::data::SimpleData<T, Extent, Alloc>& data) noexcept {
648+
return data.capacity() * data.element_size();
649+
}
650+
601651
/////
602652
///// Specialization for Blocked.
603653
/////

0 commit comments

Comments
 (0)