Streaming training (Phase P3)

ramjetio ships two ways to accelerate a training job with the distributed cache:

Mode             Entry point                                              What it caches                            Best for
Pre-export (P2)  dataset.export_local(...)                                Whole dataset copied to local disk once   Fits-on-disk datasets, repeated runs
Streaming (P3)   ramjetio.integrations.ultralytics.streaming_train(...)   Raw image bytes fetched per-step          Larger-than-disk datasets, fast first step

Both share the same distributed cache server, ring, and authentication layers. You pick the mode per training job.

When to use which

                       ┌─ dataset fits on local disk ? ─┐
                       │                                 │
                     YES                                NO
                       │                                 │
        ┌─ running > 2 epochs ? ─┐              use streaming_train()
        │                        │
       YES                      NO
        │                        │
  use export_local()        either works;
  (warm epochs are         streaming is faster
   ~3× faster)             to first step

Rules of thumb:

  • Dataset > local disk → streaming is the only option.
  • Compute-bound workloads (< 100 samples/GPU/sec: big models, small batches) → streaming is fine; I/O is not the bottleneck.
  • YOLO + mosaic aug + 5k-sample dataset → export_local wins on repeated warm runs; streaming is ~2–3× slower per-epoch but eliminates the 5-min cold export.
  • Hyperparameter sweeps on the same dataset → export_local, you pay export once and amortize across runs.
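The decision tree and rules of thumb above can be condensed into a small helper. This is a hypothetical sketch, not part of the SDK; the function name and thresholds are illustrative only:

```python
def pick_mode(dataset_bytes: int, free_disk_bytes: int, epochs: int) -> str:
    """Encode the mode-selection decision tree.

    Returns "streaming", "export_local", or "either".
    """
    if dataset_bytes > free_disk_bytes:
        return "streaming"      # larger-than-disk: streaming is the only option
    if epochs > 2:
        return "export_local"   # warm epochs amortize the one-time export
    return "either"             # streaming reaches the first step faster
```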

Streaming usage

import os

import ramjetio
from ultralytics import YOLO
from ramjetio.datasets import UniversalDataset
from ramjetio.integrations.ultralytics import streaming_train

ramjetio.init()  # ← starts the per-host cache server, joins the ring

# Minimal bootstrap on local disk: real labels + 32×32 JPEG placeholders.
# ~30 s for 5k samples vs ~5 min for a full export_local.
dataset = UniversalDataset(split="train", source="s3://my-bucket/my-dataset")
dataset.export_labels_only("/tmp/ramjet_data", rank=int(os.environ.get("RANK", 0)),
                           world_size=int(os.environ.get("WORLD_SIZE", 1)))

model = YOLO("yolov8n.pt")
streaming_train(
    model,
    data="/tmp/ramjet_data/data.yaml",
    source=dataset._client,              # or any object with get_bytes(path)
    samples=list(dataset.samples),        # S3 keys, one per sample index
    epochs=1,
    batch=8,
    workers=4,                            # fork-safe
    val=False, plots=False,               # belt and braces: the SDK suppresses these anyway
)
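The source= argument only needs an object exposing get_bytes(path) -> bytes. A minimal local-filesystem stand-in (hypothetical, handy for smoke-testing the pipeline without S3) might look like:

```python
from pathlib import Path

class LocalSource:
    """Stand-in for the source= object: resolves sample paths against a
    local directory instead of S3. Any object with get_bytes(path) works."""

    def __init__(self, root: str):
        self.root = Path(root)

    def get_bytes(self, path: str) -> bytes:
        # Called once per sample fetch on the cache-miss path.
        return (self.root / path).read_bytes()
```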

What streaming_train wires up for you:

  • Ring freeze: the consistent-hash ring is frozen on on_train_start and unfrozen on on_train_end. This keeps the key→node assignment stable across the whole training run, so cached items don't get re-shuffled mid-epoch.
  • Val suppression: stubs trainer.validate / final_eval so the trainer doesn't run a ~200 s val pass over the 32×32 placeholder images (the metrics would be meaningless anyway).
  • DDP with find_unused_parameters=True: required for YOLOv8 heads with workers>0, otherwise rank 1 fails on the first backward pass.
  • Source attribution: every cache hit is recorded as cache_local, cache_peer, or s3 and surfaced via /stats and backend heartbeats — visible per-cluster on the dashboard.
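The ring-freeze idea can be illustrated with a standalone consistent-hash sketch (not the ramjetio implementation; names and vnode count are assumptions): while frozen, membership changes are rejected, so node_for(key) stays stable for the whole run.

```python
import bisect
import hashlib

class FrozenRing:
    """Consistent-hash ring with a freeze switch for the training run."""

    def __init__(self, nodes, vnodes=64):
        # Each node gets `vnodes` virtual points on the ring for balance.
        self._ring = sorted(
            (self._h(f"{n}:{v}"), n) for n in nodes for v in range(vnodes)
        )
        self._hashes = [h for h, _ in self._ring]
        self.frozen = False

    @staticmethod
    def _h(s: str) -> int:
        return int(hashlib.md5(s.encode()).hexdigest(), 16)

    def freeze(self):   # the SDK does this on on_train_start
        self.frozen = True

    def add_node(self, node, vnodes=64):
        if self.frozen:
            raise RuntimeError("ring is frozen for the duration of the run")
        for v in range(vnodes):
            bisect.insort(self._ring, (self._h(f"{node}:{v}"), node))
        self._hashes = [h for h, _ in self._ring]

    def node_for(self, key: str) -> str:
        # First ring point clockwise of the key's hash owns the key.
        i = bisect.bisect(self._hashes, self._h(key)) % len(self._ring)
        return self._ring[i][1]
```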

Environment variables

Variable                        Default           Purpose
RAMJET_API_KEY                  required          Authenticates all peer traffic.
RAMJET_HTTP_POOL_SIZE           max(32, nodes×8)  Per-process HTTP connection pool. Raise for workers>8 or nodes>8.
RAMJET_SINGLEFLIGHT             1                 Server-side dedup of concurrent misses on the same key: only one node fetches from origin, the rest wait for the result.
RAMJET_SINGLEFLIGHT_TIMEOUT_MS  30000             Max wait for a leader fetch before falling through.
RAMJET_ULTRALYTICS_STRICT       0                 When set, an out-of-range ultralytics version raises ImportError instead of warning.
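The dedup behaviour behind RAMJET_SINGLEFLIGHT can be sketched in a few lines (an illustration of the technique, not the server code; the timeout handling mentioned in the table is elided):

```python
import threading

class SingleFlight:
    """Concurrent misses on one key share a single origin fetch."""

    def __init__(self):
        self._lock = threading.Lock()
        self._inflight = {}  # key -> (done Event, result box)

    def do(self, key, fetch):
        with self._lock:
            entry = self._inflight.get(key)
            if entry is None:
                entry = (threading.Event(), {})
                self._inflight[key] = entry
                leader = True
            else:
                leader = False
        done, box = entry
        if leader:
            try:
                box["value"] = fetch()   # only the leader hits origin
            finally:
                with self._lock:
                    del self._inflight[key]
                done.set()
            return box["value"]
        done.wait()                      # followers wait for the leader's result
        return box["value"]
```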

Expected numbers (2-node A5000, 5315-sample YOLO dataset, batch=8)

Scenario                               Wall time  it/s  Notes
P2 export_local, warm                  46.8 s     n/a   100% local+peer cache
P3 streaming_train, warm, workers=0    179 s      2.6   val-suppressed
P3 streaming_train, warm, workers=4    137 s      3.8   current best
P3 streaming_train, cold (first run)   ~1337 s    0.25  S3-bound over Cloudflare tunnel
Export-labels-only bootstrap           32 s       n/a   creates placeholders

Headline: time-to-first-training-step is ~32 s vs ~290 s for P2 on a cold cluster, roughly 9× faster. Steady-state throughput with a fully warm cache is ~3× slower than P2 warm on fits-on-disk datasets; that is the honest tradeoff for eliminating the export stage.
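The headline ratios follow directly from the table values (a quick arithmetic check; the "~" is because wall times vary run to run):

```python
# Time to first training step: labels-only bootstrap vs P2 cold export.
first_step_p3, first_step_p2 = 32, 290      # seconds
speedup = first_step_p2 / first_step_p3     # ~9×

# Warm steady state: P3 streaming (workers=4) vs P2 export_local.
warm_p2, warm_p3 = 46.8, 137                # seconds per epoch
slowdown = warm_p3 / warm_p2                # ~3×
```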

Troubleshooting

  • XMinioInvalidObjectName — you called streaming_train without the samples= kwarg and the trainer tried to look up the local placeholder path in S3. Pass samples=list(dataset.samples).
  • Parameter indices which did not receive grad for rank N: ... — your model has parameters that don't receive gradients on every step. Wrap with DDP(model, find_unused_parameters=True).
  • Training hangs after epoch end — the val loop is running against placeholder images. Pass val=False and rely on the SDK's val suppression.