# Streaming training (Phase P3)
ramjetio ships two ways to accelerate a training job with the distributed
cache:
| Mode | Entry point | What it caches | Best for |
|---|---|---|---|
| Pre-export (P2) | `dataset.export_local(...)` | Whole dataset copied to local disk once | Fits-on-disk datasets, repeated runs |
| Streaming (P3) | `ramjetio.integrations.ultralytics.streaming_train(...)` | Raw image bytes fetched per-step | Larger-than-disk datasets, fast first step |
Both share the same distributed cache server, ring, and authentication layers. You pick the mode per training job.
## When to use which
```
┌─ dataset fits on local disk? ─┐
│                               │
YES                             NO
│                               │
┌─ running > 2 epochs? ─┐       use streaming_train()
│                       │
YES                     NO
│                       │
use export_local()      either works;
(warm epochs are        streaming is faster
~3× faster)             to first step
```
Rules of thumb:
- Dataset > local disk → streaming is the only option.
- Compute-bound workloads under ~100 samples/GPU/sec (big models, small batches) → streaming is fine; I/O is not the bottleneck.
- YOLO + mosaic aug + 5k-sample dataset → `export_local` wins on repeated warm runs; streaming is ~2–3× slower per epoch but eliminates the 5-min cold export.
- Hyperparameter sweeps on the same dataset → `export_local`: you pay for the export once and amortize it across runs (a P2 sketch follows below).
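For contrast with the streaming example in the next section, here is a minimal sketch of the pre-export (P2) path. It assumes `export_local` accepts a destination directory and writes a standard Ultralytics `data.yaml` there; both are assumptions for illustration, and only the entry-point name comes from the table above.

```python
import ramjetio
from ultralytics import YOLO
from ramjetio.datasets import UniversalDataset

ramjetio.init()  # same per-host cache server and ring as the streaming path

dataset = UniversalDataset(split="train", source="s3://my-bucket/my-dataset")

# P2: copy the whole dataset to local disk once, then train as usual.
# NOTE: the export_local() argument shape and the exported data.yaml path are
# assumptions; only the method name comes from the mode table above.
dataset.export_local("/tmp/ramjet_data")

model = YOLO("yolov8n.pt")
model.train(data="/tmp/ramjet_data/data.yaml", epochs=10, batch=8)
```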
## Streaming usage
```python
import os

import ramjetio
from ultralytics import YOLO
from ramjetio.datasets import UniversalDataset
from ramjetio.integrations.ultralytics import streaming_train

ramjetio.init()  # ← starts the per-host cache server, joins the ring

# Minimal bootstrap on local disk: real labels + 32×32 JPEG placeholders.
# ~30 s for 5k samples vs ~5 min for a full export_local.
dataset = UniversalDataset(split="train", source="s3://my-bucket/my-dataset")
dataset.export_labels_only(
    "/tmp/ramjet_data",
    rank=int(os.environ.get("RANK", 0)),
    world_size=int(os.environ.get("WORLD_SIZE", 1)),
)

model = YOLO("yolov8n.pt")

streaming_train(
    model,
    data="/tmp/ramjet_data/data.yaml",
    source=dataset._client,         # or any object with get_bytes(path)
    samples=list(dataset.samples),  # S3 keys, one per sample index
    epochs=1,
    batch=8,
    workers=4,                      # fork-safe
    val=False, plots=False,         # extra belt-and-braces; SDK also suppresses
)
```
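The `source=` argument does not have to be the dataset's internal client; as the comment above notes, any object exposing `get_bytes(path)` works. Below is a minimal sketch of a custom source backed by boto3; the class name, bucket, and the assumption that `path` is a raw S3 key are illustrative, not part of the SDK.

```python
import boto3


class S3Source:
    """Minimal source adapter: streaming_train only needs get_bytes(path)."""

    def __init__(self, bucket: str):
        self._bucket = bucket
        self._s3 = boto3.client("s3")

    def get_bytes(self, path: str) -> bytes:
        # Assumes `path` is the raw S3 key for one sample, matching the
        # entries passed via samples=list(dataset.samples).
        return self._s3.get_object(Bucket=self._bucket, Key=path)["Body"].read()


# Usage: streaming_train(model, ..., source=S3Source("my-bucket"),
#                        samples=list(dataset.samples), ...)
```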
What `streaming_train` wires up for you:

- Frozen consistent-hash ring: frozen on `on_train_start`, unfrozen on `on_train_end`, which keeps the key→node assignment stable across the whole training run so cached items don't get re-shuffled mid-epoch.
- Val suppression: stubs `trainer.validate`/`final_eval` so the dataloader doesn't run a 200 s val pass over the 32×32 placeholder images (the metrics would be meaningless anyway).
- DDP with `find_unused_parameters=True`: required for YOLOv8 heads with `workers>0`, otherwise rank 1 fails on the first backward pass.
- Source attribution: every cache hit is recorded as `cache_local`, `cache_peer`, or `s3` and surfaced via `/stats` and backend heartbeats, visible per cluster on the dashboard (a query sketch follows below).
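To spot-check source attribution on a node, you can query the cache server's `/stats` endpoint directly. Only the endpoint name comes from the list above; the host, port, and response shape in this sketch are assumptions, so adjust them to your deployment.

```python
import json
import urllib.request

# Assumption: the per-host cache server started by ramjetio.init() is reachable
# locally at this URL; replace host/port with your deployment's values.
STATS_URL = "http://127.0.0.1:8080/stats"

with urllib.request.urlopen(STATS_URL, timeout=5) as resp:
    stats = json.load(resp)

# Assumption: the payload carries per-source counters named after the
# attribution labels above (cache_local / cache_peer / s3).
for source in ("cache_local", "cache_peer", "s3"):
    print(source, stats.get(source))
```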
## Environment variables
| Variable | Default | Purpose |
|---|---|---|
| `RAMJET_API_KEY` | required | Authenticates all peer traffic. |
| `RAMJET_HTTP_POOL_SIZE` | `max(32, nodes×8)` | Per-process HTTP connection pool. Raise for `workers>8` or `nodes>8`. |
| `RAMJET_SINGLEFLIGHT` | `1` | Server-side dedup of concurrent misses on the same key: only one node fetches from origin, the rest wait for the result. |
| `RAMJET_SINGLEFLIGHT_TIMEOUT_MS` | `30000` | Max wait for a leader fetch before falling through. |
| `RAMJET_ULTRALYTICS_STRICT` | `0` | When set, an out-of-range ultralytics version raises `ImportError` instead of warning. |
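These are ordinary environment variables, so they can be set in your launcher or in-process before `ramjetio.init()`. A minimal sketch, assuming the SDK reads them at init time; the specific values below are only examples.

```python
import os

# Values here are only examples; see the table above for semantics.
os.environ.setdefault("RAMJET_HTTP_POOL_SIZE", "128")             # big ring / many workers
os.environ.setdefault("RAMJET_SINGLEFLIGHT", "1")                 # keep miss dedup on (default)
os.environ.setdefault("RAMJET_SINGLEFLIGHT_TIMEOUT_MS", "10000")  # shorter leader wait
os.environ.setdefault("RAMJET_ULTRALYTICS_STRICT", "1")           # hard-fail on version drift

assert "RAMJET_API_KEY" in os.environ, "RAMJET_API_KEY is required for peer traffic"

import ramjetio

ramjetio.init()
```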
## Expected numbers (2-node A5000, 5315-sample YOLO dataset, batch=8)
| Scenario | Wall time | it/s | Notes |
|---|---|---|---|
| P2 `export_local` warm | 46.8 s | n/a | 100% local+peer cache |
| P3 `streaming_train` warm, `workers=0` | 179 s | 2.6 | val-suppressed |
| P3 `streaming_train` warm, `workers=4` | 137 s | 3.8 | current best |
| P3 `streaming_train` cold (first run) | ~1337 s | 0.25 | S3-bound over Cloudflare tunnel |
| Export-labels-only bootstrap | 32 s | n/a | creates placeholders |
Headline: time-to-first-training-step is ~32 s, vs ~290 s for P2 on a cold cluster, roughly 9× faster to the first step. Steady-state throughput with a fully warm cache is ~3× slower than P2 warm (137 s vs 46.8 s) on fits-on-disk datasets; that is the honest tradeoff for eliminating the export stage.
## Troubleshooting
- `XMinioInvalidObjectName`: you called `streaming_train` without the `samples=` kwarg, so the trainer tried to look up the local placeholder path in S3. Pass `samples=list(dataset.samples)`.
- `Parameter indices which did not receive grad for rank N: ...`: your model has parameters that don't receive gradients on every step. Wrap with `DDP(model, find_unused_parameters=True)` (see the sketch below).
- Training hangs after epoch end: the val loop is running against placeholder images. Pass `val=False` and rely on the SDK's val suppression.
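`streaming_train` already sets `find_unused_parameters=True` for you; if you hit the missing-grad error in your own training loop, the fix is the standard PyTorch one. A minimal sketch, assuming a torchrun launch; the stand-in module is illustrative only.

```python
import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# Assumes the job was launched with torchrun so RANK/LOCAL_RANK/WORLD_SIZE are set.
if not dist.is_initialized():
    dist.init_process_group(backend="nccl")

local_rank = int(os.environ.get("LOCAL_RANK", 0))
torch.cuda.set_device(local_rank)

model = torch.nn.Linear(8, 8).cuda()  # stand-in for your real module

ddp_model = DDP(
    model,
    device_ids=[local_rank],
    find_unused_parameters=True,  # tolerate params that get no grad on some steps
)
```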