
[RFC] Integrate TransferQueue into slime as an Optional Training Data Plane #1971

@miracle0517

Summary

This RFC proposes integrating TransferQueue into slime as an optional data plane for rollout-to-training data transfer and for the subsequent exchange of derived data. When enabled, rollout data, component consumption, intermediate field write-back, and partition cleanup should go through TransferQueue. When disabled, slime must keep the existing Ray ObjectRef-based flow unchanged.

The initial goal is not to rewrite slime into a fully async service framework. The goal is to introduce a clear, minimal, and extensible TransferQueue adapter that can replace direct Ray ObjectRef data movement on the main training path, while preserving the current training control flow.

Background

Today slime primarily transfers rollout data through the Ray Object Store:

RolloutManager.generate()
  -> convert samples to train_data
  -> split train_data by DP rank
  -> ray.put()
  -> driver passes ObjectRef list to actor/critic
  -> MegatronTrainRayActor reads local DP shard

This design is simple, but as RL training becomes more complex, the data dependency graph becomes harder to manage:

  • rollout produces tokens, rewards, loss masks, rollout logprobs, routing replay data, multimodal inputs, and custom metadata.
  • actor may compute log_probs and train on advantages or returns.
  • critic may compute values and feed actor training.
  • reference or teacher models may compute ref_log_probs or teacher_log_probs.
  • future advantage workers may consume multiple fields and write back advantages and returns.

Passing all of these through driver-side ObjectRefs couples data movement with control flow. It also makes it difficult to express field readiness, consumer progress, partition lifecycle, staleness, and future async execution.

TransferQueue gives slime a structured data plane:

Rollout -> TransferQueue partition -> Actor/Critic/Reference consumers
Derived fields -> TransferQueue write-back -> Actor final train

Motivation

TransferQueue is valuable for slime because it turns rollout and training data exchange into an explicit protocol:

  • partition_id represents one rollout step.
  • task_name represents an independent consumer.
  • data_fields represents the fields required by each consumer.
  • metadata can carry sampling and scheduling information such as total_lengths.
  • partition cleanup gives data lifecycle a concrete boundary.
  • staleness/backpressure can be controlled by the data plane.

This creates a cleaner path for:

  • lower Ray Object Store pressure.
  • clearer actor/critic/ref data dependencies.
  • future derived field write-back, such as values, log_probs, ref_log_probs, advantages, and returns.
  • streaming or fully async training in later phases.
  • better observability around data readiness and consumer progress.

Goals

  1. Add TransferQueue as an optional data plane controlled by --use-transfer-queue.

  2. Preserve the existing Ray ObjectRef flow when TransferQueue is disabled.

  3. Keep external TransferQueue imports lazy so users do not need TransferQueue installed unless they enable it.

  4. Use a clear partition protocol:

    partition_id = train_{rollout_id}
    
  5. Use explicit consumer task names:

    actor_train
    critic_train
    actor_log_probs
    ref_log_probs
    compute_advantages_and_returns
    
  6. Centralize TransferQueue protocol code under slime/utils/transfer_queue.py.

  7. Make rollout write data to TransferQueue before actor/critic consumption.

  8. Make actor/critic read from TransferQueue in TransferQueue mode.

  9. Support future write-back of derived fields into the same partition.

  10. Add diagnostics for empty metadata, missing fields, wait time, and partition lifecycle.

Non-Goals

This RFC does not propose to:

  • replace the current driver loop with a full service controller.
  • introduce Ray Serve based actor/reference/advantage services in the first phase.
  • implement TransferQueue StreamingDataLoader in the first phase.
  • change model weight update logic.
  • change rollout engine management.
  • make TransferQueue mandatory.
  • support --debug-train-only plus TransferQueue in the initial integration.
  • support dynamic global batch size plus TransferQueue in the initial integration.

Proposed Design

1. Feature Flag

Add a CLI flag:

--use-transfer-queue

Behavior:

  • disabled: keep current Ray ObjectRef flow.
  • enabled: rollout writes train data to TransferQueue, and actor/critic fetch their data by rollout_id.

Related options:

--num-data-storage-units
--max-staleness
--polling-mode / --no-polling-mode
--transfer-queue-staleness-poll-interval
--transfer-queue-extra-data-fields
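A minimal sketch of how these options could be registered with argparse. The helper name add_transfer_queue_arguments and every default value below are hypothetical placeholders, not slime's actual settings; argparse.BooleanOptionalAction (Python 3.9+) is one way to generate the --polling-mode / --no-polling-mode pair from a single declaration.

```python
import argparse


def add_transfer_queue_arguments(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """Register the TransferQueue options (hypothetical helper, placeholder defaults)."""
    group = parser.add_argument_group("transfer queue")
    # Master switch: when absent, slime keeps the Ray ObjectRef flow.
    group.add_argument("--use-transfer-queue", action="store_true", default=False)
    group.add_argument("--num-data-storage-units", type=int, default=1)
    # Number of unconsumed train partitions rollout may lead by (0 = strict).
    group.add_argument("--max-staleness", type=int, default=0)
    # BooleanOptionalAction creates both --polling-mode and --no-polling-mode.
    group.add_argument("--polling-mode", action=argparse.BooleanOptionalAction, default=False)
    group.add_argument("--transfer-queue-staleness-poll-interval", type=float, default=1.0)
    # Custom field names rollout may write and consumers may request.
    group.add_argument("--transfer-queue-extra-data-fields", nargs="*", default=[])
    return parser
```

With this shape, `--transfer-queue-extra-data-fields foo bar` parses to `["foo", "bar"]`, which keeps the custom-field allowlist in one place.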

2. Initialization Lifecycle

Driver:

pgs = create_placement_groups(args)
initialize_transfer_queue(args)
rollout_manager, _ = create_rollout_manager(args, pgs["rollout"])
actor_model, critic_model = create_training_models(args, pgs, rollout_manager)

RolloutManager and train actors:

self.transfer_queue_client = connect_transfer_queue(args)

The driver initializes the TransferQueue controller/storage and stores the resulting config on args.tq_config. Ray actors connect to the existing TransferQueue using the same config.
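The split between "initialize once on the driver" and "connect everywhere else" can be sketched as follows. Both function names are hypothetical stand-ins for initialize_transfer_queue and connect_transfer_queue, and the dict placed on args.tq_config is a placeholder for whatever configuration the real controller/storage setup produces.

```python
from types import SimpleNamespace


def initialize_transfer_queue_sketch(args):
    """Driver side: bring up controller/storage once and record the config on args."""
    if not args.use_transfer_queue:
        return None
    if getattr(args, "tq_config", None) is not None:
        raise RuntimeError("TransferQueue is already initialized on the driver")
    # Placeholder for the real controller/storage bring-up.
    args.tq_config = {"num_data_storage_units": args.num_data_storage_units}
    return args.tq_config


def connect_transfer_queue_sketch(args):
    """Actor side: attach to the existing deployment; never create a new one."""
    if not args.use_transfer_queue:
        return None
    config = getattr(args, "tq_config", None)
    if config is None:
        raise RuntimeError(
            "args.tq_config is missing: initialize_transfer_queue() must run on the driver first"
        )
    # Placeholder client handle built from the shared config.
    return SimpleNamespace(config=config)
```

Because Ray serializes args when it is passed to an actor, the config presumably has to be on args before create_rollout_manager and create_training_models run, which matches the ordering in the driver snippet.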

3. Partition Protocol

Each rollout step maps to one partition:

train_{rollout_id}

Examples:

rollout_id    partition_id
0             train_0
1             train_1
128           train_128
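The naming convention is simple enough to centralize in two small helpers. The names partition_id and rollout_id_from_partition are hypothetical; the inverse mapping is included on the assumption that staleness accounting will need to recognize which partitions belong to the train_ protocol.

```python
import re

# Hypothetical helpers for the train_{rollout_id} naming convention.
_TRAIN_PARTITION_RE = re.compile(r"^train_(\d+)$")


def partition_id(rollout_id: int) -> str:
    """Return the partition name for one rollout step."""
    if rollout_id < 0:
        raise ValueError(f"rollout_id must be non-negative, got {rollout_id}")
    return f"train_{rollout_id}"


def rollout_id_from_partition(pid: str):
    """Inverse of partition_id; returns None for names outside the train_ protocol."""
    match = _TRAIN_PARTITION_RE.match(pid)
    return int(match.group(1)) if match else None
```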

Partition lifecycle:

  1. rollout writes initial training fields.
  2. actor/critic/ref/advantage consumers read required fields.
  3. optional derived fields are written back.
  4. actor final training consumes final fields.
  5. actor clears the partition after successful training.

For critic-only warmup, the driver must clear the partition, because the actor does not train in that step and therefore never reaches its cleanup.

4. Task Names

Use task names to isolate consumer cursors:

Task name                        Owner                     Purpose
actor_train                      actor                     final policy training data consumption
critic_train                     critic                    PPO critic training data consumption
actor_log_probs                  actor or actor-fwd stage  compute and write log_probs
ref_log_probs                    reference stage           compute and write ref_log_probs
compute_advantages_and_returns   advantage stage           compute and write advantages and returns

The first implementation may only use actor_train and critic_train, but the names above define the forward-compatible protocol.
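To illustrate what "isolated consumer cursors" means, here is a toy in-memory partition that keeps one read position per task name. This is not TransferQueue's API; InMemoryPartition is a hypothetical stand-in that only shows why actor_train and critic_train can both read the full batch without interfering.

```python
from collections import defaultdict


class InMemoryPartition:
    """Toy stand-in for one partition with a separate read cursor per task_name."""

    def __init__(self, samples):
        self.samples = list(samples)
        # One cursor per task_name: consumers never advance each other's position.
        self.cursors = defaultdict(int)

    def get(self, task_name: str, batch_size: int) -> list:
        start = self.cursors[task_name]
        batch = self.samples[start:start + batch_size]
        self.cursors[task_name] = start + len(batch)
        return batch
```

For example, after `actor_train` reads samples 0 and 1, `critic_train` still starts from sample 0, while the next `actor_train` read returns samples 2 and 3.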

5. Data Fields

Base rollout fields:

tokens
total_lengths
response_lengths
loss_masks
rewards
raw_reward
truncated
sample_indices

Optional rollout fields:

rollout_log_probs
rollout_routed_experts
multimodal_train_inputs
teacher_log_probs
metadata
custom fields from --transfer-queue-extra-data-fields

Derived fields for future write-back:

values
log_probs
ref_log_probs
advantages
returns
kl
opd_reverse_kl

Field rules:

  • Consumer-requested fields must be listed explicitly.
  • The producer must either write every required field or fill in missing base fields during normalization before writing.
  • Missing optional fields must not be requested unless the corresponding feature is enabled.
  • Custom fields must be listed in --transfer-queue-extra-data-fields.
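One way to apply these rules when building a consumer's request is sketched below. The helper select_train_data_fields and the feature-flag attribute names (use_rollout_logprobs, use_routing_replay, use_multimodal_inputs, use_teacher_log_probs) are assumptions for illustration, not slime's actual argument names.

```python
from types import SimpleNamespace

BASE_TRAIN_FIELDS = (
    "tokens", "total_lengths", "response_lengths", "loss_masks",
    "rewards", "raw_reward", "truncated", "sample_indices",
)

# (hypothetical feature flag on args, optional field it unlocks)
OPTIONAL_TRAIN_FIELDS = (
    ("use_rollout_logprobs", "rollout_log_probs"),
    ("use_routing_replay", "rollout_routed_experts"),
    ("use_multimodal_inputs", "multimodal_train_inputs"),
    ("use_teacher_log_probs", "teacher_log_probs"),
)


def select_train_data_fields(args) -> list:
    fields = list(BASE_TRAIN_FIELDS)
    # Request an optional field only when its feature is enabled.
    for flag, field in OPTIONAL_TRAIN_FIELDS:
        if getattr(args, flag, False):
            fields.append(field)
    # Custom fields are accepted only from --transfer-queue-extra-data-fields; duplicates are skipped.
    for field in getattr(args, "transfer_queue_extra_data_fields", None) or []:
        if field not in fields:
            fields.append(field)
    return fields


if __name__ == "__main__":
    demo = SimpleNamespace(use_rollout_logprobs=True, transfer_queue_extra_data_fields=["my_score"])
    print(select_train_data_fields(demo))
```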

6. Rollout Write Path

When TransferQueue is disabled:

return self._split_train_data_by_dp(data, dp_size)

When TransferQueue is enabled:

transfer_rollout_data(args, client, rollout_id, data)
return None

Before writing:

  • add total_lengths.
  • validate required fields.
  • fill stable defaults for raw_reward, truncated, and sample_indices when safe.
  • convert to TensorDict.
  • write to partition train_{rollout_id}.
  • write total_lengths into custom metadata for sequence-length-balanced sampling.
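The normalization steps above can be sketched with plain Python lists standing in for tensors; TensorDict conversion and the actual put are omitted. The helper names are hypothetical, and the defaults are assumptions about what "safe" means: in particular, copying rewards into raw_reward only makes sense if rewards have not been normalized yet.

```python
REQUIRED_ROLLOUT_FIELDS = ("tokens", "response_lengths", "loss_masks", "rewards")


def normalize_train_data(data: dict) -> dict:
    """Validate and fill base fields before writing (hypothetical sketch)."""
    missing = [k for k in REQUIRED_ROLLOUT_FIELDS if k not in data]
    if missing:
        raise KeyError(f"rollout data is missing required fields: {missing}")
    n = len(data["tokens"])
    for key in REQUIRED_ROLLOUT_FIELDS:
        if len(data[key]) != n:
            raise ValueError(f"{key} has {len(data[key])} entries, expected {n}")
    out = dict(data)  # shallow copy so the caller's dict is not mutated
    out["total_lengths"] = [len(t) for t in data["tokens"]]
    # Assumed-safe defaults; raw_reward assumes rewards are still unnormalized.
    out.setdefault("raw_reward", list(data["rewards"]))
    out.setdefault("truncated", [False] * n)
    out.setdefault("sample_indices", list(range(n)))
    return out


def total_lengths_metadata(data: dict) -> list:
    """Per-sample custom metadata for a sequence-length-balanced sampler."""
    return [{"total_lengths": length} for length in data["total_lengths"]]
```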

7. Train Read Path

When TransferQueue is disabled:

rollout_data = self._get_rollout_data(rollout_data_ref)

When TransferQueue is enabled:

rollout_data = self._get_rollout_data_from_transfer_queue(rollout_id)

Only the model-parallel source rank should fetch from TransferQueue:

TP rank 0 && PP rank 0 && CP rank 0

Fetched data is then broadcast to the model-parallel group:

  1. CP group
  2. TP group
  3. PP group

This avoids multiple ranks consuming the same task cursor independently.
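The following simulation shows why three broadcasts starting from the single source rank reach every rank in one model-parallel group. It only tracks which (tp, pp, cp) coordinates hold the batch; a real implementation would broadcast over the CP, TP, and PP process groups (for example with torch.distributed.broadcast_object_list), which is not shown here. Both function names are hypothetical.

```python
from itertools import product


def is_mp_source(tp_rank: int, pp_rank: int, cp_rank: int) -> bool:
    """Only this rank in a model-parallel group reads from TransferQueue."""
    return tp_rank == 0 and pp_rank == 0 and cp_rank == 0


def simulate_mp_broadcast(tp_size: int, pp_size: int, cp_size: int) -> list:
    """Count ranks holding the batch after the CP, TP, and PP broadcast steps."""
    ranks = list(product(range(tp_size), range(pp_size), range(cp_size)))
    has_data = {r: is_mp_source(*r) for r in ranks}
    history = []
    # Each step copies data from rank 0 of that group to the group's other members.
    for src in (
        lambda tp, pp, cp: (tp, pp, 0),  # CP group: same tp/pp, cp_rank 0 is the source
        lambda tp, pp, cp: (0, pp, cp),  # TP group: same pp/cp, tp_rank 0 is the source
        lambda tp, pp, cp: (tp, 0, cp),  # PP group: same tp/cp, pp_rank 0 is the source
    ):
        for tp, pp, cp in ranks:
            if has_data[src(tp, pp, cp)]:
                has_data[(tp, pp, cp)] = True
        history.append(sum(has_data.values()))
    return history


print(simulate_mp_broadcast(tp_size=2, pp_size=2, cp_size=2))  # [2, 4, 8]
```

With TP=PP=CP=2, the batch reaches 2, then 4, then all 8 ranks, and only one rank ever advanced the TransferQueue task cursor.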

8. Data Parallel Sampling

The TransferQueue batch size fetched by each DP rank of a consumer is:

rollout_batch_size * n_samples_per_prompt / data_parallel_size

The data parallel size must match Megatron DP:

actor_world_size / (tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size)

For SeqlenBalancedSampler, the same DP size must be used during TransferQueue initialization.
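A worked example of the two formulas: 16 actor GPUs with TP=2, PP=2, CP=1 give DP = 16 / 4 = 4, and 32 prompts with 8 samples each give 256 samples, so each DP rank fetches 64. The helper names are hypothetical, and the sketch assumes an uneven split should be rejected rather than padded.

```python
def data_parallel_size(world_size: int, tp: int, pp: int, cp: int) -> int:
    """Megatron-style DP size: ranks left after tensor, pipeline, and context parallelism."""
    model_parallel = tp * pp * cp
    if world_size % model_parallel != 0:
        raise ValueError(f"world size {world_size} is not divisible by TP*PP*CP={model_parallel}")
    return world_size // model_parallel


def per_dp_batch_size(rollout_batch_size: int, n_samples_per_prompt: int, dp_size: int) -> int:
    """Number of samples each DP rank of a consumer fetches from one partition."""
    total = rollout_batch_size * n_samples_per_prompt
    if total % dp_size != 0:
        raise ValueError(f"{total} samples cannot be split evenly across {dp_size} DP ranks")
    return total // dp_size


dp = data_parallel_size(world_size=16, tp=2, pp=2, cp=1)
print(dp, per_dp_batch_size(rollout_batch_size=32, n_samples_per_prompt=8, dp_size=dp))  # 4 64
```

Computing DP once from the same TP/PP/CP values and reusing it for both the consumer batch size and SeqlenBalancedSampler keeps the two from drifting apart.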

9. Staleness and Backpressure

--max-staleness controls how many unconsumed train partitions may remain before rollout waits.

Strict mode:

--max-staleness 0

This means rollout cannot write a new train partition while an old one is still present.

Relaxed mode:

--max-staleness N

This allows rollout to lead training by up to N partitions.
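Both modes reduce to one condition: before writing a new partition, rollout waits while more than max_staleness train partitions are still present. The sketch below assumes a list_partitions callable returning partition names (a stand-in for the real client query, not TransferQueue's API) and injects sleep and clock so the loop can be tested with a mocked client.

```python
import time


def wait_for_staleness(list_partitions, max_staleness: int, poll_interval: float = 1.0,
                       timeout=None, sleep=time.sleep, clock=time.monotonic) -> float:
    """Block rollout until at most max_staleness train partitions remain unconsumed.

    Returns the time spent waiting so it can be reported as a metric.
    """
    start = clock()
    while True:
        pending = [p for p in list_partitions() if p.startswith("train_")]
        if len(pending) <= max_staleness:
            return clock() - start
        if timeout is not None and clock() - start >= timeout:
            raise TimeoutError(f"{len(pending)} train partitions still pending after {timeout}s")
        sleep(poll_interval)
```

With --max-staleness 0 the loop exits only once no train_ partition is left; with N it exits as soon as at most N remain.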

10. Partition Cleanup

Normal actor step:

if role == "actor" and rank == 0:
    clear_partition(args, client, rollout_id)

Critic-only warmup:

ray.get(rollout_manager.clear_transfer_queue_partition.remote(rollout_id))

Cleanup must only happen after all required consumers for the step have finished.
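One way to enforce "clear only after all required consumers finish" is a small bookkeeping object keyed by rollout_id. PartitionConsumerTracker is hypothetical: for a PPO step the required set would be {actor_train, critic_train}, while for critic-only warmup it would be {critic_train}, with the driver performing the clear.

```python
class PartitionConsumerTracker:
    """Hypothetical bookkeeping: allow clearing only after every required consumer finishes."""

    def __init__(self, required_tasks):
        self.required = frozenset(required_tasks)
        self.finished = {}  # rollout_id -> set of task names that completed

    def mark_finished(self, rollout_id: int, task_name: str) -> None:
        if task_name not in self.required:
            raise ValueError(f"{task_name!r} is not a required consumer")
        self.finished.setdefault(rollout_id, set()).add(task_name)

    def can_clear(self, rollout_id: int) -> bool:
        # Superset check: every required task has reported completion.
        return self.finished.get(rollout_id, set()) >= self.required

    def forget(self, rollout_id: int) -> None:
        """Drop bookkeeping once the partition has actually been cleared."""
        self.finished.pop(rollout_id, None)
```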

Compatibility

Default Behavior

TransferQueue is disabled by default. Existing users should see no behavior change.

External Dependencies

The external packages transfer_queue and tensordict are imported lazily. They are only required when --use-transfer-queue is enabled.
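A minimal sketch of the lazy-import pattern: the optional packages are resolved with importlib only when the flag is set, so the default path never touches them. The function name load_transfer_queue_packages is hypothetical; the module names are the ones given above.

```python
import importlib

_TQ_PACKAGES = ("transfer_queue", "tensordict")


def load_transfer_queue_packages(use_transfer_queue: bool):
    """Import the optional packages only when the feature flag is on."""
    if not use_transfer_queue:
        return None  # default path: nothing imported, nothing required
    modules = {}
    for name in _TQ_PACKAGES:
        try:
            modules[name] = importlib.import_module(name)
        except ImportError as exc:
            raise ImportError(
                f"--use-transfer-queue requires the '{name}' package; install it or drop the flag"
            ) from exc
    return modules
```

Re-raising with the flag name in the message points users at the switch that pulled in the dependency instead of a bare ModuleNotFoundError deep in a Ray actor.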

Unsupported Initial Combinations

Initial integration should reject:

--use-transfer-queue + --debug-train-only
--use-transfer-queue + --load-debug-rollout-data
--use-transfer-queue + --use-dynamic-global-batch-size

These can be revisited later with explicit producer and metadata support.
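The rejection can happen once at argument-validation time. In this sketch the function name and the args attribute names (derived from the flag spellings) are assumptions; every conflicting flag is reported together so users do not fix them one at a time.

```python
from types import SimpleNamespace

# Attribute name on args -> CLI flag shown in the error message.
_UNSUPPORTED_WITH_TQ = {
    "debug_train_only": "--debug-train-only",
    "load_debug_rollout_data": "--load-debug-rollout-data",
    "use_dynamic_global_batch_size": "--use-dynamic-global-batch-size",
}


def validate_transfer_queue_args(args) -> None:
    if not getattr(args, "use_transfer_queue", False):
        return  # TransferQueue off: existing flags keep their current behavior
    conflicts = [flag for attr, flag in _UNSUPPORTED_WITH_TQ.items() if getattr(args, attr, None)]
    if conflicts:
        raise ValueError(
            "--use-transfer-queue cannot be combined with " + ", ".join(conflicts)
            + " in the initial integration"
        )


if __name__ == "__main__":
    # Passes: TransferQueue is disabled, so debug flags are left alone.
    validate_transfer_queue_args(SimpleNamespace(use_transfer_queue=False, debug_train_only=True))
```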

Observability

The integration should log or expose metrics for:

  • TransferQueue initialization config.
  • partition id.
  • task name.
  • requested data fields.
  • batch_meta size.
  • get_meta wait time.
  • get_data latency.
  • put latency.
  • clear latency.
  • partition list size.
  • staleness wait time.

When actor waits for data, logs should include:

rollout_id
partition_id
task_name
data_fields
dp_rank
batch_size
elapsed_wait_time

This is critical because, without these logs, missing fields and empty metadata look exactly like a training hang.
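A sketch of a polling loop that emits the fields listed above while waiting. The get_meta callable, the logger name, and both function names are hypothetical; the point is that an empty result produces a periodic, self-describing warning rather than silence.

```python
import logging
import time

logger = logging.getLogger("transfer_queue_wait")  # hypothetical logger name


def format_wait_log(rollout_id, task_name, data_fields, dp_rank, batch_size, elapsed_wait_time):
    return (
        "waiting for TransferQueue data: "
        f"rollout_id={rollout_id} partition_id=train_{rollout_id} task_name={task_name} "
        f"data_fields={','.join(data_fields)} dp_rank={dp_rank} batch_size={batch_size} "
        f"elapsed_wait_time={elapsed_wait_time:.1f}s"
    )


def wait_for_batch_meta(get_meta, log_fields: dict, poll_interval: float = 1.0,
                        log_interval: float = 30.0, sleep=time.sleep, clock=time.monotonic):
    """Poll until metadata is non-empty, logging the wait context every log_interval seconds."""
    start = last_log = clock()
    while True:
        meta = get_meta()
        if meta:
            return meta
        now = clock()
        if now - last_log >= log_interval:
            logger.warning(format_wait_log(elapsed_wait_time=now - start, **log_fields))
            last_log = now
        sleep(poll_interval)
```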

Testing Plan

Unit Tests

Cover:

  • default_train_data_fields.
  • required and optional field selection.
  • train data normalization before TransferQueue write.
  • DP size calculation with TP/PP/CP.
  • dict_to_tensordict for 1D tensors, jagged tensors, routed experts, and non-tensor fields.
  • tensordict_to_rollout_data.
  • staleness wait behavior with mocked client.

Integration Tests

Cover:

  1. GRPO single step with TransferQueue.
  2. PPO single step with critic.
  3. critic-only warmup cleanup.
  4. max_staleness=0.
  5. max_staleness>0.
  6. balance_data=True.
  7. rollout logprobs.
  8. routing replay.
  9. multimodal train inputs.
  10. custom extra fields.

Plan

Phase 1: Optional Rollout-to-Train Transfer

  • initialize and connect TransferQueue.
  • rollout writes full train batch.
  • actor/critic read from TransferQueue.
  • actor clears partition.
  • keep driver control flow unchanged.

Phase 2: Derived Field Write-Back

  • critic writes values.
  • actor-fwd writes log_probs.
  • reference writes ref_log_probs.
  • advantage stage writes advantages and returns.
  • actor final train consumes derived fields from TransferQueue.

Phase 3: Streaming and Fully Async

  • introduce streaming dataset or dataloader.
  • support chunk-level rollout writes.
  • split actor-fwd/reference/advantage into independent services.
  • add production/consumption status and recovery protocol.
