Your Question
Summary
This RFC proposes integrating TransferQueue into slime as an optional data plane for rollout-to-training data transfer and for the follow-up exchange of derived data. When enabled, rollout data writes, consumption by training components, write-back of intermediate fields, and partition cleanup should all go through TransferQueue. When disabled, slime must keep the existing Ray ObjectRef based flow unchanged.
The initial goal is not to rewrite slime into a fully async service framework. The goal is to introduce a clear, minimal, and extensible TransferQueue adapter that can replace direct Ray ObjectRef data movement on the main training path, while preserving the current training control flow.
Background
Today slime primarily transfers rollout data through the Ray Object Store:
RolloutManager.generate()
-> convert samples to train_data
-> split train_data by DP rank
-> ray.put()
-> driver passes ObjectRef list to actor/critic
-> MegatronTrainRayActor reads local DP shard
This design is simple, but as RL training becomes more complex, the data dependency graph becomes harder to manage:
- rollout produces tokens, rewards, loss masks, rollout logprobs, routing replay data, multimodal inputs, and custom metadata.
- actor may compute log_probs and train on advantages or returns.
- critic may compute values and feed actor training.
- reference or teacher models may compute ref_log_probs or teacher_log_probs.
- future advantage workers may consume multiple fields and write back advantages and returns.
Passing all of these through driver-side ObjectRefs couples data movement with control flow. It also makes it difficult to express field readiness, consumer progress, partition lifecycle, staleness, and future async execution.
TransferQueue gives slime a structured data plane:
Rollout -> TransferQueue partition -> Actor/Critic/Reference consumers
Derived fields -> TransferQueue write-back -> Actor final train
Motivation
TransferQueue is valuable for slime because it turns rollout and training data exchange into an explicit protocol:
- partition_id represents one rollout step.
- task_name represents an independent consumer.
- data_fields represents the fields required by each consumer.
- metadata can carry sampling and scheduling information such as total_lengths.
- partition cleanup gives data lifecycle a concrete boundary.
- staleness/backpressure can be controlled by the data plane.
This creates a cleaner path for:
- lower Ray Object Store pressure.
- clearer actor/critic/ref data dependencies.
- future derived field write-back, such as values, log_probs, ref_log_probs, advantages, and returns.
- streaming or fully async training in later phases.
- better observability around data readiness and consumer progress.
Goals
- Add TransferQueue as an optional data plane controlled by --use-transfer-queue.
- Preserve the existing Ray ObjectRef flow when TransferQueue is disabled.
- Keep external TransferQueue imports lazy so users do not need TransferQueue installed unless they enable it.
- Use a clear partition protocol:
    partition_id = train_{rollout_id}
- Use explicit consumer task names: actor_train, critic_train, actor_log_probs, ref_log_probs, and compute_advantages_and_returns.
- Centralize TransferQueue protocol code under slime/utils/transfer_queue.py.
- Make rollout write data to TransferQueue before actor/critic consumption.
- Make actor/critic read from TransferQueue in TransferQueue mode.
- Support future write-back of derived fields into the same partition.
- Add diagnostics for empty metadata, missing fields, wait time, and partition lifecycle.
Non-Goals
This RFC does not propose to:
- replace the current driver loop with a full service controller.
- introduce Ray Serve based actor/reference/advantage services in the first phase.
- implement TransferQueue StreamingDataLoader in the first phase.
- change model weight update logic.
- change rollout engine management.
- make TransferQueue mandatory.
- support --debug-train-only plus TransferQueue in the initial integration.
- support dynamic global batch size plus TransferQueue in the initial integration.
Proposed Design
1. Feature Flag
Add a CLI flag:
--use-transfer-queue
Behavior:
- disabled: keep current Ray ObjectRef flow.
- enabled: rollout writes train data to TransferQueue, and actor/critic fetch by rollout_id.
Related options:
--num-data-storage-units
--max-staleness
--polling-mode / --no-polling-mode
--transfer-queue-staleness-poll-interval
--transfer-queue-extra-data-fields
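A minimal argparse sketch of how these options could be registered. The flag names are the ones proposed above; the defaults, types, and the add_transfer_queue_args helper are assumptions for illustration, not settled values.

```python
import argparse


def add_transfer_queue_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """Register the TransferQueue flags proposed in this RFC (defaults are assumptions)."""
    group = parser.add_argument_group("transfer queue")
    # Master switch: when absent, the Ray ObjectRef flow is used unchanged.
    group.add_argument("--use-transfer-queue", action="store_true", default=False)
    # Assumed default: a single storage unit.
    group.add_argument("--num-data-storage-units", type=int, default=1)
    # 0 = strict mode (see Staleness and Backpressure); the default is an assumption.
    group.add_argument("--max-staleness", type=int, default=0)
    # BooleanOptionalAction (Python 3.9+) generates both --polling-mode and --no-polling-mode.
    group.add_argument("--polling-mode", action=argparse.BooleanOptionalAction, default=False)
    # Seconds between partition checks while rollout waits; assumed default.
    group.add_argument("--transfer-queue-staleness-poll-interval", type=float, default=1.0)
    # Custom field names that producers write and consumers may request.
    group.add_argument("--transfer-queue-extra-data-fields", nargs="*", default=[])
    return parser


parser = add_transfer_queue_args(argparse.ArgumentParser(prog="train"))
args = parser.parse_args(["--use-transfer-queue", "--max-staleness", "2", "--no-polling-mode"])
```

argparse converts dashes to underscores, so these surface as args.use_transfer_queue, args.max_staleness, and so on.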
2. Initialization Lifecycle
Driver:
pgs = create_placement_groups(args)
initialize_transfer_queue(args)
rollout_manager, _ = create_rollout_manager(args, pgs["rollout"])
actor_model, critic_model = create_training_models(args, pgs, rollout_manager)
RolloutManager and train actors:
self.transfer_queue_client = connect_transfer_queue(args)
The driver initializes the TransferQueue controller/storage and stores the resulting config on args.tq_config. Ray actors connect to the existing TransferQueue using the same config.
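The ordering above matters because Ray serializes actor constructor arguments when an actor is created, so args.tq_config has to exist before create_rollout_manager and create_training_models run. The sketch below mirrors initialize_transfer_queue and connect_transfer_queue, but the start_backend and make_client hooks are hypothetical stand-ins for the real TransferQueue startup and client calls, injected only so the contract can be shown without a cluster.

```python
from types import SimpleNamespace
from typing import Any, Callable, Optional


def initialize_transfer_queue(args, start_backend: Callable[[Any], Any]) -> Optional[Any]:
    """Driver side: start controller/storage once and publish the config on args.tq_config."""
    if not args.use_transfer_queue:
        return None  # ObjectRef mode: nothing to start
    args.tq_config = start_backend(args)  # start_backend is a hypothetical hook
    return args.tq_config


def connect_transfer_queue(args, make_client: Callable[[Any], Any]) -> Optional[Any]:
    """Actor side: attach to the already running TransferQueue via args.tq_config."""
    if not args.use_transfer_queue:
        return None
    tq_config = getattr(args, "tq_config", None)
    if tq_config is None:
        raise RuntimeError(
            "args.tq_config is missing: initialize_transfer_queue(args) must run in the "
            "driver before the RolloutManager and train actors are created"
        )
    return make_client(tq_config)  # make_client is a hypothetical hook


# Driver order: initialize first, then create actors that connect with the same config.
args = SimpleNamespace(use_transfer_queue=True, num_data_storage_units=2)
initialize_transfer_queue(args, start_backend=lambda a: {"num_storage_units": a.num_data_storage_units})
client = connect_transfer_queue(args, make_client=lambda cfg: ("client", cfg["num_storage_units"]))
```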
3. Partition Protocol
Each rollout step maps to one partition:
partition_id = train_{rollout_id}
Examples:
| rollout_id | partition_id |
|---|---|
| 0 | train_0 |
| 1 | train_1 |
| 128 | train_128 |
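A pair of helpers for the mapping in the table. partition_id_for and rollout_id_from are hypothetical names, and the inverse mapping is an assumption about what partition listing and staleness checks would need.

```python
PREFIX = "train_"


def partition_id_for(rollout_id: int) -> str:
    """Map one rollout step to its TransferQueue partition name."""
    if rollout_id < 0:
        raise ValueError(f"rollout_id must be non-negative, got {rollout_id}")
    return f"{PREFIX}{rollout_id}"


def rollout_id_from(partition_id: str) -> int:
    """Inverse mapping; rejects anything that is not a train partition."""
    suffix = partition_id[len(PREFIX):]
    if not partition_id.startswith(PREFIX) or not suffix.isdigit():
        raise ValueError(f"not a train partition: {partition_id!r}")
    return int(suffix)
```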
Partition lifecycle:
- rollout writes initial training fields.
- actor/critic/ref/advantage consumers read required fields.
- optional derived fields are written back.
- actor final training consumes final fields.
- actor clears the partition after successful training.
For critic-only warmup, the driver must clear the partition because actor does not train.
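To make the lifecycle concrete, the following toy in-memory store walks through the five steps for one partition. InMemoryPartitionStore is hypothetical and is not the TransferQueue API; it only shows that derived fields land in the same partition as the rollout fields and that the partition disappears after actor training.

```python
from typing import Dict, List, Sequence


class InMemoryPartitionStore:
    """Toy stand-in for TransferQueue storage: partition -> field -> per-sample column."""

    def __init__(self) -> None:
        self._partitions: Dict[str, Dict[str, List]] = {}

    def put(self, partition_id: str, fields: Dict[str, List]) -> None:
        part = self._partitions.setdefault(partition_id, {})
        sizes = {len(col) for col in list(part.values()) + list(fields.values())}
        if len(sizes) > 1:
            raise ValueError(f"{partition_id}: all fields must have the same number of samples")
        part.update(fields)  # initial write or derived-field write-back

    def get(self, partition_id: str, data_fields: Sequence[str]) -> Dict[str, List]:
        part = self._partitions.get(partition_id, {})
        missing = [f for f in data_fields if f not in part]
        if missing:
            # A real consumer would wait for readiness; the toy raises so the gap is visible.
            raise KeyError(f"{partition_id}: missing fields {missing}")
        return {f: part[f] for f in data_fields}

    def clear(self, partition_id: str) -> None:
        self._partitions.pop(partition_id, None)

    def partitions(self) -> List[str]:
        return sorted(self._partitions)


store = InMemoryPartitionStore()
# 1. rollout writes initial training fields
store.put("train_0", {"tokens": [[1, 2, 3], [4, 5]], "rewards": [1.0, 0.0]})
# 2-3. critic reads what it needs and writes derived values back to the same partition
batch = store.get("train_0", ["tokens"])
store.put("train_0", {"values": [0.5 for _ in batch["tokens"]]})
# 4. actor final training consumes base and derived fields
final = store.get("train_0", ["tokens", "rewards", "values"])
# 5. actor clears the partition after training
store.clear("train_0")
```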
4. Task Names
Use task names to isolate consumer cursors:
| Task name | Owner | Purpose |
|---|---|---|
| actor_train | actor | final policy training data consumption |
| critic_train | critic | PPO critic training data consumption |
| actor_log_probs | actor or actor-fwd stage | compute and write log_probs |
| ref_log_probs | reference stage | compute and write ref_log_probs |
| compute_advantages_and_returns | advantage stage | compute and write advantages and returns |
The first implementation may only use actor_train and critic_train, but the names above define the forward-compatible protocol.
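A toy model of what isolated cursors mean: each (partition_id, task_name) pair advances independently, so the critic reading a sample does not hide it from the actor. TaskCursors is a hypothetical illustration; TransferQueue tracks consumption internally.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class TaskCursors:
    """Each (partition_id, task_name) keeps its own read position over the samples."""

    def __init__(self, num_samples: int) -> None:
        self.num_samples = num_samples
        self._cursor: Dict[Tuple[str, str], int] = defaultdict(int)

    def next_batch(self, partition_id: str, task_name: str, batch_size: int) -> List[int]:
        key = (partition_id, task_name)
        start = self._cursor[key]
        end = min(start + batch_size, self.num_samples)
        self._cursor[key] = end
        return list(range(start, end))  # sample indices handed to this consumer


cursors = TaskCursors(num_samples=4)
actor_first = cursors.next_batch("train_0", "actor_train", 2)    # [0, 1]
critic_first = cursors.next_batch("train_0", "critic_train", 2)  # [0, 1]: independent of the actor
actor_second = cursors.next_batch("train_0", "actor_train", 2)   # [2, 3]
```

The same behavior is why two ranks sharing one task name would each receive different samples rather than the same batch.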
5. Data Fields
Base rollout fields:
tokens
total_lengths
response_lengths
loss_masks
rewards
raw_reward
truncated
sample_indices
Optional rollout fields:
rollout_log_probs
rollout_routed_experts
multimodal_train_inputs
teacher_log_probs
metadata
custom fields from --transfer-queue-extra-data-fields
Derived fields for future write-back:
values
log_probs
ref_log_probs
advantages
returns
kl
opd_reverse_kl
Field rules:
- Fields requested by a consumer must be listed explicitly.
- Producer must either write required fields or normalize missing base fields before writing.
- Missing optional fields must not be requested unless the corresponding feature is enabled.
- Custom fields must be listed in --transfer-queue-extra-data-fields.
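A sketch of how a consumer could assemble its explicit field list under these rules. The keyword flags (use_rollout_logprobs, use_routing_replay, use_multimodal, use_teacher) and the function name are hypothetical stand-ins for whichever slime options enable each feature; metadata is left out for brevity.

```python
from typing import Iterable, List

BASE_FIELDS = [
    "tokens", "total_lengths", "response_lengths", "loss_masks",
    "rewards", "raw_reward", "truncated", "sample_indices",
]


def select_train_data_fields(
    *,
    use_rollout_logprobs: bool = False,
    use_routing_replay: bool = False,
    use_multimodal: bool = False,
    use_teacher: bool = False,
    extra_fields: Iterable[str] = (),
) -> List[str]:
    """Build the explicit field list a consumer requests (flag names are hypothetical)."""
    fields = list(BASE_FIELDS)
    optional = [
        (use_rollout_logprobs, "rollout_log_probs"),
        (use_routing_replay, "rollout_routed_experts"),
        (use_multimodal, "multimodal_train_inputs"),
        (use_teacher, "teacher_log_probs"),
    ]
    # Optional fields are requested only when their feature is enabled.
    fields += [name for enabled, name in optional if enabled]
    # Custom fields come from --transfer-queue-extra-data-fields; duplicates are skipped.
    for name in extra_fields:
        if name not in fields:
            fields.append(name)
    return fields
```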
6. Rollout Write Path
When TransferQueue is disabled:
return self._split_train_data_by_dp(data, dp_size)
When TransferQueue is enabled:
transfer_rollout_data(args, client, rollout_id, data)
return None
Before writing:
- add total_lengths.
- validate required fields.
- fill stable defaults for raw_reward, truncated, and sample_indices when safe.
- convert to TensorDict.
- write to partition train_{rollout_id}.
- write total_lengths into custom metadata for sequence length balanced sampling.
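A pure-Python sketch of the normalization steps, using lists instead of tensors and stopping before the TensorDict conversion. The required-field set and the defaults (raw_reward copied from rewards, truncated set to 0, sample_indices set to 0..n-1) are assumptions about what "safe" means here, not confirmed slime behavior; total_lengths is assumed to be the full token count of each sample.

```python
from typing import Any, Dict, List

REQUIRED_FIELDS = ("tokens", "response_lengths", "loss_masks", "rewards")  # assumption


def normalize_train_data(data: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
    """Validate, add total_lengths, and fill assumed defaults before a TransferQueue write."""
    missing = [k for k in REQUIRED_FIELDS if k not in data]
    if missing:
        raise ValueError(f"rollout data is missing required fields: {missing}")
    n = len(data["tokens"])
    out = dict(data)  # shallow copy; the caller's dict is left untouched
    out["total_lengths"] = [len(seq) for seq in data["tokens"]]
    # Defaults are only filled when the producer did not supply the field.
    out.setdefault("raw_reward", list(data["rewards"]))
    out.setdefault("truncated", [0] * n)
    out.setdefault("sample_indices", list(range(n)))
    uneven = {k: len(v) for k, v in out.items() if len(v) != n}
    if uneven:
        raise ValueError(f"expected {n} samples per field, got {uneven}")
    return out


def sampling_metadata(normalized: Dict[str, List[Any]]) -> List[Dict[str, int]]:
    """Per-sample custom metadata so a length-balanced sampler need not read tokens."""
    return [{"total_lengths": length} for length in normalized["total_lengths"]]
```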
7. Train Read Path
When TransferQueue is disabled:
rollout_data = self._get_rollout_data(rollout_data_ref)
When TransferQueue is enabled:
rollout_data = self._get_rollout_data_from_transfer_queue(rollout_id)
Only the model-parallel source rank should fetch from TransferQueue:
TP rank 0 && PP rank 0 && CP rank 0
Fetched data is then broadcast to the model-parallel group:
- CP group
- TP group
- PP group
This avoids multiple ranks consuming the same task cursor independently.
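The source-rank rule can be checked with a small simulation over a rank grid: exactly one rank per DP replica touches the task cursor. is_mp_source_rank and simulate_fetch are hypothetical helpers; in a real run the remaining ranks would receive the batch through a collective such as torch.distributed.broadcast_object_list over their CP, TP, and PP groups.

```python
import itertools
from typing import Dict


def is_mp_source_rank(tp_rank: int, pp_rank: int, cp_rank: int) -> bool:
    """Only TP rank 0 && PP rank 0 && CP rank 0 fetches from TransferQueue."""
    return tp_rank == 0 and pp_rank == 0 and cp_rank == 0


def simulate_fetch(tp: int, pp: int, cp: int, dp: int) -> Dict[int, int]:
    """Count how many ranks in each DP replica would call TransferQueue."""
    fetches = {d: 0 for d in range(dp)}
    for d, t, p, c in itertools.product(range(dp), range(tp), range(pp), range(cp)):
        if is_mp_source_rank(t, p, c):
            fetches[d] += 1
    return fetches
```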
8. Data Parallel Sampling
The TransferQueue batch size per consumer is:
rollout_batch_size * n_samples_per_prompt / data_parallel_size
The data parallel size must match Megatron DP:
actor_world_size / (tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size)
For SeqlenBalancedSampler, the same DP size must be used during TransferQueue initialization.
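Both formulas in code, with divisibility checks. The helper names are hypothetical, and rejecting an uneven split of the rollout batch is an assumption of this sketch rather than a stated requirement of the RFC.

```python
def data_parallel_size(actor_world_size: int, tp: int, pp: int, cp: int) -> int:
    """Megatron DP size; the model-parallel product must divide the world size."""
    mp = tp * pp * cp
    if actor_world_size % mp != 0:
        raise ValueError(f"world size {actor_world_size} is not divisible by TP*PP*CP={mp}")
    return actor_world_size // mp


def per_consumer_batch_size(rollout_batch_size: int, n_samples_per_prompt: int, dp_size: int) -> int:
    """Samples each DP rank fetches from one train partition."""
    total = rollout_batch_size * n_samples_per_prompt
    if total % dp_size != 0:
        raise ValueError(f"{total} samples cannot be split evenly across {dp_size} DP ranks")
    return total // dp_size


dp = data_parallel_size(actor_world_size=16, tp=2, pp=2, cp=1)  # 16 / (2*2*1) = 4
per_rank = per_consumer_batch_size(32, 8, dp)                    # 32*8 / 4 = 64
```

Computing DP once and passing the same value to both the sampler setup and the fetch path keeps SeqlenBalancedSampler and Megatron in agreement.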
9. Staleness and Backpressure
--max-staleness controls how many unconsumed train partitions may remain before rollout waits.
Strict mode:
--max-staleness 0
This means rollout cannot write a new train partition while an old one is still present.
Relaxed mode:
--max-staleness N
This allows rollout to lead training by up to N partitions.
10. Partition Cleanup
Normal actor step:
if role == "actor" and rank == 0:
clear_partition(args, client, rollout_id)
Critic-only warmup:
ray.get(rollout_manager.clear_transfer_queue_partition.remote(rollout_id))
Cleanup must only happen after all required consumers for the step have finished.
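A sketch of the cleanup rule as a guard object; PartitionCleanupGuard is hypothetical. The required set depends on the algorithm (for example actor_train alone for GRPO, actor_train plus critic_train for PPO). For critic-only warmup the required set would be just critic_train, and the driver rather than the actor would clear once can_clear() is true.

```python
from typing import Iterable, Set


class PartitionCleanupGuard:
    """Allow clearing a partition only after every required consumer has finished."""

    def __init__(self, required_tasks: Iterable[str]) -> None:
        self.required: Set[str] = set(required_tasks)
        self.finished: Set[str] = set()

    def mark_finished(self, task_name: str) -> None:
        if task_name not in self.required:
            raise ValueError(f"unexpected task {task_name!r}; required: {sorted(self.required)}")
        self.finished.add(task_name)

    def can_clear(self) -> bool:
        return self.finished >= self.required  # superset check


guard = PartitionCleanupGuard({"actor_train", "critic_train"})
guard.mark_finished("critic_train")
early = guard.can_clear()  # actor has not trained yet
guard.mark_finished("actor_train")
ready = guard.can_clear()
```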
Compatibility
Default Behavior
TransferQueue is disabled by default. Existing users should see no behavior change.
External Dependencies
The external packages transfer_queue and tensordict are imported lazily. They are only required when --use-transfer-queue is enabled.
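One way to keep the imports lazy, using importlib. lazy_import is a hypothetical helper and the error wording is an assumption; the point is that nothing is imported in the default ObjectRef mode.

```python
import importlib
from types import ModuleType
from typing import Optional


def lazy_import(module_name: str, enabled: bool) -> Optional[ModuleType]:
    """Import an optional dependency only when --use-transfer-queue is set."""
    if not enabled:
        return None  # default mode: the package is never touched
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"--use-transfer-queue requires the '{module_name}' package; "
            "install it or disable the flag"
        ) from exc
```

Call sites would then use, for example, lazy_import("transfer_queue", args.use_transfer_queue) inside the functions that need it rather than at module top level.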
Unsupported Initial Combinations
Initial integration should reject:
--use-transfer-queue + --debug-train-only
--use-transfer-queue + --load-debug-rollout-data
--use-transfer-queue + --use-dynamic-global-batch-size
These can be revisited later with explicit producer and metadata support.
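A fail-fast startup check for these combinations. The attribute names assume argparse's default conversion of dashes to underscores, and validate_transfer_queue_args is a hypothetical helper.

```python
from types import SimpleNamespace

UNSUPPORTED_WITH_TRANSFER_QUEUE = {
    "debug_train_only": "--debug-train-only",
    "load_debug_rollout_data": "--load-debug-rollout-data",
    "use_dynamic_global_batch_size": "--use-dynamic-global-batch-size",
}


def validate_transfer_queue_args(args) -> None:
    """Reject unsupported combinations at startup rather than failing mid-training."""
    if not getattr(args, "use_transfer_queue", False):
        return
    conflicts = [
        flag for attr, flag in UNSUPPORTED_WITH_TRANSFER_QUEUE.items() if getattr(args, attr, None)
    ]
    if conflicts:
        raise ValueError("--use-transfer-queue is not supported together with: " + ", ".join(conflicts))


validate_transfer_queue_args(SimpleNamespace(use_transfer_queue=True, debug_train_only=False))
```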
Observability
The integration should log or expose metrics for:
- TransferQueue initialization config.
- partition id.
- task name.
- requested data fields.
- batch_meta size.
- get_meta wait time.
- get_data latency.
- put latency.
- clear latency.
- partition list size.
- staleness wait time.
When the actor waits for data, logs should include:
rollout_id
partition_id
task_name
data_fields
dp_rank
batch_size
elapsed_wait_time
This is critical because missing fields and empty metadata otherwise look like a training hang.
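A sketch of the wait diagnostic carrying every field listed above. The message format, the logger name, and format_wait_log itself are assumptions.

```python
import logging
import time
from typing import Sequence

logger = logging.getLogger("slime.transfer_queue")


def format_wait_log(rollout_id: int, task_name: str, data_fields: Sequence[str],
                    dp_rank: int, batch_size: int, wait_start: float) -> str:
    """Build the diagnostic line emitted while a consumer waits for batch metadata."""
    elapsed = time.monotonic() - wait_start
    return (
        f"waiting for TransferQueue data: rollout_id={rollout_id} "
        f"partition_id=train_{rollout_id} task_name={task_name} "
        f"data_fields={list(data_fields)} dp_rank={dp_rank} "
        f"batch_size={batch_size} elapsed_wait_time={elapsed:.1f}s"
    )


msg = format_wait_log(3, "actor_train", ["tokens", "rewards"], dp_rank=0,
                      batch_size=64, wait_start=time.monotonic())
logger.warning(msg)
```

Emitting this periodically while waiting makes a missing field or empty metadata show up as a named wait instead of a silent stall.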
Testing Plan
Unit Tests
Cover:
- default_train_data_fields.
- required and optional field selection.
- train data normalization before TransferQueue write.
- DP size calculation with TP/PP/CP.
- dict_to_tensordict for 1D tensors, jagged tensors, routed experts, and non-tensor fields.
- tensordict_to_rollout_data.
- staleness wait behavior with mocked client.
Integration Tests
Cover:
- GRPO single step with TransferQueue.
- PPO single step with critic.
- critic-only warmup cleanup.
- max_staleness=0.
- max_staleness>0.
- balance_data=True.
- rollout logprobs.
- routing replay.
- multimodal train inputs.
- custom extra fields.
Plan
Phase 1: Optional Rollout-to-Train Transfer
- initialize and connect TransferQueue.
- rollout writes full train batch.
- actor/critic read from TransferQueue.
- actor clears partition.
- keep driver control flow unchanged.
Phase 2: Derived Field Write-Back
- critic writes values.
- actor-fwd writes log_probs.
- reference writes ref_log_probs.
- advantage stage writes advantages and returns.
- actor final train consumes derived fields from TransferQueue.
Phase 3: Streaming and Fully Async
- introduce streaming dataset or dataloader.
- support chunk-level rollout writes.
- split actor-fwd/reference/advantage into independent services.
- add production/consumption status and recovery protocol.