Commit 9b465c2

grao1991 and claude committed
[trading-native][storage-core] Commit applier + JMT pipeline + AptosDB wiring
Adds the durable-commit applier, the two-stage JMT commit pipeline, and the AptosDB integration for native positions. Builds on the durable storage tier in `[storage-core-db]` and the shared primitives in `[shared] LeafEntry trait + StateSummary Option` / `[shared] Lift async-commit pipeline`. The in-memory scanner mirror (the `NativeStateStore` DashMap and its readers) is layered on top in a later `[in-memory-store]` commit; the durable + JMT side here stands on its own.

Commit applier:
- `NativeStateCommitter::apply`: per-shard `SchemaBatch` fan-out against `position_db` for every Position write, returning `NativeMerkleLeafUpdates` (one `MerkleLeafUpdate` per write) for the JMT pipeline. Stale-index entries for prior versions are written into the same batch so the pruner can drop them later.

Commit pipeline:
- `PositionBufferedState`: alias of the shared `BufferedState<L, S, P, X>` generic with position's pipeline parameters and a no-op `PositionExtras`. Constructed via `spawn_commit_pipeline`.
- `PositionSnapshotCommitter` driver (`merklize_position`): pulls `PositionSnapshotToCommit` payloads, runs the snapshot through `ShardedJmtMerkleDb::merklize_snapshot`, and forwards the resulting `PositionMerkleCommit` downstream.
- `PositionMerkleBatchCommitter`: pulls `PositionMerkleCommit`s and persists the per-shard JMT batches + top-levels batch to `position_merkle_db`.

State-shape primitives in `storage-interface`:
- `storage/storage-interface/src/state_store/sharded_jmt_state.rs` holds the JMT-sharded pipeline primitives that are not shared with main state: `LeafSlot<V>` + `impl LeafEntry for LeafSlot<V>`, `ShardedJmtState<Slot>` (16 `MapLayer` chains + `extend` / `make_delta` / `is_descendant_of`), the inherent impl on `StateAndSummary<ShardedJmtState<Slot: LeafEntry>>` (`new_empty`, `extend`, `make_delta`, etc.), and `pre_shard_jmt_updates`.
- Adds two `StateSummary` constructors that only JMT-sharded pipelines without a hot companion need: `new_global_only(version, smt)` and `new_empty_global_only()`.

In-aptosdb crate:
- `PositionSlot`: type alias of `LeafSlot<()>` (value not carried in-slot; lives in `position_db` durably).
- `PositionStateStore`: owner / coordinator, type alias of the shared `PipelineStateStore<L, BS>` (also added in this commit, in `crate::common`). Wraps the shared `current_state` mutex (readable by outside consumers) and the `PositionBufferedState` mutex (commit-path serialization).
- `PositionDb` gains the `commit` / `commit_single_shard` / `write_progress` / `find_prior_version` methods that the committer uses.

AptosDB wiring:
- `AptosDB` gains optional handles for `PositionDb` + `PositionMerkleDb` + `PositionStateStore`. Accessed by the rest of the codebase via `native_position_handles` / `native_state_committer`.
- `init_native_position`: opens 16 sharded position DBs + an unsharded merkle DB with production CF tuning. Spawns the async commit pipeline seeded with the empty-tree placeholder root. Rejects a second call with an error. Logs the open.
- `commit_native_position` (in `aptosdb_writer.rs`): rayon-spawned alongside main-state commit; drains `output.write_set().native_position_iter()` (the WriteSet sibling bucket from [types]) through `NativeStateCommitter`, feeding the resulting `MerkleLeafUpdate`s into the position JMT pipeline.

Cargo.toml deps + lib.rs mod exports for the new files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
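
The two-stage commit pipeline described in the message can be pictured with a minimal, self-contained sketch. This is not the code from the commit: `Snapshot`, `MerkleCommit`, `merklize`, `spawn_pipeline`, and `run_demo` are hypothetical stand-ins, the "merklization" is a toy fold rather than a Jellyfish Merkle Tree, and plain `std::sync::mpsc` channels are assumed in place of the shared `BufferedState` machinery.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for `PositionSnapshotToCommit`.
struct Snapshot {
    version: u64,
    /// (shard id, leaf value hash); illustrative only.
    leaves: Vec<(u8, u64)>,
}

/// Hypothetical stand-in for `PositionMerkleCommit`.
#[derive(Debug, PartialEq)]
struct MerkleCommit {
    version: u64,
    root: u64,
}

/// Toy "merklization": folds the leaves into one number.
/// A real JMT would build a hash tree here.
fn merklize(snapshot: &Snapshot) -> MerkleCommit {
    let root = snapshot.leaves.iter().fold(0u64, |acc, (shard, hash)| {
        acc.wrapping_mul(31).wrapping_add((*shard as u64) ^ *hash)
    });
    MerkleCommit { version: snapshot.version, root }
}

/// Spawns both stages. Returns the input sender and a handle that yields
/// everything stage 2 "persisted" once the input side is closed.
fn spawn_pipeline() -> (mpsc::SyncSender<Snapshot>, thread::JoinHandle<Vec<MerkleCommit>>) {
    let (snapshot_tx, snapshot_rx) = mpsc::sync_channel::<Snapshot>(1);
    let (commit_tx, commit_rx) = mpsc::sync_channel::<MerkleCommit>(1);

    // Stage 1 (snapshot committer): pull snapshots, merklize, forward downstream.
    thread::spawn(move || {
        for snapshot in snapshot_rx {
            if commit_tx.send(merklize(&snapshot)).is_err() {
                break; // stage 2 is gone, stop early
            }
        }
    });

    // Stage 2 (batch committer): pull merkle commits and "persist" them in order.
    let persisted = thread::spawn(move || commit_rx.into_iter().collect::<Vec<_>>());
    (snapshot_tx, persisted)
}

/// Pushes three snapshots through the pipeline and returns what was persisted.
fn run_demo() -> Vec<MerkleCommit> {
    let (tx, persisted) = spawn_pipeline();
    for version in 0..3 {
        tx.send(Snapshot { version, leaves: vec![(0, version + 1)] }).unwrap();
    }
    drop(tx); // closing the input drains and shuts down both stages
    persisted.join().unwrap()
}
```

Bounded channels give the same back-pressure shape as a buffered state: the commit path blocks only when a stage falls behind, and commits reach the second stage in version order.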
1 parent 8297de8 commit 9b465c2

19 files changed

Lines changed: 1152 additions & 8 deletions

Cargo.lock

Lines changed: 1 addition & 0 deletions

storage/aptosdb/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -20,6 +20,7 @@ aptos-crypto = { workspace = true }
 aptos-db-indexer = { workspace = true }
 aptos-db-indexer-schemas = { workspace = true, features = ["fuzzing"] }
 aptos-executor-types = { workspace = true }
+aptos-experimental-layered-map = { workspace = true }
 aptos-experimental-runtimes = { workspace = true }
 aptos-infallible = { workspace = true }
 aptos-jellyfish-merkle = { workspace = true }
@@ -54,6 +55,7 @@ rayon = { workspace = true }
 serde = { workspace = true }
 static_assertions = { workspace = true }
 status-line = { workspace = true }
+thiserror = { workspace = true }
 tokio = { workspace = true }

 [dev-dependencies]

storage/aptosdb/src/common.rs

Lines changed: 24 additions & 0 deletions
@@ -449,3 +449,27 @@ where
         }
     }
 }
+
+/// `current_state` is shared with outside readers; `buffered_state` is
+/// the commit-path mutex.
+pub struct PipelineStateStore<L, BS> {
+    current_state: Arc<Mutex<L>>,
+    buffered_state: Mutex<BS>,
+}
+
+impl<L, BS> PipelineStateStore<L, BS> {
+    pub fn from_parts(current_state: Arc<Mutex<L>>, buffered_state: BS) -> Self {
+        Self {
+            current_state,
+            buffered_state: Mutex::new(buffered_state),
+        }
+    }
+
+    pub fn current_state(&self) -> Arc<Mutex<L>> {
+        Arc::clone(&self.current_state)
+    }
+
+    pub(crate) fn buffered_state_locked(&self) -> aptos_infallible::MutexGuard<'_, BS> {
+        self.buffered_state.lock()
+    }
+}
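
To illustrate how the two locks in `PipelineStateStore` divide the work, here is a minimal sketch built on `std::sync::Mutex` rather than `aptos_infallible::Mutex` (so `lock()` returns a `Result` that the sketch unwraps). The `commit_version` helper and the `u64` / `Vec<u64>` type parameters are assumptions made for the example, not part of the commit.

```rust
use std::sync::{Arc, Mutex, MutexGuard};

/// Same shape as the struct in the diff, on top of the standard library.
pub struct PipelineStateStore<L, BS> {
    /// Shared with outside readers.
    current_state: Arc<Mutex<L>>,
    /// Serializes the commit path.
    buffered_state: Mutex<BS>,
}

impl<L, BS> PipelineStateStore<L, BS> {
    pub fn from_parts(current_state: Arc<Mutex<L>>, buffered_state: BS) -> Self {
        Self {
            current_state,
            buffered_state: Mutex::new(buffered_state),
        }
    }

    pub fn current_state(&self) -> Arc<Mutex<L>> {
        Arc::clone(&self.current_state)
    }

    pub fn buffered_state_locked(&self) -> MutexGuard<'_, BS> {
        self.buffered_state.lock().expect("buffered_state mutex poisoned")
    }
}

/// Hypothetical commit step: record the version under the commit-path lock,
/// then publish it to readers through the shared `current_state`.
pub fn commit_version(store: &PipelineStateStore<u64, Vec<u64>>, version: u64) {
    let mut buffered = store.buffered_state_locked();
    buffered.push(version);
    *store.current_state().lock().expect("current_state mutex poisoned") = version;
}
```

A reader that holds a clone of the `Arc<Mutex<L>>` observes each published state without ever touching the commit-path lock, which is the split the doc comment on the struct describes.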

storage/aptosdb/src/db/aptosdb_internal.rs

Lines changed: 13 additions & 1 deletion
@@ -84,6 +84,9 @@ impl AptosDB {
             hot_state_config,
         ));

+        // The native-position DB isn't open here; it attaches later
+        // via `AptosDB::init_native_position`, which re-binds the
+        // pruner via `LedgerPrunerManager::attach_native_pruners`.
         let ledger_pruner = LedgerPrunerManager::new(
             Arc::clone(&ledger_db),
             pruner_config.ledger_pruner_config,
@@ -106,6 +109,7 @@ impl AptosDB {
             pre_commit_lock: std::sync::Mutex::new(()),
             commit_lock: std::sync::Mutex::new(()),
             update_subscriber: None,
+            position: None,
         }
     }

@@ -155,7 +159,7 @@ impl AptosDB {
             db.write_pruner_progress(synced_version)?;
         }

-        let myself = Self::new_with_dbs(
+        let mut myself = Self::new_with_dbs(
             ledger_db,
             hot_state_merkle_db,
             state_merkle_db,
@@ -169,6 +173,14 @@ impl AptosDB {
             hot_state_config,
         );

+        if super::ENABLE_NATIVE_POSITION {
+            myself.init_native_position(
+                db_paths,
+                rocksdb_configs.state_kv_db_config,
+                readonly,
+            )?;
+        }
+
         if !readonly {
             if let Some(version) = myself.get_synced_version()? {
                 myself

storage/aptosdb/src/db/aptosdb_writer.rs

Lines changed: 125 additions & 0 deletions
@@ -13,6 +13,8 @@ use crate::{
     metrics::{
         COMMITTED_TXNS, LATEST_TXN_VERSION, LEDGER_VERSION, NEXT_BLOCK_EPOCH, OTHER_TIMERS_SECONDS,
     },
+    native_state_committer::NativeStateCommitter,
+    position_buffered_state::PositionProofReader,
     pruner::PrunerManager,
     schema::{
         db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue},
@@ -310,11 +312,134 @@ impl AptosDB {
                     .commit_transaction_accumulator(chunk.first_version, chunk.transaction_infos)
                     .unwrap()
             });
+            // Native-position subsystem: dispatch Position writes
+            // from each TransactionOutput's `native_position_iter()`
+            // sibling bucket to `position_db` + the position JMT. Gated
+            // on the bundle being attached so baselines that never
+            // called `init_native_position` skip the spawn entirely,
+            // with no rayon scheduling cost.
+            if self.position.is_some() {
+                s.spawn(|_| self.commit_native_position(chunk).unwrap());
+            }
         });

         Ok(new_root_hash)
     }

+    /// Dispatch every Position write in the block's
+    /// `TransactionOutput`s to [`NativeStateCommitter`]. No-op if the
+    /// native-position subsystem has not been initialized on this
+    /// AptosDB instance (i.e. `init_native_position` wasn't called at
+    /// node open).
+    ///
+    /// For each `output.write_set().native_position_iter()`:
+    /// 1. `NativeStateCommitter::apply` writes the Position rows to
+    ///    `position_db` (per-shard fan-out) and returns a list of
+    ///    [`crate::native_state_committer::MerkleLeafUpdate`]s for the
+    ///    position JMT.
+    /// 2. If the position async pipeline is attached, the per-version
+    ///    `ShardedJmtState::extend` updates the scratchpad SMT against
+    ///    a `PositionProofReader` and the chunk's final state is
+    ///    handed to `PipelineStateStore::buffered_state_locked()`
+    ///    + `update(..)` for the snapshot/batch committer threads,
+    ///    the same two-stage shape as main state's `BufferedState`.
+    fn commit_native_position(&self, chunk: &ChunkToCommit) -> Result<()> {
+        let _timer = OTHER_TIMERS_SECONDS.timer_with(&["commit_native_position"]);
+        let Some(bundle) = self.position.as_ref() else {
+            return Ok(());
+        };
+        let committer = NativeStateCommitter::new(bundle.kv_db.clone());
+
+        // The chunk's `state.last_checkpoint().version()` is the
+        // chain-level checkpoint marker, the same signal main state's
+        // `StateStore::update` uses. If it falls inside this chunk,
+        // the position chain's `last_checkpoint` must advance to the
+        // position state at that exact version. If it doesn't, we
+        // carry the prior `last_checkpoint` forward.
+        let chunk_first = chunk.first_version;
+        let chunk_last = chunk_first + chunk.len() as Version; // exclusive upper bound
+        let chunk_checkpoint_version = chunk.state.last_checkpoint().version();
+        let checkpoint_within_chunk =
+            chunk_checkpoint_version.filter(|v| (chunk_first..chunk_last).contains(v));
+
+        // Snapshot the prior position state once; per-version `extend`
+        // builds the chain locally. We hand the final state to
+        // `update()` at chunk end (per-chunk granularity, matching
+        // main state).
+        let store_opt = bundle.state_store.as_ref();
+        let mut chain_state = store_opt.map(|s| s.current_state().lock().latest().clone());
+        let mut chain_checkpoint =
+            store_opt.map(|s| s.current_state().lock().last_checkpoint().clone());
+
+        let mut version = chunk_first;
+        for output in chunk.transaction_outputs {
+            // Native-position writes ride in the WriteSet's
+            // `native_positions` sibling bucket; value/hotness
+            // iterators on the same WriteSet naturally skip them, so
+            // no main-state path ever sees them.
+            let position_writes: Vec<_> = output
+                .write_set()
+                .native_position_iter()
+                .map(|(k, op)| (k.clone(), op.as_write_op().clone()))
+                .collect();
+            let merkle_updates_position = if !position_writes.is_empty() {
+                committer
+                    .apply(version, position_writes)
+                    .map_err(|e| AptosDbError::Other(format!("native commit: {e}")))?
+                    .position
+            } else {
+                Vec::new()
+            };
+
+            // Advance the per-version chain. `extend` runs the
+            // scratchpad SMT update; the `last_checkpoint` is
+            // captured below if this is the chunk's checkpoint
+            // version.
+            if let (Some(store), Some(prior_latest)) = (store_opt, chain_state.as_ref()) {
+                let proof_reader = PositionProofReader {
+                    merkle_db: bundle.merkle_db.clone(),
+                    version: store
+                        .current_state()
+                        .lock()
+                        .latest()
+                        .version()
+                        .unwrap_or(version),
+                };
+                let position_updates: Vec<_> = merkle_updates_position
+                    .into_iter()
+                    .map(|u| {
+                        (u.state_key_hash, crate::position_buffered_state::PositionSlot {
+                            state_key: u.state_key,
+                            value_hash: u.value_hash,
+                            value: None,
+                        })
+                    })
+                    .collect();
+                let new_latest = prior_latest.extend(version, position_updates, &proof_reader)?;
+                if Some(version) == checkpoint_within_chunk {
+                    chain_checkpoint = Some(new_latest.clone());
+                }
+                chain_state = Some(new_latest);
+            }
+            version += 1;
+        }
+
+        // Hand the final per-chunk state to the buffered state.
+        // `update()` enqueues a snapshot flush only if
+        // `last_checkpoint` actually advanced, matching main state.
+        if let Some(store) = store_opt {
+            let new_state =
+                crate::position_buffered_state::PositionLedgerStateWithSummary::from_latest_and_last_checkpoint(
+                    chain_state.expect("chain_state set when store is set"),
+                    chain_checkpoint.expect("chain_checkpoint set when store is set"),
+                );
+            let estimated_items = chunk.transaction_outputs.len();
+            let mut bufstate = store.buffered_state_locked();
+            bufstate.update(new_state, (), estimated_items, /* sync_commit */ false)?;
+        }
+        Ok(())
+    }
+
     fn commit_state_kv_and_ledger_metadata(&self, chunk: &ChunkToCommit) -> Result<()> {
         let _timer = OTHER_TIMERS_SECONDS.timer_with(&["commit_state_kv_and_ledger_metadata"]);
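
The checkpoint handling in `commit_native_position` reduces to two rules: a checkpoint counts only if it falls in the half-open range `[chunk_first, chunk_first + len)`, and otherwise the prior `last_checkpoint` is carried forward. The sketch below isolates that logic under simplifying assumptions: `ChainState` is a hypothetical pair of version numbers standing in for the real state objects, and `advance_chain` performs no Merkle work at all.

```rust
type Version = u64;

/// Returns the checkpoint version only if it lies inside the chunk,
/// i.e. in the half-open range [chunk_first, chunk_first + chunk_len).
fn checkpoint_within_chunk(
    chunk_first: Version,
    chunk_len: usize,
    checkpoint: Option<Version>,
) -> Option<Version> {
    let chunk_end = chunk_first + chunk_len as Version; // exclusive upper bound
    checkpoint.filter(|v| (chunk_first..chunk_end).contains(v))
}

/// Hypothetical stand-in for the (latest, last_checkpoint) state pair.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ChainState {
    latest: Version,
    last_checkpoint: Version,
}

/// Walks the chunk version by version. `latest` always advances;
/// `last_checkpoint` advances only at the in-chunk checkpoint version
/// and is otherwise carried forward unchanged.
fn advance_chain(
    prior: ChainState,
    chunk_first: Version,
    chunk_len: usize,
    checkpoint: Option<Version>,
) -> ChainState {
    let target = checkpoint_within_chunk(chunk_first, chunk_len, checkpoint);
    let mut state = prior;
    for version in chunk_first..chunk_first + chunk_len as Version {
        state.latest = version;
        if Some(version) == target {
            state.last_checkpoint = version;
        }
    }
    state
}
```

For a chunk covering versions 100 through 104, a checkpoint at 102 advances `last_checkpoint` to 102, while a checkpoint at 99 or 105 leaves the prior value in place.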

storage/aptosdb/src/db/mod.rs

Lines changed: 128 additions & 3 deletions
@@ -2,10 +2,24 @@
 // Licensed pursuant to the Innovation-Enabling Source Code License, available at https://github.com/aptos-labs/aptos-core/blob/main/LICENSE

 use crate::{
-    backup::backup_handler::BackupHandler, event_store::EventStore, ledger_db::LedgerDb,
-    pruner::LedgerPrunerManager, rocksdb_property_reporter::RocksdbPropertyReporter,
-    state_kv_db::StateKvDb, state_merkle_db::StateMerkleDb, state_store::StateStore,
+    backup::backup_handler::BackupHandler,
+    event_store::EventStore,
+    ledger_db::LedgerDb,
+    native_state_committer::NativeStateCommitter,
+    position_buffered_state::new_empty_position_state,
+    position_db::{PositionDb, NUM_NATIVE_VALUE_SHARDS},
+    position_merkle_db::PositionMerkleDb,
+    position_state_store::PositionStateStore,
+    pruner::LedgerPrunerManager,
+    rocksdb_property_reporter::RocksdbPropertyReporter,
+    state_kv_db::StateKvDb,
+    state_merkle_db::StateMerkleDb,
+    state_store::StateStore,
     transaction_store::TransactionStore,
+    utils::truncation_helper::{
+        get_position_commit_progress, get_position_merkle_commit_progress,
+        truncate_position_db_shards, truncate_position_merkle_db_shards,
+    },
 };
 use aptos_config::config::{HotStateConfig, PrunerConfig, RocksdbConfigs, StorageDirPaths};
 use aptos_db_indexer::db_indexer::InternalIndexerDB;
@@ -21,6 +35,11 @@ mod aptosdb_test;
 #[cfg(any(test, feature = "fuzzing"))]
 pub mod test_helper;

+/// Compile-time gate for the native-position subsystem. Flip to `true`
+/// once order/collateral land and the subsystem ships end-to-end.
+/// Until then, every AptosDB skips opening the position DBs.
+pub(crate) const ENABLE_NATIVE_POSITION: bool = false;
+
 /// This holds a handle to the underlying DB responsible for physical storage and provides APIs for
 /// access to the core Aptos data structures.
 pub struct AptosDB {
@@ -37,6 +56,105 @@ pub struct AptosDB {
     /// This is just to detect concurrent calls to `commit_ledger()`
     commit_lock: std::sync::Mutex<()>,
     update_subscriber: Option<Sender<(Instant, Version)>>,
+    /// Native-position subsystem. `None` until `init_native_position`
+    /// is called from the node-open path. Each future native-mirror
+    /// entity (order, collateral, ...) gets its own bundle field.
+    pub(crate) position: Option<Arc<PositionBundle>>,
+}
+
+/// Durable + runtime handles for the native-position subsystem.
+/// Bundled so the AptosDB struct grows by one field per entity
+/// instead of per handle, and so consumers always reach handles
+/// through a single coherent unit.
+pub struct PositionBundle {
+    pub kv_db: Arc<PositionDb>,
+    pub merkle_db: Arc<PositionMerkleDb>,
+    /// `None` in readonly mode; the async commit pipeline is the
+    /// only piece that isn't safe to construct against a readonly
+    /// mount.
+    pub(crate) state_store: Option<Arc<PositionStateStore>>,
+}
+
+impl AptosDB {
+    /// `Some` once `init_native_position` has wired the subsystem.
+    pub fn position(&self) -> Option<&Arc<PositionBundle>> {
+        self.position.as_ref()
+    }
+
+    /// Build a ready-to-use [`NativeStateCommitter`] from the
+    /// attached bundle. Returns `None` if the subsystem hasn't been
+    /// initialized.
+    pub fn native_state_committer(&self) -> Option<NativeStateCommitter> {
+        let bundle = self.position.as_ref()?;
+        Some(NativeStateCommitter::new(bundle.kv_db.clone()))
+    }
+
+    /// Initialize the native-position subsystem. Opens `position_db` /
+    /// `position_merkle_db` and (in non-readonly mode) spawns the async
+    /// commit pipeline. Single-shot: a second call returns an
+    /// `AptosDbError::Other`. Gated by the `ENABLE_NATIVE_POSITION`
+    /// compile-time constant; called automatically from `open_internal`.
+    pub fn init_native_position(
+        &mut self,
+        db_paths: &StorageDirPaths,
+        rocksdb_config: aptos_config::config::RocksdbConfig,
+        readonly: bool,
+    ) -> Result<()> {
+        if self.position.is_some() {
+            return Err(AptosDbError::Other(
+                "init_native_position called twice; native-position subsystem is already \
+                 attached to this AptosDB"
+                    .to_string(),
+            ));
+        }
+
+        let env = aptos_schemadb::Env::new()
+            .map_err(|e| AptosDbError::Other(format!("failed to create RocksDB env: {e}")))?;
+
+        let position_db = PositionDb::new(db_paths, rocksdb_config, Some(&env), None, readonly)?;
+        if !readonly && let Some(progress) = get_position_commit_progress(&position_db)? {
+            truncate_position_db_shards(&position_db, progress)?;
+        }
+
+        let merkle_db = PositionMerkleDb::new(
+            db_paths,
+            rocksdb_config,
+            Some(&env),
+            None,
+            readonly,
+            /* max_nodes_per_lru_cache_shard: caches off for now */ 0,
+        )?;
+        if !readonly && let Some(progress) = get_position_merkle_commit_progress(&merkle_db)? {
+            truncate_position_merkle_db_shards(&merkle_db, progress)?;
+        }
+        let kv_db = Arc::new(position_db);
+        let merkle_db = Arc::new(merkle_db);
+
+        let state_store = if readonly {
+            None
+        } else {
+            let last_snapshot = new_empty_position_state();
+            Some(Arc::new(PositionStateStore::new_at_snapshot(
+                Arc::clone(&merkle_db),
+                Arc::clone(&self.ledger_db),
+                last_snapshot,
+            )))
+        };
+
+        self.position = Some(Arc::new(PositionBundle {
+            kv_db,
+            merkle_db,
+            state_store,
+        }));
+
+        info!(
+            num_shards = NUM_NATIVE_VALUE_SHARDS,
+            readonly = readonly,
+            "Native-position subsystem initialized."
+        );
+
+        Ok(())
+    }
 }

 // DbReader implementations and private functions used by them.
@@ -212,6 +330,13 @@ impl AptosDB {
             cp_path.as_ref(),
             /* is_hot = */ false,
         )?;
+        // Native-position DBs. The static `create_checkpoint` opens
+        // the source DB from `db_path` (creating it if absent), then
+        // checkpoints into `cp_path`. Deployments that never activated
+        // native-position still produce empty position checkpoints,
+        // matching state's always-create behavior.
+        PositionDb::create_checkpoint(db_path.as_ref(), cp_path.as_ref())?;
+        PositionMerkleDb::create_checkpoint(db_path.as_ref(), cp_path.as_ref())?;

         info!(
             db_path = db_path.as_ref(),
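
The wiring in this file follows a small pattern: a compile-time flag decides whether the open path attaches the subsystem, the handles live behind a single `Option<Arc<...>>` field, and a second initialization is rejected. The sketch below shows that pattern in isolation. All names (`ENABLE_SUBSYSTEM`, `Bundle`, `Db`, `init_subsystem`) are hypothetical, the flag is set to `true` so the path is exercised, and a `String` stands in for the async pipeline handle.

```rust
use std::sync::Arc;

/// Compile-time gate, playing the role of `ENABLE_NATIVE_POSITION`.
/// Set to `true` here so the sketch exercises the init path.
const ENABLE_SUBSYSTEM: bool = true;

/// Hypothetical stand-in for `PositionBundle`.
struct Bundle {
    num_shards: usize,
    /// `None` in readonly mode, like `state_store` in the diff.
    pipeline: Option<String>,
}

#[derive(Default)]
struct Db {
    /// `None` until `init_subsystem` has run.
    position: Option<Arc<Bundle>>,
}

impl Db {
    /// Attaches the subsystem once; a second call is rejected with an error.
    fn init_subsystem(&mut self, readonly: bool) -> Result<(), String> {
        if self.position.is_some() {
            return Err("init_subsystem called twice".to_string());
        }
        // The pipeline is the only piece skipped for a readonly open.
        let pipeline = if readonly {
            None
        } else {
            Some("async commit pipeline".to_string())
        };
        self.position = Some(Arc::new(Bundle { num_shards: 16, pipeline }));
        Ok(())
    }

    /// Open path: the subsystem is attached only when the gate is on.
    fn open(readonly: bool) -> Result<Self, String> {
        let mut db = Db::default();
        if ENABLE_SUBSYSTEM {
            db.init_subsystem(readonly)?;
        }
        Ok(db)
    }
}
```

Keeping every handle behind one optional bundle means callers test a single field to learn whether the subsystem exists, which is how the `self.position.is_some()` check in the writer path avoids spawning any work for databases opened with the gate off.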
