Tip

This tutorial is available as a Jupyter notebook.


Multiprocess dataset conversion#

If your dataset is huge, running a single-process dataset conversion script can be very time consuming. You can use multiprocessing with MDSWriter to convert your dataset in parallel. There are a few ways to convert your raw data into MDS format in parallel:

  1. Download the raw data in parallel and convert it to MDS format sequentially.

  2. Group the raw data, convert it to MDS format in parallel in separate sub-directories, and then merge the sub-directories’ index.json files into a unified MDS dataset.

Let’s walk through a small example of each approach.

1. Fetch raw data in parallel and write sequentially#

For large individual dataset files such as images or videos, it is useful to download the files in parallel across multiple processes and, once each file is downloaded, call MDSWriter to write the data into MDS format. Below is one example of how to do that.

Setup#

Let’s start by making sure the right packages are installed and imported. We need to install the mosaicml-streaming package, which installs the dependencies needed to run this tutorial.

[ ]:
%pip install mosaicml-streaming
[ ]:
import os
from multiprocessing import Pool

from streaming import MDSWriter, StreamingDataset

Global settings#

Initialize the global variables.

[ ]:
out_root = './data'
# This could be a list of URLs that need to be downloaded
dataset = [i for i in range(25)]
columns = {'number': 'int'}

Download the data from a URL. Here, we just return a number for demonstration purposes.

[ ]:
def get_data(number):
    print(f'\nWorker PID: {os.getpid()}\tnumber: {number}', flush=True, end='')
    # Add code here to download the data from a URL.
    return {'number': number}
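
In a real pipeline, get_data would fetch the payload over the network instead of returning a number. Below is a minimal sketch of such a download-based variant; the function name, the URL argument, and the 'data' column are illustrative assumptions, not part of the tutorial.

from urllib.request import urlopen

def get_bytes(url: str) -> dict:
    # Hypothetical variant: download raw bytes from a URL and return them as a sample.
    # The 'data' column would need to be declared as {'data': 'bytes'} in `columns`.
    with urlopen(url) as response:
        return {'data': response.read()}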

Initialization method for each worker process, which prints the worker PID.

[ ]:
# Initialize the worker process
def init_worker():
    # Get the pid for the current worker process
    pid = os.getpid()
    print(f'\nInitialize Worker PID: {pid}', flush=True, end='')

Convert to MDS format#

Initialize 4 worker processes that download the data in parallel. Once each sample is ready, it is written in MDS format via a write method call.

[ ]:
# clean up root directory
%rm -rf $out_root

with Pool(initializer=init_worker, processes=4) as pool:
    with MDSWriter(out=out_root, columns=columns) as out:
        for sample in pool.imap(get_data, dataset):
            out.write(sample)

Load MDS dataset#

Read the samples back using StreamingDataset and print each sample’s number.

[ ]:
# read the sample
dataset = StreamingDataset(local=out_root,
                           remote=None,
                           shuffle=False,)
for sample in dataset:
    print(sample['number'])
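
If you plan to train on the converted data, the same StreamingDataset can also be passed to a standard PyTorch DataLoader. A minimal sketch, assuming PyTorch is installed:

from torch.utils.data import DataLoader

# Reuse the StreamingDataset created above and iterate over batched samples.
dataloader = DataLoader(dataset, batch_size=4)
for batch in dataloader:
    print(batch['number'])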
[ ]:
# Clean up
%rm -rf $out_root

2. Group the raw data and convert to MDS format in parallel#

For large dataset files such as tar files, zip files, or any other archives, we recommend mapping one raw data file to one MDS sub-directory so that the dataset conversion happens across multiple processes in parallel.

Import dependencies

[ ]:
import os
import json
from glob import glob
from typing import Iterator, Tuple

from multiprocessing import Pool

from streaming import MDSWriter, StreamingDataset

Global settings#

Initialize the global variables.

[ ]:
out_root = './group_data'
num_groups = 4
num_process = 2

Get each sub-directory’s MDS path along with a raw dataset sample range of 10. For example, the first sub-directory yields samples 0 to 9, the second sub-directory yields samples 10 to 19, and so on.

If you are working with large files, you can also yield a raw dataset file path instead of a sample range, as sketched after the next cell.

[ ]:
def each_task(out_root: str, groups: int) -> Iterator[Tuple[str, int, int]]:
    """Get the sub-directory path and the sample range.

    Args:
        out_root (str): base output mds directory
        groups (int): Number of sub-directories to create

    Yields:
        Iterator[Tuple[str, int, int]]: Each argument tuple
    """
    for data_group in range(groups):
        sub_out_root = os.path.join(out_root, str(data_group))
        start_sample_idx = data_group * 10
        end_sample_idx = start_sample_idx + 9
        yield sub_out_root, start_sample_idx, end_sample_idx
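
For the file-path case mentioned above, a hypothetical variant of this generator could pair each raw data file with its own MDS sub-directory. The function name and the data_files argument below are illustrative assumptions, not part of the tutorial.

def each_task_from_files(out_root: str, data_files: list) -> Iterator[Tuple[str, str]]:
    # Hypothetical sketch: one raw data file per MDS sub-directory.
    for idx, data_file in enumerate(data_files):
        sub_out_root = os.path.join(out_root, str(idx))
        yield sub_out_root, data_file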

Convert a raw dataset into MDS format.

[ ]:
def convert_to_mds(args: Tuple[str, int, int]) -> None:
    """Convert a raw dataset into MDS format.

    Args:
        args (Tuple[str, int, int]): All arguments, packed into a tuple because
            process pools only pass one argument.
    """
    sub_out_root, start_sample_idx, end_sample_idx = args

    def get_data(start: int, end: int):
        for i in range(start, end + 1):
            yield {'number': i}

    columns = {'number': 'int'}

    with MDSWriter(out=sub_out_root,
                   columns=columns) as out:
        for sample in get_data(start_sample_idx, end_sample_idx):
            out.write(sample)

Divide the dataset into 4 sub-groups; the worker processes pick up sub-groups in parallel and convert the data into MDS format in their respective sub-directories.

[ ]:
# clean up root directory
%rm -rf $out_root

arg_tuples = each_task(out_root, groups=num_groups)

# Process group of data in parallel into directories of shards.
with Pool(initializer=init_worker, processes=num_process) as pool:
    for count in pool.imap(convert_to_mds, arg_tuples):
        pass
print('Finished')

Once the dataset has been converted to MDS format, let’s look at the directory structure. You will find 4 sub-directories, and each sub-directory contains an index.json file and shard files.

[ ]:
%ll $out_root
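
The listing will look roughly like the tree below; the exact shard file names depend on the number and size of the shards written.

group_data/
├── 0/
│   ├── index.json
│   └── shard.00000.mds
├── 1/
│   ├── index.json
│   └── shard.00000.mds
├── 2/
│   ├── index.json
│   └── shard.00000.mds
└── 3/
    ├── index.json
    └── shard.00000.mds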

Merge meta data#

The last step of the conversion process is to merge all the sub-directories’ index.json files. The contents of the shard files remain as they are. Calling the merge_index utility function writes the global shard information to a new index.json file placed in the root output directory.

[ ]:
from streaming.base.util import merge_index
merge_index(out_root, keep_local=True)

Let’s check out the root directory, where you can see one index.json file and many shard files.

[ ]:
%ll $out_root

Load MDS dataset#

Read the samples back using StreamingDataset and print each sample’s number.

[ ]:
# read the sample
dataset = StreamingDataset(local=out_root,
                           remote=None,
                           shuffle=False)
for sample in dataset:
    print(sample['number'])

Cleanup#

[ ]:
%rm -rf $out_root

What next?#

You’ve now seen an in-depth tutorial on converting a dataset into MDS format using multiple processes. If you are interested in real-world examples, check out the WebVid and Pile dataset conversion scripts, which convert those datasets into MDS format via multiprocessing.

Come get involved with MosaicML!#

We’d love for you to get involved with the MosaicML community in any of these ways:

Star Streaming on GitHub#

Help make others aware of our work by starring Streaming on GitHub.

Join the MosaicML Slack#

Head on over to the MosaicML Slack to join other ML efficiency enthusiasts. Come for the paper discussions, stay for the memes!

Contribute to Streaming#

Is there a bug you noticed or a feature you’d like? File an issue or make a pull request!