class streaming.Stream(*, remote=None, local=None, split=None, proportion=None, repeat=None, choose=None, download_retry=None, download_timeout=None, validate_hash=None, keep_zip=None)

A dataset, or sub-dataset if mixing, from which we stream/cache samples.

We initialize a StreamingDataset with one or more Streams. Streams may be resampled to achieve different mixtures of samples.

Stream init takes three kinds of arguments:

  • At least one of remote and local must be provided. If remote is not provided, the data must already exist locally. If local is not provided, we cache to a temp directory.

    • remote

    • local

  • At most one of proportion, repeat, or choose may be provided. If one of these is provided, we derive the rest. Note that proportion (relative) and repeat/choose (absolute) are mutually incompatible: you must use one or the other (or neither) consistently across all sub-datasets. If no stream provides any of them and epoch_size is unspecified, each sample from each stream is seen once per epoch. If no stream provides any of them and epoch_size is specified, streams are sampled in proportion to their size.

    • proportion

    • repeat

    • choose

  • The remaining arguments are optional knobs for controlling downloading behavior and default to None. If None, they take a default value provided to or by the StreamingDataset init.

    • split

    • download_retry

    • download_timeout

    • validate_hash

    • keep_zip
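The first two rules above can be expressed as a small standalone check. The helper check_stream_args is hypothetical and not part of the library; it only mirrors the per-stream constraints stated in this section (at least one of remote/local, at most one non-negative weight), and the library's own validation and error messages may differ.

```python
from typing import Optional


def check_stream_args(remote: Optional[str] = None,
                      local: Optional[str] = None,
                      proportion: Optional[float] = None,
                      repeat: Optional[float] = None,
                      choose: Optional[int] = None) -> None:
    """Hypothetical check mirroring the documented Stream argument rules."""
    # At least one of remote and local must be provided.
    if remote is None and local is None:
        raise ValueError('At least one of remote and local must be provided.')
    # At most one of proportion, repeat, or choose may be provided.
    weights = [w for w in (proportion, repeat, choose) if w is not None]
    if len(weights) > 1:
        raise ValueError('At most one of proportion, repeat, or choose may be provided.')
    # A weight that is provided must be non-negative.
    if weights and weights[0] < 0:
        raise ValueError('proportion, repeat, and choose must be non-negative.')
```

For example, check_stream_args(local='/tmp/data', repeat=2.0) passes, while check_stream_args(local='/tmp/data', proportion=0.5, choose=100) raises. The cross-stream rule (relative and absolute weights cannot be mixed) needs all streams at once, so it is not modeled here.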

  • remote (str, optional) – Remote path or directory to download the dataset from. If None, its data must exist locally. Defaults to None.

  • local (str, optional) – Local working directory to download shards to. This is where shards are cached while they are being used. Uses a temp directory if not set. Defaults to None.

  • split (str, optional) – Which dataset split to use, if any. If provided, we stream from/to the split subdirs of remote and local. Defaults to None.

  • proportion (float, optional) – How much to upsample or downsample this sub-dataset, expressed as the proportion of the total combined dataset that consists of this sub-dataset. If using proportions, all sub-datasets provided together to the StreamingDataset init must define their proportions. The total combined number of samples is the StreamingDataset argument epoch_size if provided, or else the total size of the underlying data. If provided, must be non-negative. Defaults to None.

  • repeat (float, optional) – How much to upsample or downsample this sub-dataset, as a multiplier on the number of samples. If provided, must be non-negative. Defaults to None.

  • choose (int, optional) – How much to upsample or downsample this sub-dataset, as the exact number of resulting samples. If provided, must be non-negative. Defaults to None.

  • download_retry (int, optional) – Number of download re-attempts before giving up. Defaults to None.

  • download_timeout (float, optional) – Number of seconds to wait for a shard to download before raising an exception. Defaults to None.

  • validate_hash (str, optional) – Optional hash or checksum algorithm to use to validate shards. Defaults to None.

  • keep_zip (bool, optional) – Whether to keep or delete the compressed form when decompressing downloaded shards. If False, the compressed form is kept if and only if remote is the same as local or there is no remote. Defaults to None.
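The keep_zip rule can be sketched as a one-line resolution. This is a hypothetical helper, effective_keep_zip, assuming that "remote is local" means remote names the same directory as local; in that case, and when there is no remote, the compressed files presumably are the only copy of the data, so deleting them would lose it.

```python
from typing import Optional


def effective_keep_zip(keep_zip: bool, remote: Optional[str], local: Optional[str]) -> bool:
    """Hypothetical helper: decide whether compressed shards survive decompression."""
    # keep_zip=True always keeps the compressed form. Otherwise it is kept only
    # when there is no remote or the remote is the local directory itself.
    return keep_zip or remote in (None, local)
```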


Apply defaults, setting any unset fields.

We store each optional field as a pair (name, _name) so that type checking passes.


default (Self) – Stream containing default values for all optional fields.
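The (name, _name) pairing can be illustrated with a hypothetical StreamSketch class. It assumes that _name holds the raw, possibly-None constructor argument and name holds the resolved value, so a type checker sees name as a plain int or float after defaults are applied; the library's actual fields and checks may differ, and only two of the optional knobs are shown.

```python
from typing import Optional


class StreamSketch:
    """Hypothetical class illustrating the (name, _name) pattern; not the library's Stream."""

    def __init__(self, download_retry: Optional[int] = None,
                 download_timeout: Optional[float] = None) -> None:
        # Raw arguments may be None, so they live in Optional-typed private fields.
        self._download_retry = download_retry
        self._download_timeout = download_timeout

    def apply_default(self, default: 'StreamSketch') -> None:
        """Fill unset fields from ``default``, exposing non-Optional public fields."""
        retry = self._download_retry
        if retry is None:
            retry = default._download_retry
        timeout = self._download_timeout
        if timeout is None:
            timeout = default._download_timeout
        if retry is None or timeout is None:
            raise ValueError('default must define every optional field.')
        # After the None checks, these are plain int/float for the type checker.
        self.download_retry: int = retry
        self.download_timeout: float = timeout
```

Usage: StreamSketch(download_retry=5).apply_default(StreamSketch(2, 60.0)) leaves download_retry at 5 and fills download_timeout with 60.0.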

classmethod apply_weights(streams, samples_per_stream, choose_per_epoch, seed)

Given samples per stream, derive each stream’s proportion/repeat/samples.

Modifies streams to save the derived weights.

  • streams (Sequence[Stream]) – The list of streams which comprise the dataset.

  • samples_per_stream (NDArray[np.int64]) – Underlying samples of each stream.

  • choose_per_epoch (int, optional) – Absolute epoch size if weighting relatively.

  • seed (int) – Random number generator seed used to sample evenly.


int – Number of samples to draw per epoch.
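A minimal sketch of the weight derivation, using plain lists instead of NDArray. The functions weigh_relative and weigh_absolute are hypothetical; the rounding scheme (round each stream down, then give the leftover samples to randomly chosen streams using the seed) is an assumption about what "sample evenly" means here, not a description of the library's exact code. In either case, a stream's repeat can then be derived as its choose divided by its underlying sample count.

```python
import random
from typing import List, Optional, Sequence


def weigh_relative(samples_per_stream: Sequence[int],
                   proportions: Sequence[float],
                   choose_per_epoch: Optional[int],
                   seed: int) -> List[int]:
    """Hypothetical relative weighting: turn proportions into per-stream sample counts."""
    total = sum(proportions)
    fractions = [p / total for p in proportions]  # normalize so they sum to 1
    # Epoch size defaults to the combined size of the underlying data.
    epoch = choose_per_epoch if choose_per_epoch is not None else sum(samples_per_stream)
    choose = [int(epoch * f) for f in fractions]  # round each stream down
    # Hand out samples lost to rounding, one each to randomly chosen streams.
    shortfall = epoch - sum(choose)
    rng = random.Random(seed)
    for i in rng.sample(range(len(choose)), shortfall):
        choose[i] += 1
    return choose


def weigh_absolute(samples_per_stream: Sequence[int],
                   repeats: Sequence[Optional[float]],
                   chooses: Sequence[Optional[int]]) -> List[int]:
    """Hypothetical absolute weighting: each count comes from choose or repeat."""
    result = []
    for samples, repeat, choose in zip(samples_per_stream, repeats, chooses):
        if choose is not None:
            result.append(choose)  # exact number of samples
        elif repeat is not None:
            result.append(int(samples * repeat))  # multiplier on the underlying size
        else:
            result.append(samples)  # neither given: each sample seen once
    return result
```

The number of samples to draw per epoch is then the sum of the returned list. Passing proportions equal to samples_per_stream reproduces the documented case where epoch_size is set but no stream has weights.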


Get the size of the index file in bytes.


int – Size in bytes.

get_shards(world, allow_unsafe_types)

Load this Stream’s index, retrieving its shard readers.

  • world (World) – Distributed context.

  • allow_unsafe_types (bool) – Whether to keep going (True) or raise an error (False) if a shard contains Pickle, which allows arbitrary code execution during deserialization.


List[Reader] – Shard readers.
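The allow_unsafe_types decision can be sketched as a filter over shards. The helper check_shard_safety and its input (a mapping from shard basename to column encodings) are hypothetical, and treating 'pkl' as the name of the Pickle encoding is an assumption of this sketch.

```python
from typing import Dict, List


def check_shard_safety(column_encodings: Dict[str, List[str]],
                       allow_unsafe_types: bool) -> List[str]:
    """Hypothetical check: list shards with Pickle columns, raising unless allowed.

    column_encodings maps a shard basename to its column encodings; 'pkl' as the
    Pickle encoding name is an assumption.
    """
    unsafe = [name for name, encodings in column_encodings.items() if 'pkl' in encodings]
    if unsafe and not allow_unsafe_types:
        raise ValueError(f'Shards {unsafe} contain Pickle, which can execute arbitrary code '
                         'when deserialized; pass allow_unsafe_types=True to proceed anyway.')
    return unsafe
```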


Ensure (download, validate, extract, etc.) that we have the given shard.


shard (Reader) – Which shard.


int – Change in cache usage.
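The validate-and-extract steps, and the returned change in cache usage, can be sketched for a single shard. The function prepare_shard_sketch is hypothetical: zlib stands in for whatever compression the shard actually uses, the digest is checked against the downloaded (compressed) bytes, and the byte accounting assumes nothing for this shard was cached beforehand.

```python
import hashlib
import zlib
from typing import Optional


def prepare_shard_sketch(zip_data: bytes, expected_digest: Optional[str],
                         validate_hash: Optional[str], keep_zip: bool) -> int:
    """Hypothetical sketch: validate and extract one downloaded shard.

    Returns the change in cache usage in bytes.
    """
    # Validate the downloaded bytes only if a hash algorithm was configured.
    if validate_hash is not None:
        actual = hashlib.new(validate_hash, zip_data).hexdigest()
        if actual != expected_digest:
            raise ValueError(f'Checksum failure: expected {expected_digest}, got {actual}.')
    # Extract the raw shard; it now occupies cache space.
    raw = zlib.decompress(zip_data)
    delta = len(raw)
    # The compressed form only keeps occupying the cache if it is kept.
    if keep_zip:
        delta += len(zip_data)
    return delta
```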

set_up_local(shards, cache_usage_per_shard)

Bring a local directory into a consistent state, getting which shards are present.

  • shards (List[Reader]) – List of this stream’s shards.

  • cache_usage_per_shard (NDArray[np.int64]) – Cache usage per shard of this stream.
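Scanning the local directory can be sketched as follows. The function set_up_local_sketch is hypothetical: each shard is described by a (raw basename, compressed basename or None) pair, a plain list stands in for the NDArray[np.int64] argument and is filled in place, and a shard present only in compressed form counts as present because it can still be extracted.

```python
import os
from typing import List, Optional, Tuple


def set_up_local_sketch(local: str,
                        shards: List[Tuple[str, Optional[str]]],
                        cache_usage_per_shard: List[int]) -> List[bool]:
    """Hypothetical sketch: record per-shard cache usage and which shards are present."""
    present = []
    for i, (raw_name, zip_name) in enumerate(shards):
        usage = 0
        found = False
        for name in (raw_name, zip_name):
            if name is None:
                continue  # uncompressed shard: no zip file to look for
            path = os.path.join(local, name)
            if os.path.isfile(path):
                usage += os.path.getsize(path)
                found = True
        cache_usage_per_shard[i] = usage  # filled in place
        present.append(found)
    return present
```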

classmethod validate_weights(streams)

Validate stream weights, returning whether relative or absolute weighting was used.


streams (Sequence[Stream]) – Every stream comprising the dataset.


bool – Whether streams are weighted relatively (proportionally).
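The cross-stream consistency rules can be sketched over (proportion, repeat, choose) triples. The function validate_weights_sketch is hypothetical and only encodes the rules stated for Stream init: proportion cannot be mixed with repeat/choose, and if any stream uses proportion, all must. In this sketch, streams with no weights at all are reported as not relative; the documented epoch_size case, where such streams are sampled in proportion to their size, is not modeled.

```python
from typing import Optional, Sequence, Tuple

# (proportion, repeat, choose) for one stream.
WeightSpec = Tuple[Optional[float], Optional[float], Optional[int]]


def validate_weights_sketch(specs: Sequence[WeightSpec]) -> bool:
    """Hypothetical sketch: return True if streams are weighted relatively."""
    uses_proportion = [p is not None for p, _, _ in specs]
    uses_absolute = [r is not None or c is not None for _, r, c in specs]
    if any(uses_proportion):
        if any(uses_absolute):
            raise ValueError('proportion cannot be mixed with repeat/choose.')
        if not all(uses_proportion):
            raise ValueError('If any stream sets proportion, every stream must.')
        return True
    return False
```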