Stream
- class streaming.Stream(*, remote=None, local=None, split=None, proportion=None, repeat=None, choose=None, download_retry=None, download_timeout=None, validate_hash=None, keep_zip=None)
A dataset, or sub-dataset if mixing, from which we stream/cache samples.
We initialize a StreamingDataset with one or more Streams. Streams may be resampled to achieve different mixtures of samples.
Stream init takes three kinds of arguments:
- At least one of remote and local must exist. If no remote, the data must be local. If no local, we cache to a temp directory.
  - remote
  - local
- At most one of proportion, repeat, or choose may exist. If one of these is provided, we derive the rest. Note that proportion (relative) and repeat/choose (absolute) are mutually incompatible: you must entirely use one or the other (or neither) for all sub-datasets. If none are provided for all streams and epoch_size is unspecified, then each sample from each stream is seen once per epoch. If none are provided for all streams and epoch_size is specified, then streams are sampled in proportion to their size.
  - proportion
  - repeat
  - choose
- The remaining arguments are optional knobs for controlling downloading behavior and default to None. If None, they take a default value provided to or by the StreamingDataset init.
  - split
  - download_retry
  - download_timeout
  - validate_hash
  - keep_zip
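The weighting rules above (at most one of proportion, repeat, or choose per stream, and no mixing of relative and absolute weighting across streams) can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the library's implementation: StreamSpec and check_weights are hypothetical names, and the error messages are invented.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class StreamSpec:
    """Hypothetical stand-in holding only the three weighting arguments."""
    proportion: Optional[float] = None
    repeat: Optional[float] = None
    choose: Optional[int] = None


def check_weights(specs: Sequence[StreamSpec]) -> str:
    """Classify a set of streams as 'relative', 'absolute', or 'none'.

    Raises ValueError if a stream sets more than one weight, if a weight is
    negative, or if relative and absolute weights are mixed.
    """
    num_relative = 0
    num_absolute = 0
    for spec in specs:
        given = [v for v in (spec.proportion, spec.repeat, spec.choose) if v is not None]
        if len(given) > 1:
            raise ValueError('Set at most one of proportion, repeat, or choose')
        if given and given[0] < 0:
            raise ValueError('Weights must be non-negative')
        if spec.proportion is not None:
            num_relative += 1
        elif given:
            num_absolute += 1
    if num_relative and num_absolute:
        raise ValueError('Cannot mix proportion with repeat/choose')
    if num_relative and num_relative != len(specs):
        raise ValueError('If using proportions, every stream must define one')
    if num_relative:
        return 'relative'
    return 'absolute' if num_absolute else 'none'
```

Streams that set nothing are allowed alongside repeat/choose streams in this sketch, since absolute weights do not depend on the other streams.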
- Parameters
remote (str, optional) – Remote path or directory to download the dataset from. If None, its data must exist locally. Defaults to None.
local (str, optional) – Local working directory to download shards to. This is where shards are cached while they are being used. Uses a temp directory if not set. Defaults to None.
split (str, optional) – Which dataset split to use, if any. If provided, we stream from/to the split subdirs of remote and local. Defaults to None.
proportion (float, optional) – How much to upsample or downsample this sub-dataset, as the proportion of the total combined dataset that consists of this sub-dataset. If using proportions, all sub-datasets provided together to the StreamingDataset init must define their proportions. The total combined number of samples is either the StreamingDataset argument epoch_size if provided, or kept the same total size as the underlying data if not. If provided, must be non-negative. Defaults to None.
repeat (float, optional) – How much to upsample or downsample this sub-dataset, as a multiplier on the number of samples. If provided, must be non-negative. Defaults to None.
choose (int, optional) – How much to upsample or downsample this sub-dataset, as the exact number of resulting samples. If provided, must be non-negative. Defaults to None.
download_retry (int, optional) – Number of download re-attempts before giving up. Defaults to None.
download_timeout (float, optional) – Number of seconds to wait for a shard to download before raising an exception. Defaults to None.
validate_hash (str, optional) – Optional hash or checksum algorithm to use to validate shards. Defaults to None.
keep_zip (bool, optional) – Whether to keep or delete the compressed form when decompressing downloaded shards. If False, keep the compressed form if and only if remote is local or there is no remote. Defaults to None.
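The keep_zip rule is easier to read as a predicate. The sketch below assumes that "remote is local" means remote and local name the same directory, which is an interpretation of the text above rather than something it states. should_keep_zip is a hypothetical helper, not part of the library.

```python
from typing import Optional


def should_keep_zip(keep_zip: bool, remote: Optional[str], local: Optional[str]) -> bool:
    """Decide whether the compressed shard file survives decompression.

    Assumption: "remote is local" is read as remote == local. In that case,
    or when there is no remote at all, the local directory is itself the
    source of the data, so the compressed file is kept even when keep_zip
    is False.
    """
    return keep_zip or remote is None or remote == local
```

With keep_zip set to False, a stream that downloads from a separate remote would delete the compressed file after decompressing, while a purely local stream would keep it.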
- apply_default(default)
Apply defaults, setting any unset fields.
We use pairs of (name, _name) in order to make type checking happy.
- Parameters
default (Self) – Stream containing default values for all optional fields.
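A minimal sketch of the fill-in-defaults idea, assuming a simplified stand-in class: any field left as None takes the value from a defaults object. Knobs and with_defaults are hypothetical names, and the values in the usage lines are illustrative only, not the library's actual defaults. Unlike the method described above, which sets fields on the Stream itself, the sketch returns a new object.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class Knobs:
    """Hypothetical stand-in for the optional download-related fields."""
    split: Optional[str] = None
    download_retry: Optional[int] = None
    download_timeout: Optional[float] = None
    validate_hash: Optional[str] = None
    keep_zip: Optional[bool] = None


def with_defaults(stream: Knobs, default: Knobs) -> Knobs:
    """Return a copy of `stream` where every None field takes `default`'s value."""
    updates = {
        f.name: getattr(default, f.name)
        for f in fields(stream)
        if getattr(stream, f.name) is None
    }
    return replace(stream, **updates)


# Illustrative values only; explicitly set fields are left untouched.
defaults = Knobs(split='train', download_retry=2, download_timeout=60.0, keep_zip=False)
resolved = with_defaults(Knobs(download_retry=5), defaults)
```

Here resolved keeps its own download_retry and inherits the remaining fields from defaults. A field that is None in both objects stays None.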
- classmethod apply_weights(streams, samples_per_stream, choose_per_epoch, seed)
Given samples per stream, derive each stream's proportion/repeat/samples.
Modifies streams to save the derived weights.
- Parameters
streams (Sequence[Stream]) – The list of streams which comprise the dataset.
samples_per_stream (NDArray[np.int64]) – Underlying samples of each stream.
choose_per_epoch (int, optional) – Absolute epoch size if weighting relatively.
seed (int) – Random number generator seed used to sample evenly.
- Returns
int – Number of samples to draw per epoch.
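The derivation can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's code: derive_relative and derive_absolute are hypothetical helpers, every stream is assumed to have at least one sample, and rounding shares down and then giving the leftover samples to randomly picked streams is only one plausible way a seed could be used to "sample evenly".

```python
import random
from typing import List, Optional, Sequence, Tuple

Weights = Tuple[List[int], List[float], int]  # (choose, repeat, samples per epoch)


def derive_relative(
    proportions: Sequence[float],
    samples_per_stream: Sequence[int],
    choose_per_epoch: Optional[int],
    seed: int,
) -> Weights:
    """Derive per-stream choose and repeat from relative proportions."""
    # Without an explicit epoch size, keep the total size of the underlying data.
    if choose_per_epoch is None:
        choose_per_epoch = sum(samples_per_stream)
    total = sum(proportions)
    # Round each stream's share down first.
    choose = [int(choose_per_epoch * p / total) for p in proportions]
    # Give the samples lost to rounding to randomly picked streams, one each.
    shortfall = choose_per_epoch - sum(choose)
    rng = random.Random(seed)
    for i in rng.sample(range(len(choose)), shortfall):
        choose[i] += 1
    repeat = [c / n for c, n in zip(choose, samples_per_stream)]
    return choose, repeat, choose_per_epoch


def derive_absolute(
    repeats: Sequence[Optional[float]],
    chooses: Sequence[Optional[int]],
    samples_per_stream: Sequence[int],
) -> Weights:
    """Derive per-stream choose and repeat from absolute repeat/choose values."""
    choose = []
    for r, c, n in zip(repeats, chooses, samples_per_stream):
        if r is not None:
            choose.append(int(r * n))  # repeat is a multiplier on the sample count
        elif c is not None:
            choose.append(c)  # choose is already an exact count
        else:
            choose.append(n)  # unweighted stream: each sample once
    repeat = [c / n for c, n in zip(choose, samples_per_stream)]
    return choose, repeat, sum(choose)


rel = derive_relative([0.25, 0.75], [100, 100], None, seed=0)
ab = derive_absolute([2.0, None, None], [None, 30, None], [100, 60, 40])
```

In the relative case the epoch size is fixed first and split among streams. In the absolute case each stream's count is fixed first and the epoch size is their sum.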
- get_shards(world, allow_unsafe_types)
Load this Stream's index, retrieving its shard readers.
- prepare_shard(shard)
Ensure (download, validate, extract, etc.) that we have the given shard.
- Parameters
shard (Reader) – Which shard.
- Returns
int – Change in cache usage.
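The "ensure" pattern, with its cache-usage return value, can be sketched for a single file. This is a simplified illustration under stated assumptions, not the library's method: ensure_file is a hypothetical helper, the "download" is a plain copy between two directories, extraction is omitted, and the file name is only an example.

```python
import hashlib
import os
import shutil
import tempfile
from typing import Optional


def ensure_file(
    remote_dir: str,
    local_dir: str,
    name: str,
    expected_hash: Optional[str] = None,
    algo: str = 'sha1',
    retries: int = 2,
) -> int:
    """Make sure local_dir/name exists; return the change in cache usage in bytes."""
    dst = os.path.join(local_dir, name)
    if os.path.exists(dst):
        return 0  # Already cached, so cache usage does not change.
    os.makedirs(local_dir, exist_ok=True)
    last_err: Optional[Exception] = None
    for _ in range(1 + retries):  # One attempt plus `retries` re-attempts.
        try:
            shutil.copyfile(os.path.join(remote_dir, name), dst)
            break
        except OSError as err:
            last_err = err
    else:
        raise RuntimeError(f'Failed to fetch {name}') from last_err
    if expected_hash is not None:
        with open(dst, 'rb') as f:
            actual = hashlib.new(algo, f.read()).hexdigest()
        if actual != expected_hash:
            os.remove(dst)  # Do not leave a corrupt file in the cache.
            raise ValueError(f'Hash mismatch for {name}')
    return os.path.getsize(dst)


# Usage: the first call copies the file, the second finds it already cached.
with tempfile.TemporaryDirectory() as root:
    remote = os.path.join(root, 'remote')
    local = os.path.join(root, 'local')
    os.makedirs(remote)
    with open(os.path.join(remote, 'shard.00000.mds'), 'wb') as f:
        f.write(b'x' * 1024)
    first = ensure_file(remote, local, 'shard.00000.mds')
    second = ensure_file(remote, local, 'shard.00000.mds')
```

Returning the byte delta rather than a boolean lets a caller keep a running total of cache usage without rescanning the directory.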