This tutorial is available as a Jupyter notebook.


Image Data: CIFAR10

In this tutorial, we will demonstrate how to use the streaming CIFAR-10 dataset to train a classification model.

Tutorial Goals and Concepts Covered

The goal of this tutorial is to show how to prepare a dataset for streaming and then use Streaming data loading to train a model. It consists of the following steps:

  1. Obtaining the dataset

  2. Preparing the dataset for streaming

  3. Streaming the dataset to the local machine

  4. Training a model using these datasets

Let’s get started!


Let’s start by making sure the right packages are installed and imported. We need to install the mosaicml-streaming package, which installs the dependencies needed to run this tutorial.

[ ]:
%pip install mosaicml-streaming
# To install from source instead of the latest release, comment out the command above and uncomment the following one.
# %pip install git+https://github.com/mosaicml/streaming.git

# (Optional) To upload a streaming dataset to an AWS S3 bucket
%pip install awscli
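As an optional check that the installs are visible to the current kernel, you can query installed versions with the standard library's importlib.metadata (Python 3.8+). This is a minimal sketch: installed_version is a helper name we made up, and the strings passed to it are the pip distribution names, not the import names.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a pip distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


for dist in ('mosaicml-streaming', 'torch', 'torchvision'):
    print(f'{dist}: {installed_version(dist) or "not installed"}')
```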
[ ]:
import time
import os
import shutil
from typing import Callable, Any, Tuple

import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from torchvision.datasets import CIFAR10

We’ll use Streaming’s MDSWriter, which writes a dataset in the Streaming (MDS) format, and the StreamingDataset class, which loads a streaming dataset.

[ ]:
from streaming import MDSWriter, StreamingDataset

Global settings

For this tutorial, it makes the most sense to organize our global settings here rather than distribute them throughout the cells in which they’re used.

[ ]:
# the location of our dataset
in_root = "./dataset"

# the location of the "remote" streaming dataset (`sds`).
# Upload `out_root` to your cloud storage provider of choice.
out_root = "./sds"
out_train = "./sds/train"
out_test = "./sds/test"

# the location to download the streaming dataset during training
local = './local'
local_train = './local/train'
local_test = './local/test'

# toggle shuffling in dataloader
shuffle_train = True
shuffle_test = False

# shard size limit, in bytes
size_limit = 1 << 25

# training batch size
batch_size = 32

# training hardware parameters
device = "cuda" if torch.cuda.is_available() else "cpu"

# number of training epochs
train_epochs = 2 # increase the number of epochs for greater accuracy

# hashing algorithms to use for the dataset
hashes = ['sha1', 'xxh64']
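size_limit caps the size of each shard file, and 1 << 25 bytes is 32 MiB. Here is a rough, back-of-the-envelope way to estimate how many shards each split will produce. It assumes about 32 × 32 × 3 = 3,072 bytes of raw pixels per sample and ignores the label and any per-sample or per-shard overhead the writer adds, so real counts may differ. estimate_num_shards is a hypothetical helper, not part of Streaming.

```python
import math

# Assumption: each sample costs roughly its raw RGB pixel bytes; overhead is ignored.
SIZE_LIMIT = 1 << 25            # 33,554,432 bytes (32 MiB), same as size_limit above
BYTES_PER_SAMPLE = 32 * 32 * 3  # 3,072 bytes for one 32x32 RGB image


def estimate_num_shards(num_samples: int,
                        bytes_per_sample: int = BYTES_PER_SAMPLE,
                        size_limit: int = SIZE_LIMIT) -> int:
    """Rough estimate of how many size-limited shards a split needs."""
    total_bytes = num_samples * bytes_per_sample
    return math.ceil(total_bytes / size_limit)


print(estimate_num_shards(50_000))  # CIFAR-10 train split
print(estimate_num_shards(10_000))  # CIFAR-10 test split
```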
[ ]:
# upload location for the dataset splits (change this if you want to upload to a different location, for example, AWS S3 bucket location)
upload_location = None

if upload_location is None:
    upload_train_location = None
    upload_test_location = None
else:
    upload_train_location = os.path.join(upload_location, 'train')
    upload_test_location = os.path.join(upload_location, 'test')
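os.path.join uses the host OS separator, so on Windows it would join a URL such as s3://... with a backslash. Object-store URLs expect forward slashes. A minimal variant that always uses "/" relies on posixpath instead. The helper name split_locations and the bucket URL below are hypothetical.

```python
import posixpath
from typing import Optional, Tuple


def split_locations(upload_location: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """Return (train, test) upload locations, or (None, None) when not uploading."""
    if upload_location is None:
        return None, None
    # posixpath.join always inserts '/', regardless of the operating system
    return (posixpath.join(upload_location, 'train'),
            posixpath.join(upload_location, 'test'))


print(split_locations(None))
print(split_locations('s3://my-bucket/cifar10'))  # hypothetical bucket
```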

Download the CIFAR10 raw dataset

[ ]:
# Download the CIFAR10 raw dataset using torchvision
train_raw_dataset = CIFAR10(root=in_root, train=True, download=True)
test_raw_dataset = CIFAR10(root=in_root, train=False, download=True)

Next, we’ll convert the raw datasets into binary streaming dataset files.

Preparing and writing the dataset

Below, we’ll set up the logic for writing our starting dataset to files that can be read using a streaming dataloader.

For more information on MDSWriter, check out the API reference.

[ ]:
def write_datasets(dataset: Dataset, split_dir: str) -> None:
    # column name -> encoding: 'x' holds a PIL image, 'y' an integer class label
    fields = {
        'x': 'pil',
        'y': 'int',
    }
    # write the samples in a random order
    indices = np.random.permutation(len(dataset))
    indices = tqdm(indices)
    with MDSWriter(out=split_dir, columns=fields, hashes=hashes, size_limit=size_limit) as out:
        for i in indices:
            x, y = dataset[i]
            out.write({
                'x': x,
                'y': y,
            })
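write_datasets visits the samples in a random permutation, and the writer roughly starts a new shard whenever adding the next sample would push the current one past size_limit. The toy model below mimics that idea in plain Python so you can see how sample order and the size cap interact. It is a simplification written for illustration, not MDSWriter's actual implementation, and pack_into_shards is a hypothetical name.

```python
import random
from typing import List, Sequence


def pack_into_shards(sample_sizes: Sequence[int], size_limit: int, seed: int = 0) -> List[List[int]]:
    """Toy model: visit samples in a random order and group their indices into shards."""
    order = list(range(len(sample_sizes)))
    random.Random(seed).shuffle(order)  # analogous to np.random.permutation in write_datasets
    shards: List[List[int]] = []
    used = 0
    for idx in order:
        size = sample_sizes[idx]
        # open a new shard if none exists yet or this sample would overflow the current one
        if not shards or used + size > size_limit:
            shards.append([])
            used = 0
        shards[-1].append(idx)
        used += size
    return shards


shards = pack_into_shards([3] * 10, size_limit=10)
print([len(s) for s in shards])  # [3, 3, 3, 1]
```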

Once the datasets have been written to out_root, you can upload them to a cloud storage provider and stream them from there. If you don’t upload them, the local out_train and out_test directories act as the remote.

[ ]:
remote_train = upload_train_location or out_train # replace this with your URL for cloud streaming
remote_test  = upload_test_location or out_test

Loading the Data

We extend StreamingDataset to apply image transforms to each deserialized sample.

For more information on the StreamingDataset class, check out the API reference.

[ ]:
class CIFAR10Dataset(StreamingDataset):
    def __init__(self,
                 remote: str,
                 local: str,
                 shuffle: bool,
                 batch_size: int,
                 transforms: Callable
                ) -> None:
        super().__init__(local=local, remote=remote, shuffle=shuffle, batch_size=batch_size)
        self.transforms = transforms

    def __getitem__(self, idx:int) -> Any:
        obj = super().__getitem__(idx)
        x = obj['x']
        y = obj['y']
        return self.transforms(x), y
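CIFAR10Dataset follows a common override pattern. It calls the parent __getitem__ to get the decoded sample as a dict keyed by column name, then transforms the image and passes the label through. The sketch below shows the same shape with a hypothetical in-memory base class standing in for StreamingDataset, so it runs without any download. Both class names are made up for illustration.

```python
from typing import Any, Callable, Dict, List, Tuple


class InMemorySamples:
    """Hypothetical stand-in for StreamingDataset: each sample is a dict of columns."""

    def __init__(self, samples: List[Dict[str, Any]]) -> None:
        self.samples = samples

    def __len__(self) -> int:
        return len(self.samples)

    def __getitem__(self, idx: int) -> Dict[str, Any]:
        return self.samples[idx]


class TransformedSamples(InMemorySamples):
    """Mirrors CIFAR10Dataset: unpack the dict, transform 'x', pass 'y' through."""

    def __init__(self, samples: List[Dict[str, Any]], transforms: Callable[[Any], Any]) -> None:
        super().__init__(samples)
        self.transforms = transforms

    def __getitem__(self, idx: int) -> Tuple[Any, Any]:
        obj = super().__getitem__(idx)
        return self.transforms(obj['x']), obj['y']


ds = TransformedSamples([{'x': [1, 2, 3], 'y': 7}], transforms=lambda x: [v * 2 for v in x])
print(ds[0])  # ([2, 4, 6], 7)
```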

Initialize the data transformation

[ ]:
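transformation = transforms.Compose([
    transforms.ToTensor(),  # PIL image -> float tensor with values in [0, 1]
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),  # per-channel mean, std
])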
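For an 8-bit RGB PIL image, ToTensor rescales each pixel value from [0, 255] to [0.0, 1.0]. Normalize then computes (value - mean) / std separately for each channel. The arithmetic for a single pixel is sketched below in plain Python with the same constants as the cell above. normalize_pixel is a name we made up for illustration.

```python
from typing import Sequence, Tuple

MEAN = (0.4914, 0.4822, 0.4465)  # per-channel means from the Normalize call above
STD = (0.2023, 0.1994, 0.2010)   # per-channel standard deviations


def normalize_pixel(rgb: Sequence[int]) -> Tuple[float, ...]:
    """Scale 0-255 values to [0, 1] (like ToTensor), then apply (v - mean) / std (like Normalize)."""
    return tuple((v / 255.0 - m) / s for v, m, s in zip(rgb, MEAN, STD))


print(normalize_pixel((0, 0, 0)))        # black: every channel becomes negative
print(normalize_pixel((255, 255, 255)))  # white: every channel becomes positive
```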

Putting It All Together

We’re now ready to actually write the streamable dataset. Let’s do that if we haven’t already.

[ ]:
if not os.path.exists(out_train):
    write_datasets(train_raw_dataset, out_train)
    write_datasets(test_raw_dataset, out_test)
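To see what was written, you can list every file under out_root together with its size. You would typically expect an index.json alongside the shard files in each split directory. The sketch below uses only the standard library, and summarize_dir is a hypothetical helper.

```python
import os
from typing import Dict


def summarize_dir(root: str) -> Dict[str, int]:
    """Map each file under root (as a path relative to root) to its size in bytes."""
    sizes: Dict[str, int] = {}
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            sizes[os.path.relpath(path, root)] = os.path.getsize(path)
    return sizes


# './sds' matches out_root above; nothing is printed if the directory does not exist yet
for rel_path, size in sorted(summarize_dir('./sds').items()):
    print(f'{rel_path:40s} {size:>12,d} bytes')
```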

(Optional) Upload the streaming dataset to an AWS S3 bucket of your choice. Uncomment the line below if you set upload_location to an S3 bucket URL.

[ ]:
# !aws s3 cp $out_root $upload_location --recursive

Once that’s done, we can instantiate our streaming datasets and wrap them in standard dataloaders for training!

[ ]:
train_dataset = CIFAR10Dataset(remote_train, local_train, shuffle_train, batch_size=batch_size, transforms=transformation)
test_dataset  = CIFAR10Dataset(remote_test, local_test, shuffle_test, batch_size=batch_size, transforms=transformation)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)
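In a single-process, single-device run with DataLoader's default drop_last=False, the final partial batch is kept. The number of batches per epoch is therefore the ceiling of samples / batch_size. A quick check for the CIFAR-10 split sizes (50,000 train images and 10,000 test images) follows. batches_per_epoch is a hypothetical helper, and multi-GPU or multi-worker runs partition the samples differently.

```python
import math


def batches_per_epoch(num_samples: int, batch_size: int, drop_last: bool = False) -> int:
    """Number of batches a DataLoader yields for num_samples in one process."""
    if drop_last:
        return num_samples // batch_size       # the trailing partial batch is discarded
    return math.ceil(num_samples / batch_size)  # the trailing partial batch is kept


print(batches_per_epoch(50_000, 32))  # 1563 train batches
print(batches_per_epoch(10_000, 32))  # 313 test batches
```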

Create a model

We are going to create a convolutional neural network (CNN) classification model. We will use CrossEntropyLoss as the loss function and stochastic gradient descent (SGD) as the optimizer.
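With momentum and weight_decay set, and with the defaults dampening=0 and nesterov=False, each torch.optim.SGD step for a single parameter works as follows. The L2 penalty is added to the gradient, a momentum buffer accumulates the result, and the parameter steps against the buffer. The scalar sketch below uses the hyperparameters of the optimizer in the next cell. sgd_momentum_step is our own illustrative helper, not PyTorch code.

```python
from typing import Tuple


def sgd_momentum_step(param: float, grad: float, buf: float,
                      lr: float = 0.001, momentum: float = 0.9,
                      weight_decay: float = 5e-4) -> Tuple[float, float]:
    """One SGD update with momentum and L2 weight decay for a scalar parameter."""
    grad = grad + weight_decay * param  # weight decay is folded into the gradient
    buf = momentum * buf + grad         # momentum buffer (dampening = 0)
    param = param - lr * buf            # step against the buffer
    return param, buf


p, b = 1.0, 0.0  # start with an empty momentum buffer
for step in range(3):
    p, b = sgd_momentum_step(p, grad=0.5, buf=b)
    print(step, round(p, 7), round(b, 7))
```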

[ ]:
class Net(nn.Module):
    def __init__(self):
        super().__init__()  # must run before submodules are assigned
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

model = Net()
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=5e-4)
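The in_features of fc1, 16 * 5 * 5, follows from the 32 × 32 CIFAR-10 input. Each unpadded, stride-1 5 × 5 convolution trims 4 pixels from a side, and each 2 × 2 max-pool with stride 2 halves it. The standard output-size formula floor((n + 2p - k) / s) + 1 traces this through the network. conv_out is an illustrative helper.

```python
def conv_out(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    """Output side length of a convolution or pooling layer."""
    return (size + 2 * padding - kernel) // stride + 1


side = 32
side = conv_out(side, 5)             # conv1 (5x5): 32 -> 28
side = conv_out(side, 2, stride=2)   # pool (2x2):  28 -> 14
side = conv_out(side, 5)             # conv2 (5x5): 14 -> 10
side = conv_out(side, 2, stride=2)   # pool (2x2):  10 -> 5
print(side, 16 * side * side)        # 5 400
```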

Define a model train function

[ ]:
def fit(model: nn.Module, train_dataloader: DataLoader) -> Tuple[float, float]:
    model.train()  # enable training-mode behavior
    train_running_loss = 0.0
    train_running_correct = 0
    train_samples = 0
    with tqdm(train_dataloader, unit="batch") as tepoch:
        for imgs, labels in tepoch:
            imgs = imgs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()  # clear gradients from the previous batch
            labels_hat = model(imgs)
            loss = criterion(labels_hat, labels)
            loss.backward()
            optimizer.step()
            # criterion returns the batch mean, so weight it by the batch size
            train_running_loss += loss.item() * labels.size(0)
            _, preds = torch.max(labels_hat.data, 1)
            train_running_correct += (preds == labels).sum().item()
            train_samples += labels.size(0)

    train_loss = train_running_loss / train_samples
    train_accuracy = 100. * train_running_correct / train_samples

    return train_loss, train_accuracy
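CrossEntropyLoss returns the mean loss over each batch by default, and the last batch of an epoch can be smaller than the rest. A per-sample average over the epoch weights each batch mean by its batch size. A plain mean of the batch means over-weights a short final batch. The sketch below uses made-up loss values, and epoch_mean_loss is a hypothetical helper.

```python
from typing import Sequence


def epoch_mean_loss(batch_means: Sequence[float], batch_sizes: Sequence[int]) -> float:
    """Per-sample mean loss over an epoch, given each batch's mean loss and size."""
    total = sum(m * n for m, n in zip(batch_means, batch_sizes))
    return total / sum(batch_sizes)


means, sizes = [1.0, 1.0, 4.0], [32, 32, 16]  # made-up values; the last batch is short
print(epoch_mean_loss(means, sizes))  # 1.6
print(sum(means) / len(means))        # 2.0, over-weights the short last batch
```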

Define a model evaluation function

[ ]:
def eval(model: nn.Module, test_dataloader: DataLoader) -> Tuple[float, float]:
    model.eval()  # switch to evaluation-mode behavior
    val_running_loss = 0.0
    val_running_correct = 0
    val_samples = 0
    # no gradients are needed for evaluation
    with torch.no_grad(), tqdm(test_dataloader, unit="batch") as tepoch:
        for imgs, labels in tepoch:
            imgs = imgs.to(device)
            labels = labels.to(device)
            output = model(imgs)
            loss = criterion(output, labels)
            # criterion returns the batch mean, so weight it by the batch size
            val_running_loss += loss.item() * labels.size(0)
            _, preds = torch.max(output.data, 1)
            val_running_correct += (preds == labels).sum().item()
            val_samples += labels.size(0)

    val_loss = val_running_loss / val_samples
    val_accuracy = 100. * val_running_correct / val_samples

    return val_loss, val_accuracy
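The accuracy in both functions counts a prediction as correct when the index of the largest logit equals the label. torch.max(output.data, 1) returns that index as its second output. The sketch below does the same top-1 computation on plain Python lists. top1_accuracy is an illustrative name, and Python's max keeps the first index when there is a tie.

```python
from typing import Sequence


def top1_accuracy(logits: Sequence[Sequence[float]], labels: Sequence[int]) -> float:
    """Percentage of rows whose highest-scoring class index equals the label."""
    # argmax per row: the index of the largest logit
    preds = [max(range(len(row)), key=row.__getitem__) for row in logits]
    correct = sum(int(p == y) for p, y in zip(preds, labels))
    return 100.0 * correct / len(labels)


logits = [[0.1, 2.0, -1.0],   # predicts class 1
          [3.0, 0.0, 0.5]]    # predicts class 0
print(top1_accuracy(logits, labels=[1, 2]))  # 50.0
```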

Train with the Streaming Dataloaders

Now all that’s left to do is train!

[ ]:
for epoch in range(train_epochs):
    train_epoch_loss, train_epoch_accuracy = fit(model, train_dataloader)
    print(f'epoch: {epoch+1}/{train_epochs} Train Loss: {train_epoch_loss:.4f}, Train Acc: {train_epoch_accuracy:.2f}')
    val_epoch_loss, val_epoch_accuracy = eval(model, test_dataloader)
    print(f'epoch: {epoch+1}/{train_epochs} Val Loss: {val_epoch_loss:.4f}, Val Acc: {val_epoch_accuracy:.2f}')


That’s it. No need to hang on to the files created by the tutorial…

[ ]:
shutil.rmtree(out_root, ignore_errors=True)
shutil.rmtree(in_root, ignore_errors=True)
shutil.rmtree(local, ignore_errors=True)

What next?

You’ve now taken an in-depth look at how to prepare and use streaming datasets with PyTorch.

To continue learning about Streaming, please continue to explore our examples!

Come get involved with MosaicML!

We’d love for you to get involved with the MosaicML community in any of these ways:

Star Streaming on GitHub

Help make others aware of our work by starring Streaming on GitHub.

Join the MosaicML Slack

Head on over to the MosaicML Slack to join other ML efficiency enthusiasts. Come for the paper discussions, stay for the memes!

Contribute to Streaming

Is there a bug you noticed or a feature you’d like? File an issue or make a pull request!