Basic Dataset Conversion#
This guide covers how to convert your raw data to MDS format using streaming.MDSWriter. Writing to other supported shard formats is very similar. Read more about dataset shard formats in the Dataset Format guide. For a high-level explanation of how dataset writing works, check out the main concepts page.
Configuring dataset writing#
Use streaming.MDSWriter to convert raw data to MDS format. MDSWriter is like a native file writer; instead of writing the content line by line, MDSWriter writes the data sample by sample. It writes the data into shard files in a sequential manner (for example, shard.00000.mds, then shard.00001.mds, and so on). Configure streaming.MDSWriter according to your requirements with the parameters below:
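Before going through the parameters one by one, here is a minimal sketch of the overall writing pattern; the toy samples, column spec, and output directory are made up for illustration, and each parameter is explained below.

from streaming.base import MDSWriter

# Hypothetical toy samples and column spec -- see the parameter descriptions below.
samples = [{'number': i, 'words': f'sample {i}'} for i in range(10)]
columns = {'number': 'int', 'words': 'str'}

with MDSWriter(out='my_dataset/', columns=columns) as out:
    for sample in samples:
        out.write(sample)  # one dict per sample, keyed by column name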
The out parameter is the output directory for saving shard files. The out directory can be specified in three ways:
- Local path: Shard files are stored locally.
- Remote path: A local temporary directory is created to cache the shard files, and when shard creation is complete, they are uploaded to the remote location.
- (local_dir, remote_dir) tuple: Shard files are saved in the specified local_dir and uploaded to remote_dir.
out = '/local/data'
out = 's3://bucket/data' # Will create a temporary local dir
out = ('/local/data', 'oci://bucket/data')
The optional keep_local parameter controls whether to keep the shard files locally after they have been uploaded to a remote cloud location. To save local disk space, this defaults to False.
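For example, a minimal sketch that writes to a remote location while keeping the cached local copies; the bucket path and column spec here are placeholders:

from streaming.base import MDSWriter

# Hypothetical remote path and columns; keep_local=True retains the cached
# local shards after they are uploaded to the remote location.
with MDSWriter(out='s3://bucket/data',
               columns={'x': 'jpeg', 'y': 'int8'},
               keep_local=True) as out:
    ...  # write samples as shown elsewhere in this guide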
The column parameter is a dict mapping each feature or label name to a streaming-supported encoding type. MDSWriter encodes your data to bytes, and at training time the data is automatically decoded back to its original form. The index.json file stores the column metadata needed for decoding. Supported encoding formats are:
Category | Name | Notes |
---|---|---|
Encoding | ‘bytes’ | no-op encoding |
Encoding | ‘str’ | stores in UTF-8 |
Encoding | ‘int’ | Python int |
Numpy Array | ‘ndarray:dtype:shape’ | uses numpy.ndarray |
Numpy Unsigned Int | ‘uint8’ | uses numpy.uint8 |
Numpy Unsigned Int | ‘uint16’ | uses numpy.uint16 |
Numpy Unsigned Int | ‘uint32’ | uses numpy.uint32 |
Numpy Unsigned Int | ‘uint64’ | uses numpy.uint64 |
Numpy Signed Int | ‘int8’ | uses numpy.int8 |
Numpy Signed Int | ‘int16’ | uses numpy.int16 |
Numpy Signed Int | ‘int32’ | uses numpy.int32 |
Numpy Signed Int | ‘int64’ | uses numpy.int64 |
Numpy Float | ‘float16’ | uses numpy.float16 |
Numpy Float | ‘float32’ | uses numpy.float32 |
Numpy Float | ‘float64’ | uses numpy.float64 |
Numerical String | ‘str_int’ | stores in UTF-8 |
Numerical String | ‘str_float’ | stores in UTF-8 |
Numerical String | ‘str_decimal’ | stores in UTF-8 |
Image | ‘pil’ | raw PIL image class |
Image | ‘jpeg’ | PIL image as JPEG |
Image | ‘png’ | PIL image as PNG |
Pickle | ‘pkl’ | arbitrary Python objects |
JSON | ‘json’ | arbitrary data as JSON |
Here’s an example where the field x is an image and y is a class label, stored as an integer.
column = {
'x': 'jpeg',
'y': 'int8',
}
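Using that column spec, a toy write loop might look like the sketch below; the output directory and the random images are made up for illustration:

from PIL import Image
import numpy as np
from streaming.base import MDSWriter

# Hypothetical toy image dataset using the column spec above:
# 'x' takes a PIL image (stored as JPEG), 'y' takes an int8 class label.
with MDSWriter(out='image_dataset/', columns=column) as out:
    for i in range(10):
        img = Image.fromarray(np.random.randint(0, 256, (32, 32, 3), dtype=np.uint8))
        out.write({'x': img, 'y': np.int8(i % 2)})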
If the data type you need is not listed in the table above, you can write your own data type class with encode and decode methods and register it with streaming. For example, suppose you want to add a complex128 data type (64 bits each for the real and imaginary parts):
import numpy as np
from typing import Any

from streaming.base.format.mds.encodings import Encoding, _encodings

class Complex128(Encoding):
    def encode(self, obj: Any) -> bytes:
        return np.complex128(obj).tobytes()

    def decode(self, data: bytes) -> Any:
        return np.frombuffer(data, np.complex128)[0]

_encodings['complex128'] = Complex128
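Once registered, the custom encoding can be referenced by name in the columns dict like any built-in type. A minimal sketch, where the column name z, the output directory, and the sample values are made up for illustration:

from streaming.base import MDSWriter

# Hypothetical dataset of complex numbers using the custom 'complex128' encoding.
with MDSWriter(out='complex_dataset/', columns={'z': 'complex128'}) as out:
    for i in range(10):
        out.write({'z': complex(i, -i)})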
An optional size_limit, in bytes, applies to each uncompressed shard file. This defaults to 67 MB. Specify this as a number of bytes, either directly as an int or as a string with a human-readable suffix:
size_limit = 1024 # 1kB limit, as an int
size_limit = '1kb' # 1kB limit, as a human-readable string
Shard file size depends on the dataset, but in general, too small a shard size creates a large number of shard files and heavy network overhead, while too large a shard size creates fewer shard files whose downloads are less well balanced. A shard size between 50 and 100 MB works well in practice.
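As a rough sanity check of how many shards a given limit implies, here is a back-of-the-envelope calculation assuming a hypothetical 10 GB dataset and a roughly default 67 MB limit:

# Back-of-the-envelope estimate of shard count for an assumed dataset size.
dataset_bytes = 10 * 1024 ** 3   # hypothetical 10 GB of raw sample data
shard_limit = 67 * 1024 ** 2     # roughly the default 67 MB size_limit
print(dataset_bytes // shard_limit)  # -> 152 shards with these assumed numbers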
An optional compression algorithm name (and level) can be passed if you would like to compress the shard files. This can reduce egress costs during training. StreamingDataset will decompress shard files upon download during training. You can control whether to keep compressed shard files locally during training with the keep_zip flag – more information here.
Supported compression algorithms:
Name | Code | Min Level | Default Level | Max Level |
---|---|---|---|---|
Brotli | br | 0 | 11 | 11 |
Bzip2 | bz2 | 1 | 9 | 9 |
Gzip | gz | 0 | 9 | 9 |
Snappy | snappy | – | – | – |
Zstandard | zstd | 1 | 3 | 22 |
The compression algorithm to use, if any, is specified by passing code or code:level as a string. For example:
compression = 'zstd' # zstd, defaults to level 3.
compression = 'zstd:9' # zstd, specifying level 9.
The higher the level, the higher the compression ratio. However, using higher compression levels will impact compression speed. In our experience, zstd is optimal over the time-size Pareto frontier. Compression is most beneficial for text, whereas it is less helpful for other modalities like images.
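For instance, a minimal sketch that writes zstd-compressed shards; the output directory and column spec are placeholders:

from streaming.base import MDSWriter

# Write zstd-compressed shards at level 9; path and column are hypothetical.
with MDSWriter(out='compressed_dataset/', columns={'text': 'str'},
               compression='zstd:9') as out:
    for i in range(100):
        out.write({'text': f'sample {i}'})

At training time, StreamingDataset decompresses these shards as it downloads them, and its keep_zip argument controls whether the compressed copies are kept locally.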
An optional hashes list of algorithm names can be provided to verify data integrity. Hashes are saved in the index.json file. Hash verification during training is controlled with the validate_hash argument – more information here.
Available cryptographic hash functions:
Hash | Digest Bytes |
---|---|
‘blake2b’ | 64 |
‘blake2s’ | 32 |
‘md5’ | 16 |
‘sha1’ | 20 |
‘sha224’ | 28 |
‘sha256’ | 32 |
‘sha384’ | 48 |
‘sha512’ | 64 |
‘sha3_224’ | 28 |
‘sha3_256’ | 32 |
‘sha3_384’ | 48 |
‘sha3_512’ | 64 |
Available non-cryptographic hash functions:
Hash | Digest Bytes |
---|---|
‘xxh32’ | 4 |
‘xxh64’ | 8 |
‘xxh128’ | 16 |
‘xxh3_64’ | 8 |
‘xxh3_128’ | 16 |
As an example:
hashes = ['sha256', 'xxh64']
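At training time, one of the recorded hashes can be checked by passing it to StreamingDataset via the validate_hash argument. A minimal sketch with a placeholder local path:

from streaming.base import StreamingDataset

# Verify each shard against its recorded sha256 hash as it is prepared for use.
dataset = StreamingDataset(local='/local/data', validate_hash='sha256', batch_size=1)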
Example: Writing a dataset to MDS format#
Let’s put it all together with an example. Here, we create a synthetic classification dataset that returns a tuple of features and a label.
import numpy as np
class RandomClassificationDataset:
    """Classification dataset drawn from a normal distribution.

    Args:
        shape: data sample dimensions (default: (10,))
        size: number of samples (default: 10000)
        num_classes: number of classes (default: 2)
    """

    def __init__(self, shape=(10,), size=10000, num_classes=2):
        self.size = size
        self.x = np.random.randn(size, *shape)
        self.y = np.random.randint(0, num_classes, size)

    def __len__(self):
        return self.size

    def __getitem__(self, index: int):
        return self.x[index], self.y[index]
Here, we write shards to a local directory. You can specify a remote path as well.
output_dir = 'test_output_dir'
Specify the column encoding types for each sample and label:
columns = {'x': 'pkl', 'y': 'int64'}
Optionally, specify a compression algorithm and level:
compression = 'zstd:7' # compress shards with ZStandard, level 7
Optionally, specify a list of hash algorithms for verification:
hashes = ['sha1'] # Use only SHA1 hashing on each shard
Optionally, provide a shard size limit, after which a new shard starts. In this small example, we use 10kb, but for production datasets 50-100MB is more appropriate.
# Here we use a human-readable string, but we could also
# pass in an int specifying the number of bytes.
limit = '10kb'
It’s time to call streaming.MDSWriter with the parameters initialized above and write the samples by iterating over the dataset.
from streaming.base import MDSWriter
dataset = RandomClassificationDataset()
with MDSWriter(out=output_dir, columns=columns, compression=compression, hashes=hashes, size_limit=limit) as out:
    for x, y in dataset:
        out.write({'x': x, 'y': y})
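Before cleaning up, you can optionally sanity-check the result by loading the written shards back with StreamingDataset, mirroring the inspection pattern used in the ndarray examples below:

from streaming.base import StreamingDataset

# Load the freshly written shards back from the local output directory.
written = StreamingDataset(local=output_dir, batch_size=1)
print(written.num_samples)  # 10000 samples were written above
print(written[0])           # a dict with 'x' and 'y', decoded back automatically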
Clean up after ourselves.
from shutil import rmtree
rmtree(output_dir)
Once the dataset has been written, the output directory contains an index.json file with shard metadata, along with the shard files themselves. For example:
dirname
├── index.json
├── shard.00000.mds.zstd
└── shard.00001.mds.zstd
Example: Writing ndarrays to MDS format#
Here, we show how to write ndarrays to MDS format in three ways:
- dynamic shape and dtype
- dynamic shape but fixed dtype
- fixed shape and dtype

Serializing ndarrays with fixed dtype and shape is more efficient than fixed dtype and dynamic shape, which is in turn more efficient than dynamic dtype and shape.
Dynamic shape, dynamic dtype#
The streaming encoding type, as the value in the columns dict, should simply be ndarray.
import numpy as np
from streaming.base import MDSWriter, StreamingDataset

# Write to MDS
with MDSWriter(out='my_dataset1/',
               columns={'my_array': 'ndarray'}) as out:
    for i in range(42):
        # Dimension can change
        ndim = np.random.randint(1, 5)
        shape = np.random.randint(1, 5, ndim)
        shape = tuple(shape.tolist())
        my_array = np.random.normal(0, 1, shape)
        out.write({'my_array': my_array})

# Inspect dataset
dataset = StreamingDataset(local='my_dataset1/', batch_size=1)
for i in range(dataset.num_samples):
    print(dataset[i])
Dynamic shape, fixed dtype#
The streaming encoding type, as the value in the columns dict, should be ndarray:dtype. So in this example, it is ndarray:int16.
# Write to MDS
with MDSWriter(out='my_dataset2/',
               columns={'my_array': 'ndarray:int16'}) as out:
    for i in range(42):
        # Dimension can change
        ndim = np.random.randint(1, 5)
        shape = np.random.randint(1, 5, ndim)
        shape = tuple(shape.tolist())
        # Datatype is fixed
        my_array = np.random.normal(0, 100, shape).astype(np.int16)
        out.write({'my_array': my_array})

# Inspect dataset
dataset = StreamingDataset(local='my_dataset2/', batch_size=1)
for i in range(dataset.num_samples):
    print(dataset[i])
Fixed shape, fixed dtype#
The streaming encoding type, as the value in the columns dict, should be ndarray:dtype:shape. So in this example, it is ndarray:int16:3,3,3.
# Write to MDS
with MDSWriter(out='my_dataset3/',
               columns={'my_array': 'ndarray:int16:3,3,3'}) as out:
    for i in range(42):
        # Shape is fixed
        shape = 3, 3, 3
        # Datatype is fixed
        my_array = np.random.normal(0, 100, shape).astype(np.int16)
        out.write({'my_array': my_array})

# Inspect dataset
dataset = StreamingDataset(local='my_dataset3/', batch_size=1)
for i in range(dataset.num_samples):
    print(dataset[i])
We can see that the dataset is more efficiently serialized when we are more specific about array shape and datatype:
import subprocess
# Dynamic shape, dynamic dtype uses most space
subprocess.run(['du', '-sh', 'my_dataset1'])
# Dynamic shape, fixed dtype uses less space
subprocess.run(['du', '-sh', 'my_dataset2'])
# Fixed shape, fixed dtype uses the least space
subprocess.run(['du', '-sh', 'my_dataset3'])
Clean up after ourselves.
from shutil import rmtree
rmtree('my_dataset1')
rmtree('my_dataset2')
rmtree('my_dataset3')